forked from digitalocean/go-workers2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.go
111 lines (99 loc) · 1.92 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package workers
import (
"log"
"sync"
)
// worker processes jobs from a single queue using a fixed number of task
// runners that all share one handler.
type worker struct {
	// queue is the queue name given to newWorker.
	queue string
	// handler is passed to every task runner created by start.
	handler JobFunc
	// concurrency is the number of task runners start creates; newWorker
	// forces it to be at least 1.
	concurrency int

	// runners holds the task runners created by start.
	runners []*taskRunner
	// runnersLock is held whenever runners or running is read or written.
	runnersLock sync.Mutex

	// stop is the unbuffered channel quit sends on to ask start to shut down.
	stop chan bool
	// running is true from the moment start begins until it returns.
	running bool

	// logger is passed to every task runner created by start.
	logger *log.Logger
}
// newWorker returns a worker for queue that runs handler on concurrency task
// runners once started. A concurrency below 1 is treated as 1. The returned
// worker is idle until start is called.
func newWorker(logger *log.Logger, queue string, concurrency int, handler JobFunc) *worker {
	// Always keep at least one task runner.
	if concurrency < 1 {
		concurrency = 1
	}

	return &worker{
		queue:       queue,
		handler:     handler,
		concurrency: concurrency,
		stop:        make(chan bool),
		logger:      logger,
	}
}
// start launches fetcher.Fetch and w.concurrency task runners, then blocks in
// a dispatch loop until every task runner has returned. Calling start on a
// worker that is already running returns immediately.
//
// While looping it acknowledges finished messages whose ack flag is set and,
// on a stop signal, closes the fetcher and tells each task runner to quit.
func (w *worker) start(fetcher Fetcher) {
	w.runnersLock.Lock()
	if w.running {
		// Another start call is already active.
		w.runnersLock.Unlock()
		return
	}
	w.running = true

	// Clear the running flag when the dispatch loop below returns.
	defer func() {
		w.runnersLock.Lock()
		w.running = false
		w.runnersLock.Unlock()
	}()

	var active sync.WaitGroup
	active.Add(w.concurrency)

	go fetcher.Fetch()

	finished := make(chan *Msg)
	w.runners = make([]*taskRunner, w.concurrency)
	for idx := range w.runners {
		runner := newTaskRunner(w.logger, w.handler)
		w.runners[idx] = runner
		go func() {
			runner.work(fetcher.Messages(), finished, fetcher.Ready())
			active.Done()
		}()
	}

	// allDone is closed once every task runner goroutine has returned.
	allDone := make(chan bool)
	go func() {
		active.Wait()
		close(allDone)
	}()

	// Now that we're all set up, unlock so that stats can check.
	w.runnersLock.Unlock()

	for {
		select {
		case msg := <-finished:
			if msg.ack {
				fetcher.Acknowledge(msg)
			}
		case <-w.stop:
			if fetcher.Closed() {
				// Fetcher is already closed; ignore this stop signal.
				continue
			}
			fetcher.Close()
			// Re-take the lock so the runners can be shut down safely.
			w.runnersLock.Lock()
			for _, runner := range w.runners {
				runner.quit()
			}
			w.runnersLock.Unlock()
		case <-allDone:
			return
		}
	}
}
// quit asks a running worker to shut down by sending on its stop channel.
// It does nothing when the worker is not running.
//
// The stop channel is unbuffered, so the send blocks until start's dispatch
// loop receives it, and runnersLock stays held for that whole time.
//
// NOTE(review): if start has already left its loop but has not yet cleared
// running (that needs runnersLock), this send has no receiver and both sides
// would block — confirm whether callers can hit this window.
func (w *worker) quit() {
	w.runnersLock.Lock()
	defer w.runnersLock.Unlock()

	if !w.running {
		return
	}
	w.stop <- true
}
// inProgressMessages returns the message each task runner currently reports
// as in progress, skipping runners that report none. The result is nil when
// no runner has a message. runnersLock is held for the duration of the scan.
func (w *worker) inProgressMessages() []*Msg {
	w.runnersLock.Lock()
	defer w.runnersLock.Unlock()

	var active []*Msg
	for idx := range w.runners {
		msg := w.runners[idx].inProgressMessage()
		if msg == nil {
			continue
		}
		active = append(active, msg)
	}
	return active
}