Skip to content

Commit

Permalink
make checkpointing async
Browse files Browse the repository at this point in the history
  • Loading branch information
lorenzoranucci committed Dec 12, 2022
1 parent 470e486 commit fb9f7e8
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 56 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@ test:
@find . -name go.mod \
| sed 's/\/[^\/]*$$//' \
| xargs -I {} bash -c \
'echo "testing {}" && go test {}/...'
'echo "testing {}" && go test --race {}/...'
6 changes: 4 additions & 2 deletions example/tor/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"regexp"
"time"

"github.com/go-mysql-org/go-mysql/canal"
"github.com/go-redis/redis/v8"
Expand Down Expand Up @@ -40,7 +41,6 @@ var runCmd = &cobra.Command{

stateHandler := getRedisStateHandler()
handler, err := run.NewEventHandler(
stateHandler,
ed,
viper.GetString("dbAggregateIDColumnName"),
viper.GetString("dbAggregateTypeColumnName"),
Expand All @@ -51,7 +51,7 @@ var runCmd = &cobra.Command{
return err
}

runner := run.NewRunner(c, handler, stateHandler)
runner := run.NewRunner(c, handler, stateHandler, time.Second*5)

return runner.Run()
},
Expand Down Expand Up @@ -114,6 +114,8 @@ func getCanalConfig() *canal.Config {
cfg.Password = viper.GetString("dbPassword")
cfg.Dump.ExecutionPath = ""
cfg.IncludeTableRegex = []string{fmt.Sprintf("^%s$", viper.Get("dbOutboxTableRef"))}
cfg.MaxReconnectAttempts = 10
cfg.ReadTimeout = time.Second * 10

return cfg
}
2 changes: 1 addition & 1 deletion example/tor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/go-mysql-org/go-mysql v1.6.0
github.com/go-redis/redis/v8 v8.11.5
github.com/lorenzoranucci/tor/adapters/kafka v0.1.0
github.com/lorenzoranucci/tor/adapters/redis v0.1.0
github.com/lorenzoranucci/tor/adapters/redis v0.2.0
github.com/lorenzoranucci/tor/router v0.2.0
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.6.1
Expand Down
1 change: 1 addition & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
github.com/lorenzoranucci/tor/adapters/redis v0.2.0/go.mod h1:U6idye39F8NfHtPeBv0MYD690+gnN75z9DDOe7XDNgY=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
18 changes: 3 additions & 15 deletions router/pkg/run/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ type EventDispatcher interface {
}

func NewEventHandler(
stateHandler StateHandler,
eventDispatcher EventDispatcher,
aggregateIDColumnName string,
aggregateTypeColumnName string,
Expand Down Expand Up @@ -60,7 +59,6 @@ func NewEventHandler(
}

return &EventHandler{
stateHandler: stateHandler,
eventMapper: &EventMapper{
aggregateIDColumnName: actualAggregateIDColumnName,
aggregateTypeColumnName: actualAggregateTypeColumnName,
Expand All @@ -74,9 +72,9 @@ func NewEventHandler(
type EventHandler struct {
canal.DummyEventHandler

stateHandler StateHandler
eventMapper *EventMapper
eventDispatcher EventDispatcher
positionChan chan mysql.Position
}

func (h *EventHandler) OnRow(e *canal.RowsEvent) error {
Expand Down Expand Up @@ -105,20 +103,10 @@ func (h *EventHandler) OnRow(e *canal.RowsEvent) error {
}

func (h *EventHandler) OnPosSynced(p mysql.Position, g mysql.GTIDSet, f bool) error {
return h.setLastPosition(p)
h.positionChan <- p
return nil
}

func (h *EventHandler) String() string {
return "EventHandler"
}

func (h *EventHandler) setLastPosition(p mysql.Position) error {
err := h.stateHandler.SetLastPosition(p)
if err != nil {
return err
}
logrus.WithField("lastPosition", p).
Debug("last position set")

return nil
}
50 changes: 18 additions & 32 deletions router/pkg/run/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"

"github.com/go-mysql-org/go-mysql/canal"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/go-mysql-org/go-mysql/schema"
"github.com/lorenzoranucci/tor/router/pkg/run"
Expand All @@ -16,8 +15,7 @@ import (

func TestEventHandler_OnRow_HappyPaths(t *testing.T) {
type fields struct {
stateHandler *StateHandlerStub
eventDispatcher *EventDispatcherMock
eventDispatcher *eventDispatcherMock
aggregateIdColumnName string
aggregateTypeColumnName string
payloadColumnName string
Expand Down Expand Up @@ -64,7 +62,7 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) {
},
},
fields: fields{
eventDispatcher: &EventDispatcherMock{},
eventDispatcher: &eventDispatcherMock{},
},
wantDispatches: []dispatch{
{
Expand Down Expand Up @@ -104,7 +102,7 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) {
},
},
fields: fields{
eventDispatcher: &EventDispatcherMock{},
eventDispatcher: &eventDispatcherMock{},
},
wantDispatches: []dispatch{
{
Expand Down Expand Up @@ -148,7 +146,7 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) {
},
},
fields: fields{
eventDispatcher: &EventDispatcherMock{},
eventDispatcher: &eventDispatcherMock{},
aggregateIdColumnName: "aggregateId",
aggregateTypeColumnName: "aggregateType",
payloadColumnName: "payload_",
Expand Down Expand Up @@ -196,7 +194,7 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) {
},
},
fields: fields{
eventDispatcher: &EventDispatcherMock{},
eventDispatcher: &eventDispatcherMock{},
},
wantDispatches: []dispatch{
{
Expand Down Expand Up @@ -245,7 +243,7 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) {
},
},
fields: fields{
eventDispatcher: &EventDispatcherMock{},
eventDispatcher: &eventDispatcherMock{},
aggregateTypeRegexp: regexp.MustCompile("(?i)^order$"),
},
wantDispatches: []dispatch{
Expand All @@ -260,7 +258,6 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
h, err := run.NewEventHandler(
tt.fields.stateHandler,
tt.fields.eventDispatcher,
tt.fields.aggregateIdColumnName,
tt.fields.aggregateTypeColumnName,
Expand All @@ -282,7 +279,7 @@ func TestEventHandler_OnRow_HappyPaths(t *testing.T) {

func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) {
type fields struct {
eventDispatcher *EventDispatcherMock
eventDispatcher *eventDispatcherMock
aggregateIdColumnName string
aggregateTypeColumnName string
payloadColumnName string
Expand Down Expand Up @@ -330,7 +327,7 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) {
},
},
fields: fields{
eventDispatcher: &EventDispatcherMock{},
eventDispatcher: &eventDispatcherMock{},
},
},
{
Expand Down Expand Up @@ -364,7 +361,7 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) {
},
},
fields: fields{
eventDispatcher: &EventDispatcherMock{
eventDispatcher: &eventDispatcherMock{
err: errors.New(""),
},
},
Expand Down Expand Up @@ -406,7 +403,7 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) {
},
},
fields: fields{
eventDispatcher: &EventDispatcherMock{},
eventDispatcher: &eventDispatcherMock{},
},
wantErr: true,
},
Expand Down Expand Up @@ -441,7 +438,7 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) {
},
},
fields: fields{
eventDispatcher: &EventDispatcherMock{},
eventDispatcher: &eventDispatcherMock{},
},
wantErr: true,
},
Expand Down Expand Up @@ -476,7 +473,7 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) {
},
},
fields: fields{
eventDispatcher: &EventDispatcherMock{},
eventDispatcher: &eventDispatcherMock{},
},
wantErr: true,
},
Expand Down Expand Up @@ -511,7 +508,7 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) {
},
},
fields: fields{
eventDispatcher: &EventDispatcherMock{},
eventDispatcher: &eventDispatcherMock{},
},
wantErr: true,
},
Expand Down Expand Up @@ -546,7 +543,7 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) {
},
},
fields: fields{
eventDispatcher: &EventDispatcherMock{},
eventDispatcher: &eventDispatcherMock{},
},
wantErr: true,
},
Expand Down Expand Up @@ -581,7 +578,7 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) {
},
},
fields: fields{
eventDispatcher: &EventDispatcherMock{},
eventDispatcher: &eventDispatcherMock{},
},
wantErr: true,
},
Expand Down Expand Up @@ -616,7 +613,7 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) {
},
},
fields: fields{
eventDispatcher: &EventDispatcherMock{},
eventDispatcher: &eventDispatcherMock{},
},
wantErr: true,
},
Expand All @@ -625,7 +622,6 @@ func TestEventHandler_OnRow_UnhappyPaths(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
h, err := run.NewEventHandler(
&StateHandlerStub{},
tt.fields.eventDispatcher,
tt.fields.aggregateIdColumnName,
tt.fields.aggregateTypeColumnName,
Expand Down Expand Up @@ -656,26 +652,16 @@ type dispatch struct {
event []byte
}

type EventDispatcherMock struct {
type eventDispatcherMock struct {
dispatches []dispatch
err error
}

func (e *EventDispatcherMock) Dispatch(routingKey string, event []byte) error {
func (e *eventDispatcherMock) Dispatch(routingKey string, event []byte) error {
e.dispatches = append(e.dispatches, dispatch{
routingKey: routingKey,
event: event,
})

return e.err
}

type StateHandlerStub struct{}

func (s *StateHandlerStub) GetLastPosition() (mysql.Position, error) {
return mysql.Position{}, nil
}

func (s *StateHandlerStub) SetLastPosition(_ mysql.Position) error {
return nil
}
69 changes: 64 additions & 5 deletions router/pkg/run/run.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,39 @@
package run

import (
"context"
"time"

"github.com/go-mysql-org/go-mysql/canal"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/sirupsen/logrus"
)

func NewRunner(canal *canal.Canal, handler canal.EventHandler, stateHandler StateHandler) *Runner {
func NewRunner(
canal Canal,
handler *EventHandler,
stateHandler StateHandler,
stateUpdateFrequency time.Duration,
) *Runner {
p := make(chan mysql.Position)
handler.positionChan = p

canal.SetEventHandler(handler)

return &Runner{canal: canal, stateHandler: stateHandler}
return &Runner{canal: canal, stateHandler: stateHandler, positionChan: p, stateUpdateFrequency: stateUpdateFrequency}
}

type Runner struct {
canal *canal.Canal
stateHandler StateHandler
canal Canal
stateHandler StateHandler
positionChan chan mysql.Position
stateUpdateFrequency time.Duration
}

type Canal interface {
RunFrom(mysql.Position) error
SetEventHandler(handler canal.EventHandler)
Close()
}

func (r *Runner) Run() error {
Expand All @@ -21,5 +42,43 @@ func (r *Runner) Run() error {
return err
}

return r.canal.RunFrom(lastPosition)
errCh := make(chan error, 2)

go func() {
errCh <- r.canal.RunFrom(lastPosition)
}()

ctx, cf := context.WithCancel(context.Background())
ticker := time.NewTicker(r.stateUpdateFrequency)
go func() {
for {
select {
case lastPosition = <-r.positionChan:
case <-ticker.C:
err := r.setLastPosition(lastPosition)
if err != nil {
errCh <- err
}
case <-ctx.Done():
_ = r.setLastPosition(lastPosition)
return
}
}
}()

err = <-errCh
r.canal.Close()
cf()
return err
}

func (r *Runner) setLastPosition(p mysql.Position) error {
err := r.stateHandler.SetLastPosition(p)
if err != nil {
return err
}
logrus.WithField("lastPosition", p).
Debug("last position set")

return nil
}
Loading

0 comments on commit fb9f7e8

Please sign in to comment.