-
Notifications
You must be signed in to change notification settings - Fork 0
/
tail_impl.go
83 lines (66 loc) · 1.25 KB
/
tail_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package tail
import (
"github.com/vela-ssoc/vela-kit/audit"
"sync/atomic"
)
func (t *tail) newFx(path string) *Fx {
return newFx(t.tom, path, t.handle)
}
func (t *tail) add(raw line) *tail {
add := raw.add
if add == nil {
add = t.cfg.add
}
if add == nil {
return t
}
raw.value = add(raw.value)
return t
}
func (t *tail) enc(raw line) *tail {
enc := raw.enc
if enc == nil {
enc = t.cfg.enc
}
if enc == nil {
return t
}
raw.value = enc(raw.value)
return t
}
func (t *tail) push(chunk []byte) {
if t.cfg.sdk == nil {
return
}
wn, err := t.cfg.sdk.Write(chunk)
if err != nil {
xEnv.Errorf("%s output write error %v", t.Name(), err)
return
}
atomic.AddUint64(&t.wn, uint64(wn))
}
func (t *tail) toPipe(raw line) {
//调用接口
t.cfg.pipe.Do(raw.value, t.cfg.co, func(err error) {
audit.Errorf("%s pipe call fail %v", t.Name(), err).From(t.CodeVM()).Put()
})
}
func (t *tail) handle(raw line, e error) {
if e != nil {
return
}
//pipe
t.toPipe(raw)
//限速
t.limit.wait()
//发送数据
t.queue <- raw
}
func (t *tail) output(idx int) {
for raw := range t.queue {
raw.Enc(t.cfg.enc)
raw.Add(t.cfg.add)
t.push(raw.byte())
}
audit.Errorf("%s %d output thread exit", t.Name(), idx).Log().From(t.CodeVM()).Put()
}