Skip to content

Commit

Permalink
Merge pull request #62 from replicase/feature/adjust-pgsource-report-lsn
Browse files Browse the repository at this point in the history
feat:  add replyRequested when report LSN when needed
  • Loading branch information
benjamin99 authored Jul 29, 2024
2 parents ec7ea6d + 625840e commit 11436ca
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions pkg/source/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,20 @@ func (p *PGXSource) Capture(cp cursor.Checkpoint) (changes chan Change, err erro
}

func (p *PGXSource) fetching(ctx context.Context) (change Change, err error) {
if time.Now().After(p.nextReportTime) {
if err = p.reportLSN(ctx); err != nil {
return change, err
defer func() {
needReply := isTimeout(err)
if time.Now().After(p.nextReportTime) || needReply {
if err = p.reportLSN(ctx, needReply); err != nil {
p.log.WithFields(logrus.Fields{"Error": err}).Error("failed to report LSN")
}
p.nextReportTime = time.Now().Add(5 * time.Second)
}
p.nextReportTime = time.Now().Add(5 * time.Second)
}
}()

msg, err := p.replConn.ReceiveMessage(ctx)
if err != nil {
return change, err
}

switch msg := msg.(type) {
case *pgproto3.CopyData:
switch msg.Data[0] {
Expand Down Expand Up @@ -259,9 +262,15 @@ func (p *PGXSource) committedLSN() (lsn pglogrepl.LSN) {
return pglogrepl.LSN(atomic.LoadUint64(&p.ackLsn))
}

func (p *PGXSource) reportLSN(ctx context.Context) error {
func (p *PGXSource) reportLSN(ctx context.Context, replyRequested bool) error {
if committed := p.committedLSN(); committed != 0 {
return pglogrepl.SendStandbyStatusUpdate(ctx, p.replConn, pglogrepl.StandbyStatusUpdate{WALWritePosition: committed})
return pglogrepl.SendStandbyStatusUpdate(ctx,
p.replConn,
pglogrepl.StandbyStatusUpdate{
WALWritePosition: committed,
ReplyRequested: replyRequested,
},
)
}
return nil
}
Expand All @@ -272,7 +281,7 @@ func (p *PGXSource) cleanup() {
p.setupConn.Close(ctx)
}
if p.replConn != nil {
p.reportLSN(ctx)
p.reportLSN(ctx, false)
p.replConn.Close(ctx)
}
}

0 comments on commit 11436ca

Please sign in to comment.