-
Notifications
You must be signed in to change notification settings - Fork 0
/
gc.go
169 lines (137 loc) · 3.5 KB
/
gc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package main
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/sirupsen/logrus"
)
// Tuning knobs for the garbage collector.
const (
	// gcInterval is the delay between collection passes after the first one.
	gcInterval = time.Hour

	// sessMaxAge is how long a session may go without its updated_at being
	// bumped before the collector deletes it.
	sessMaxAge = 12 * time.Hour

	// chunkMaxAge is how far in the past a chunk's fetched_at must lie
	// before the chunk is purged.
	chunkMaxAge = 24 * time.Hour

	// expiredChunksMax caps how many expired chunks a single query fetches
	// and hands off for deletion.
	expiredChunksMax = 1000
)
// garbageCollector periodically cleans up old data: sessions whose
// updated_at has aged past sessMaxAge, and chunks whose fetched_at has aged
// past chunkMaxAge (both the chunks table row and the data held by the
// diskChunkStore). Run drives the collection loop; Interrupt stops it.
type garbageCollector struct {
	l      logrus.FieldLogger // debug logging of each gc step
	db     *sql.DB            // database holding the sessions and chunks tables
	dcs    *diskChunkStore    // chunk data store; DeleteChunk is called for each purged chunk
	ticker *time.Ticker       // created by Run, fires every gcInterval
	stopC  chan struct{}      // receiving from it makes Run return nil
}
// newGarbageCollector returns a collector that logs through l, prunes rows
// in db and removes chunk data via dcs. The ticker is left nil; Run creates
// it after the first collection pass.
func newGarbageCollector(l logrus.FieldLogger, db *sql.DB, dcs *diskChunkStore) *garbageCollector {
	g := new(garbageCollector)
	g.l, g.db, g.dcs = l, db, dcs
	g.stopC = make(chan struct{})
	return g
}
// Run performs one collection pass immediately and then another every
// gcInterval until the stop channel fires. It returns nil when stopped,
// and returns the (wrapped) error of the first pass that fails.
func (g *garbageCollector) Run() error {
	if err := g.collect(); err != nil {
		// Wrapped the same way as the periodic passes below so callers see a
		// consistent message regardless of which pass failed.
		return fmt.Errorf("running gc: %w", err)
	}
	g.ticker = time.NewTicker(gcInterval)
	// Stop the ticker on every exit path; previously it was never released.
	defer g.ticker.Stop()
	for {
		select {
		case <-g.ticker.C:
			if err := g.collect(); err != nil {
				return fmt.Errorf("running gc: %w", err)
			}
		case <-g.stopC:
			return nil
		}
	}
}
// Interrupt asks Run to return. The error argument is ignored.
//
// The stop channel is closed rather than sent on. stopC is unbuffered, so a
// send blocks until Run receives it. If Run has already returned (e.g. a
// collection pass failed) or was never started, that send hangs the caller
// forever. Closing never blocks, and Run sees the closed channel the next
// time it waits in its select.
//
// NOTE(review): assumes Interrupt is the only writer to stopC and is not
// called concurrently with itself; two racing calls could both reach close.
func (g *garbageCollector) Interrupt(_ error) {
	select {
	case <-g.stopC:
		// Already closed by an earlier call; closing again would panic.
	default:
		close(g.stopC)
	}
}
// collect runs a single garbage-collection pass:
//   - It deletes stale sessions.
//   - It then purges expired chunks in batches of at most expiredChunksMax.
//   - It keeps going until a batch comes back short.
//
// Draining in batches keeps each query bounded. It also prevents a backlog
// larger than one batch from being cleared at only one batch per gcInterval.
// The first error from any step aborts the pass and is returned.
func (g *garbageCollector) collect() error {
	ctx := context.Background()
	if err := g.gcSessions(ctx); err != nil {
		return err
	}
	for {
		ecs, err := g.expiredChunks(ctx)
		if err != nil {
			return err
		}
		if len(ecs) == 0 {
			return nil
		}
		if err := g.gcChunks(ctx, ecs); err != nil {
			return err
		}
		// A batch smaller than the limit means nothing else had expired when
		// it was queried, so there is no need for another round trip.
		if len(ecs) < expiredChunksMax {
			return nil
		}
	}
}
// gcSessions deletes every session whose updated_at is older than
// sessMaxAge (cutoff computed in UTC) and logs how many rows were removed.
// It returns a wrapped error if the delete or the rows-affected lookup fails.
func (g *garbageCollector) gcSessions(ctx context.Context) error {
	g.l.Debug("start gc sessions")
	cutoff := time.Now().Add(-sessMaxAge).UTC()
	res, err := g.db.ExecContext(ctx,
		`delete from sessions where updated_at < $1`,
		cutoff)
	if err != nil {
		return fmt.Errorf("gc sessions: %w", err)
	}
	ri, err := res.RowsAffected()
	if err != nil {
		// Previously formatted with %d, which renders an error as "%!d(...)".
		return fmt.Errorf("getting session delete rows affected: %w", err)
	}
	g.l.Debugf("gc'd %d sessions", ri)
	return nil
}
// delRow identifies one expired chunk queued for deletion: its id in the
// chunks table, plus the stream/chunk identifiers passed to the disk chunk
// store when removing its data.
type delRow struct {
	ID                int
	StreamID, ChunkID string
}
// expiredChunks loads up to expiredChunksMax chunks whose fetched_at lies
// more than chunkMaxAge in the past (cutoff computed in UTC).
//
// The query has no ORDER BY, so when more chunks than the limit have expired
// the database decides which ones come back. The result is nil when nothing
// has expired. Any query, scan or iteration failure is returned wrapped, with
// a nil slice.
func (g *garbageCollector) expiredChunks(ctx context.Context) ([]delRow, error) {
	g.l.Debug("check expired chunks")
	const query = `select id, stream_id, chunk_id from chunks where fetched_at < $1 limit $2`
	cutoff := time.Now().Add(-chunkMaxAge).UTC()
	rows, err := g.db.QueryContext(ctx, query, cutoff, expiredChunksMax)
	if err != nil {
		return nil, fmt.Errorf("fetching expired chunks: %v", err)
	}
	defer rows.Close()

	var found []delRow
	for rows.Next() {
		var row delRow
		if scanErr := rows.Scan(&row.ID, &row.StreamID, &row.ChunkID); scanErr != nil {
			return nil, fmt.Errorf("scanning row: %v", scanErr)
		}
		found = append(found, row)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("in result iteration: %v", iterErr)
	}

	g.l.Debugf("found %d expired chunks (max %d)", len(found), expiredChunksMax)
	return found, nil
}
// gcChunks purges each chunk in delrows by doing two things:
//   - It deletes the chunk's row from the chunks table.
//   - It deletes the chunk's data through the disk chunk store.
//
// Each chunk is processed in its own execTx callback. The first failure stops
// the loop, leaving the remaining chunks for a later pass, and is returned.
func (g *garbageCollector) gcChunks(ctx context.Context, delrows []delRow) error {
	g.l.Debug("start gc chunks")
	for _, dr := range delrows {
		err := execTx(ctx, g.db, func(ctx context.Context, tx *sql.Tx) error {
			res, err := tx.ExecContext(ctx, `delete from chunks where id = $1`, dr.ID)
			if err != nil {
				return fmt.Errorf("deleting chunk row %d: %w", dr.ID, err)
			}
			// Finish every fallible database step before touching the chunk
			// data. The data deletion cannot be undone. execTx presumably rolls
			// back when this callback errors (TODO confirm), so failing here
			// first keeps the row and the data together for a retry.
			ri, err := res.RowsAffected()
			if err != nil {
				// Previously formatted with %d, which renders an error as "%!d(...)".
				return fmt.Errorf("getting delete chunk rows affected: %w", err)
			}
			if err := g.dcs.DeleteChunk(ctx, dr.StreamID, dr.ChunkID); err != nil {
				return fmt.Errorf("deleting chunk %s/%s: %w", dr.StreamID, dr.ChunkID, err)
			}
			g.l.Debugf("deleting chunk %s/%s: %d rows affected", dr.StreamID, dr.ChunkID, ri)
			return nil
		})
		if err != nil {
			return err
		}
	}
	g.l.Debug("end gc chunks")
	return nil
}