From 93c25a8e067374a14346ae1c6bc8975874c25524 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 28 May 2024 15:05:10 +0100 Subject: [PATCH] Add assertions for all truncateWAL and resetWAL calls --- server/raft.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/server/raft.go b/server/raft.go index 66777049da4..62995f717fb 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2603,6 +2603,12 @@ func (n *raft) sendSnapshotToFollower(subject string) (uint64, error) { n.stepdown.push(noLeader) // We need to reset our state here as well. n.resetWAL() + assert.Unreachable( + "Failed to load last snapshot when sending snapshot to follower", + map[string]any{ + "error": err.Error(), + }, + ) return 0, err } // Go ahead and send the snapshot and peerstate here as first append entry to the catchup follower. @@ -2735,6 +2741,16 @@ func (n *raft) applyCommit(index uint64) error { } // Reset and cancel any catchup. n.resetWAL() + assert.Unreachable( + "Failed to load append entry from store", + map[string]any{ + "error": err.Error(), + "index": index, + "is_leader": n.State(), + "wal_first_seq": state.FirstSeq, + "wal_last_seq": state.LastSeq, + }, + ) n.cancelCatchup() } return errEntryLoadFailed @@ -3106,6 +3122,7 @@ func (n *raft) truncateWAL(term, index uint64) { } // Set after we know we have truncated properly. + // TODO: we probably shouldn't mess with the term here. n.term, n.pterm, n.pindex = term, term, index } @@ -3271,12 +3288,39 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { success = true } else { n.resetWAL() + assert.Unreachable( + "AppendEntry pindex mismatched, resetting WAL", + map[string]any{ + "our_pterm": n.pterm, + "our_pindex": n.pindex, + "ae_pterm": ae.pterm, + "ae_pindex": ae.pindex, + }, + ) } } else { // If terms mismatched, or we got an error loading, delete that entry and all others past it. // Make sure to cancel any catchups in progress. // Truncate will reset our pterm and pindex. Only do so if we have an entry. n.truncateWAL(ae.pterm, ae.pindex) + assert.Always( + ae.pterm >= n.pterm, + "AppendEntry pterm is greater than or equal to our own", + map[string]any{ + "our_pterm": n.pterm, + "our_pindex": n.pindex, + "ae_pterm": ae.pterm, + "ae_pindex": ae.pindex, + }, + ) + assert.Always( + ae.pindex >= n.commit, + "AppendEntry pindex is greater than or equal to our commit", + map[string]any{ + "our_commit": n.commit, + "ae_pindex": ae.pindex, + }, + ) } // Cancel regardless. n.cancelCatchup() @@ -3307,6 +3351,15 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // Make sure pterms match and we take on the leader's. // This prevents constant spinning. n.truncateWAL(ae.pterm, ae.pindex) + assert.Unreachable( + "Weird unexplainable truncateWAL was called", + map[string]any{ + "our_pterm": n.pterm, + "our_pindex": n.pindex, + "ae_pterm": ae.pterm, + "ae_pindex": ae.pindex, + }, + ) n.cancelCatchup() n.Unlock() return @@ -3483,6 +3536,13 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) { // The remote node didn't commit the append entry, it looks like // they are on a newer term than we are. Step down. n.Lock() + assert.Unreachable( + "AppendEntryResponse was rejected by leader and we reset the WAL", + map[string]any{ + "our_term": n.term, + "ar_term": ar.term, + }, + ) n.term = ar.term n.vote = noVote n.writeTermVote() @@ -3539,6 +3599,13 @@ func (n *raft) storeToWAL(ae *appendEntry) error { } // Reset and cancel any catchup. n.resetWAL() + assert.Unreachable( + "WAL stored sequence doesn't match the leader last log index", + map[string]any{ + "store_seq": seq, + "ae_pindex": ae.pindex, + }, + ) n.cancelCatchup() return errEntryStoreFailed }