Skip to content

Commit

Permalink
Add assertions for all truncateWAL and resetWAL calls
Browse files Browse the repository at this point in the history
  • Loading branch information
neilalexander committed May 28, 2024
1 parent f4d83c8 commit 93c25a8
Showing 1 changed file with 67 additions and 0 deletions.
67 changes: 67 additions & 0 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -2603,6 +2603,12 @@ func (n *raft) sendSnapshotToFollower(subject string) (uint64, error) {
n.stepdown.push(noLeader)
// We need to reset our state here as well.
n.resetWAL()
assert.Unreachable(
"Failed to load last snapshot when sending snapshot to follower",
map[string]any{
"error": err.Error(),
},
)
return 0, err
}
// Go ahead and send the snapshot and peerstate here as first append entry to the catchup follower.
Expand Down Expand Up @@ -2735,6 +2741,16 @@ func (n *raft) applyCommit(index uint64) error {
}
// Reset and cancel any catchup.
n.resetWAL()
assert.Unreachable(
"Failed to load append entry from store",
map[string]any{
"error": err.Error(),
"index": index,
"is_leader": n.State(),
"wal_first_seq": state.FirstSeq,
"wal_last_seq": state.LastSeq,
},
)
n.cancelCatchup()
}
return errEntryLoadFailed
Expand Down Expand Up @@ -3106,6 +3122,7 @@ func (n *raft) truncateWAL(term, index uint64) {
}

// Set after we know we have truncated properly.
// TODO: we probably shouldn't mess with the term here.
n.term, n.pterm, n.pindex = term, term, index
}

Expand Down Expand Up @@ -3271,12 +3288,39 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
success = true
} else {
n.resetWAL()
assert.Unreachable(
"AppendEntry pindex mismatched, resetting WAL",
map[string]any{
"our_pterm": n.pterm,
"our_pindex": n.pindex,
"ae_pterm": ae.pterm,
"ae_pindex": ae.pindex,
},
)
}
} else {
// If terms mismatched, or we got an error loading, delete that entry and all others past it.
// Make sure to cancel any catchups in progress.
// Truncate will reset our pterm and pindex. Only do so if we have an entry.
n.truncateWAL(ae.pterm, ae.pindex)
assert.Always(
ae.pterm >= n.pterm,
"AppendEntry pterm is greater than or equal to our own",
map[string]any{
"our_pterm": n.pterm,
"our_pindex": n.pindex,
"ae_pterm": ae.pterm,
"ae_pindex": ae.pindex,
},
)
assert.Always(
ae.pindex >= n.commit,
"AppendEntry pindex is greater than or equal to our commit",
map[string]any{
"our_commit": n.commit,
"ae_pindex": ae.pindex,
},
)
}
// Cancel regardless.
n.cancelCatchup()
Expand Down Expand Up @@ -3307,6 +3351,15 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// Make sure pterms match and we take on the leader's.
// This prevents constant spinning.
n.truncateWAL(ae.pterm, ae.pindex)
assert.Unreachable(
"Weird unexplainable truncateWAL was called",
map[string]any{
"our_pterm": n.pterm,
"our_pindex": n.pindex,
"ae_pterm": ae.pterm,
"ae_pindex": ae.pindex,
},
)
n.cancelCatchup()
n.Unlock()
return
Expand Down Expand Up @@ -3483,6 +3536,13 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) {
// The remote node didn't commit the append entry, it looks like
// they are on a newer term than we are. Step down.
n.Lock()
assert.Unreachable(
"AppendEntryResponse was rejected by leader and we reset the WAL",
map[string]any{
"our_term": n.term,
"ar_term": ar.term,
},
)
n.term = ar.term
n.vote = noVote
n.writeTermVote()
Expand Down Expand Up @@ -3539,6 +3599,13 @@ func (n *raft) storeToWAL(ae *appendEntry) error {
}
// Reset and cancel any catchup.
n.resetWAL()
assert.Unreachable(
"WAL stored sequence doesn't match the leader last log index",
map[string]any{
"store_seq": seq,
"ae_pindex": ae.pindex,
},
)
n.cancelCatchup()
return errEntryStoreFailed
}
Expand Down

0 comments on commit 93c25a8

Please sign in to comment.