-
Notifications
You must be signed in to change notification settings - Fork 1
/
hook.go
120 lines (101 loc) · 2.65 KB
/
hook.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package graylog
import (
"bytes"
"errors"
"sync"
"time"
"github.com/Graylog2/go-gelf/gelf"
"github.com/Sirupsen/logrus"
)
// sleep and now are package-level indirections over time.Sleep and time.Now.
// The health check loop and the GELF message builder call these instead of
// calling the time package directly.
// NOTE(review): presumably so tests can substitute fakes — confirm.
var (
sleep = time.Sleep
now = time.Now
)
// Hook implements logrus.Hook and sends log entries to graylog nodes.
type Hook struct {
config Config // assigned once in New; the methods in this file only read it
nodes nodes // current graylog nodes; replaced as a whole by SetNodeConfigs
nodesMutex sync.RWMutex // guards nodes against concurrent read-write access
}
// New builds a Hook around the given config. The returned Hook holds no
// graylog nodes until SetNodeConfigs is called.
func New(config Config) *Hook {
	hook := new(Hook)
	hook.config = config
	return hook
}
// SetNodeConfigs replaces the hook's graylog nodes with nodes built from the
// given configs. If any config fails to produce a node, that error is returned
// and the hook keeps its current nodes.
func (h *Hook) SetNodeConfigs(configs ...NodeConfig) error {
	built := nodes{}
	for i := range configs {
		n, err := newNode(configs[i])
		if err != nil {
			return err
		}
		built = append(built, n)
	}

	// Swap in the complete set under the write lock so concurrent readers
	// never observe a partially built list.
	h.nodesMutex.Lock()
	defer h.nodesMutex.Unlock()
	h.nodes = built
	return nil
}
// Levels reports the log levels this hook fires on: every level logrus
// defines. It is part of the logrus.Hook interface.
func (h *Hook) Levels() []logrus.Level {
	all := logrus.AllLevels
	return all
}
// Fire converts the entry into a GELF message and writes it to one selected
// graylog node. It is part of the logrus.Hook interface.
//
// It returns an error when no node can be selected; otherwise it returns
// whatever the selected node's writer reports.
func (h *Hook) Fire(entry *logrus.Entry) error {
	msg := h.makeGELFMessage(entry)

	// Only the selection runs under the read lock; the write itself happens
	// after the lock has been released.
	h.nodesMutex.RLock()
	target := h.nodes.selectNode()
	h.nodesMutex.RUnlock()

	if target != nil {
		return target.logWriter.WriteMessage(msg)
	}
	return errors.New("fail to select a graylog node for sending log message")
}
// StartHealthCheck launches a background goroutine that periodically probes
// the hook's graylog nodes using checkNodeStatus. The goroutine runs an
// endless loop, and every call starts another one.
func (h *Hook) StartHealthCheck() {
	loop := h.loopCheckNodesHealth
	go loop(checkNodeStatus)
}
// makeGELFMessage converts a logrus entry into a GELF 1.1 message.
//
// The whitespace-trimmed entry message becomes the short message. When it
// spans several lines, only the first line is kept as the short message and
// the whole trimmed text is sent as the full message.
//
// Entry data, the configured static meta and the configured facility are
// merged into the additional fields, each key prefixed with "_". Static meta
// overrides entry data on a key clash, and "_facility" is always the
// configured facility.
//
// The logrus level is translated to the syslog severity that GELF expects
// instead of being passed through numerically.
func (h *Hook) makeGELFMessage(entry *logrus.Entry) *gelf.Message {
	// short & full message
	p := bytes.TrimSpace([]byte(entry.Message))
	short := string(p)
	full := ""
	// After trimming, the text cannot start with a newline, so a hit is
	// always at i > 0.
	if i := bytes.IndexRune(p, '\n'); i > 0 {
		full = short
		short = string(p[:i])
	}

	// merge entry.Data & StaticMeta & Facility into extra; a key prefixed
	// with _ is treated as an additional field
	extra := make(map[string]interface{}, len(entry.Data)+len(h.config.StaticMeta)+1)
	for k, v := range entry.Data {
		extra["_"+k] = v
	}
	for k, v := range h.config.StaticMeta {
		extra["_"+k] = v
	}
	extra["_facility"] = h.config.Facility

	// logrus numbers its levels Panic=0, Fatal=1, Error=2, Warn=3, Info=4,
	// Debug=5, whereas GELF expects syslog severities (Alert=1, Critical=2,
	// Error=3, Warning=4, Notice=5, Info=6, Debug=7). Up to Warn the scales
	// differ by one; syslog's Notice has no logrus counterpart, so Info and
	// more verbose levels differ by two.
	level := int32(entry.Level) + 1
	if entry.Level > logrus.WarnLevel {
		level++
	}
	if level > 7 {
		level = 7 // anything more verbose than Debug is reported as Debug
	}

	return &gelf.Message{
		Version:  "1.1",
		Host:     h.config.Hostname,
		Short:    short,
		Full:     full,
		TimeUnix: float64(now().UnixNano()) / 1e9,
		Level:    level,
		Extra:    extra,
	}
}
// loopCheckNodesHealth runs forever: it sleeps for the configured health
// check interval, then probes every current node and stores the result on
// the node via setStatus.
//
// checkNodeAlive receives a node's health check URL and returns the status
// value to record for that node.
//
// The node list is snapshotted under the read lock and the probes run after
// the lock is released. Holding the read lock across the probes would let a
// pending SetNodeConfigs (a writer) block new readers such as Fire for the
// whole pass. SetNodeConfigs installs a fresh slice instead of mutating the
// existing one, so iterating the snapshot without the lock is safe.
// NOTE(review): this relies on setStatus being safe for concurrent use; it was
// already called while other readers held the shared read lock — confirm.
func (h *Hook) loopCheckNodesHealth(checkNodeAlive func(url string) uint32) {
	for {
		sleep(h.config.HealthCheckInterval)

		// mutex protects nodes from concurrent read-write access
		h.nodesMutex.RLock()
		current := h.nodes
		h.nodesMutex.RUnlock()

		for _, node := range current {
			node.setStatus(checkNodeAlive(node.healthCheckURL))
		}
	}
}