Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Commit

Permalink
Merge pull request #223 from MathieuBordere/high-cpu-load
Browse files Browse the repository at this point in the history
Don't send Snapshots to offline nodes
  • Loading branch information
stgraber authored Jul 28, 2021
2 parents c15a5d2 + 2590072 commit c772738
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 41 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
CC: ${{ matrix.compiler }}
run: |
./test/lib/fs.sh setup
make check CFLAGS=-O0 $(./test/lib/fs.sh detect) || (cat ./test-suite.log && false)
make check $(./test/lib/fs.sh detect) || (cat ./test-suite.log && false)
./test/lib/fs.sh teardown
- name: Coverage
Expand Down
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ script:
- amalgamate.py --config=amalgamation.json --source=$(pwd)
- $CC raft.c -c -D_GNU_SOURCE -DHAVE_LINUX_AIO_ABI_H -Wall -Wextra -Wpedantic -fpic
- ./test/lib/fs.sh setup
- make check CFLAGS=-O0 $(./test/lib/fs.sh detect) || (cat ./test-suite.log && false)
- make check $(./test/lib/fs.sh detect) || (cat ./test-suite.log && false)
- ./test/lib/fs.sh teardown
5 changes: 5 additions & 0 deletions src/progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ void progressMarkRecentRecv(struct raft *r, const unsigned i)
r->leader_state.progress[i].recent_recv = true;
}

bool progressGetRecentRecv(const struct raft *r, const unsigned i)
{
return r->leader_state.progress[i].recent_recv;
}

void progressToSnapshot(struct raft *r, unsigned i)
{
struct raft_progress *p = &r->leader_state.progress[i];
Expand Down
3 changes: 3 additions & 0 deletions src/progress.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ bool progressResetRecentRecv(struct raft *r, unsigned i);
* To be called whenever we receive an AppendEntries RPC result */
void progressMarkRecentRecv(struct raft *r, unsigned i);

/* Return the value of the recent_recv flag. */
bool progressGetRecentRecv(const struct raft *r, unsigned i);

/* Convert to the i'th server to snapshot mode. */
void progressToSnapshot(struct raft *r, unsigned i);

Expand Down
10 changes: 9 additions & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,15 @@ int replicationProgress(struct raft *r, unsigned i)
return sendAppendEntries(r, i, prev_index, prev_term);

send_snapshot:
return sendSnapshot(r, i);
if (progressGetRecentRecv(r, i)) {
/* Only send a snapshot when we have heard from the server */
return sendSnapshot(r, i);
} else {
/* Send empty AppendEntries RPC when we haven't heard from the server */
prev_index = logLastIndex(&r->log);
prev_term = logLastTerm(&r->log);
return sendAppendEntries(r, i, prev_index, prev_term);
}
}

/* Possibly trigger I/O requests for newly appended log entries or heartbeats.
Expand Down
205 changes: 167 additions & 38 deletions test/integration/test_snapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ TEST(snapshot, installOne, setUp, tearDown, 0, NULL)
CLUSTER_DESATURATE_BOTHWAYS(0, 2);
CLUSTER_STEP_UNTIL_APPLIED(2, 4, 5000);

/* Check that the leader has sent a snapshot */
munit_assert_int(CLUSTER_N_SEND(0, RAFT_IO_INSTALL_SNAPSHOT), ==, 1);
munit_assert_int(CLUSTER_N_RECV(2, RAFT_IO_INSTALL_SNAPSHOT), ==, 1);
return MUNIT_OK;
}

Expand All @@ -103,29 +106,147 @@ TEST(snapshot, installOneTimeOut, setUp, tearDown, 0, NULL)
SET_SNAPSHOT_THRESHOLD(3);
SET_SNAPSHOT_TRAILING(1);
SET_SNAPSHOT_TIMEOUT(200);
CLUSTER_SATURATE_BOTHWAYS(0, 2);

/* Apply a few of entries, to force a snapshot to be taken. */
/* Apply a few of entries, to force a snapshot to be taken. Drop all network
* traffic between servers 0 and 2 in order for AppendEntries RPCs to not be
* replicated */
CLUSTER_SATURATE_BOTHWAYS(0, 2);
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;

/* Reconnect the leader, but follower cannot reply */
CLUSTER_DESATURATE(0, 2);
/* Reconnect both servers and set a high disk latency on server 2 so that
* the InstallSnapshot RPC will time out */
CLUSTER_SET_DISK_LATENCY(2, 300);
CLUSTER_DESATURATE_BOTHWAYS(0, 2);

/* InstallSnaphot RPC times out */
CLUSTER_STEP_UNTIL_ELAPSED(400);
/* Wait a while and check that the leader has sent a snapshot */
CLUSTER_STEP_UNTIL_ELAPSED(300);
munit_assert_int(CLUSTER_N_SEND(0, RAFT_IO_INSTALL_SNAPSHOT), ==, 1);
munit_assert_int(CLUSTER_N_RECV(2, RAFT_IO_INSTALL_SNAPSHOT), ==, 1);

/* Reconnect the follower */
CLUSTER_DESATURATE(2, 0);
/* Wait for the snapshot to be installed */
CLUSTER_STEP_UNTIL_APPLIED(2, 4, 5000);

/* Assert that the leader has retried the InstallSnapshot RPC */
munit_assert_int(CLUSTER_N_SEND(0, RAFT_IO_INSTALL_SNAPSHOT), ==, 2);
munit_assert_int(CLUSTER_N_RECV(2, RAFT_IO_INSTALL_SNAPSHOT), ==, 2);

return MUNIT_OK;
}

/* Install snapshot to an offline node */
TEST(snapshot, installOneDisconnectedFromBeginningReconnects, setUp, tearDown, 0, NULL)
{
struct fixture *f = data;
(void)params;

/* Set very low threshold and trailing entries number */
SET_SNAPSHOT_THRESHOLD(3);
SET_SNAPSHOT_TRAILING(1);
SET_SNAPSHOT_TIMEOUT(200);

/* Apply a few of entries, to force a snapshot to be taken. Disconnect
* servers 0 and 2 so that the network calls return failure status */
CLUSTER_DISCONNECT(0, 2);
CLUSTER_DISCONNECT(2, 0);
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;

/* Wait a while so leader detects offline node */
CLUSTER_STEP_UNTIL_ELAPSED(2000);

/* Assert that the leader doesn't try sending a snapshot to an offline node */
munit_assert_int(CLUSTER_N_SEND(0, RAFT_IO_INSTALL_SNAPSHOT), ==, 0);
munit_assert_int(CLUSTER_N_RECV(2, RAFT_IO_INSTALL_SNAPSHOT), ==, 0);

CLUSTER_RECONNECT(0, 2);
CLUSTER_RECONNECT(2, 0);
/* Wait for the snapshot to be installed */
CLUSTER_STEP_UNTIL_APPLIED(2, 4, 5000);

/* Assert that the leader has sent an InstallSnapshot RPC */
munit_assert_int(CLUSTER_N_SEND(0, RAFT_IO_INSTALL_SNAPSHOT), ==, 1);
munit_assert_int(CLUSTER_N_RECV(2, RAFT_IO_INSTALL_SNAPSHOT), ==, 1);

return MUNIT_OK;
}

/* Install snapshot to an offline node that went down during operation */
TEST(snapshot, installOneDisconnectedDuringOperationReconnects, setUp, tearDown, 0, NULL)
{
struct fixture *f = data;
(void)params;

/* Set very low threshold and trailing entries number */
SET_SNAPSHOT_THRESHOLD(3);
SET_SNAPSHOT_TRAILING(1);
SET_SNAPSHOT_TIMEOUT(200);

/* Apply a few of entries */
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;

/* Wait for follower to catch up*/
CLUSTER_STEP_UNTIL_APPLIED(2, 4, 5000);
/* Assert that the leader hasn't sent an InstallSnapshot RPC */
munit_assert_int(CLUSTER_N_SEND(0, RAFT_IO_INSTALL_SNAPSHOT), ==, 0);

CLUSTER_DISCONNECT(0, 2);
CLUSTER_DISCONNECT(2, 0);

/* Wait a while so leader detects offline node */
CLUSTER_STEP_UNTIL_ELAPSED(2000);

/* Apply a few more entries */
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;

/* Assert that the leader doesn't try sending snapshot to an offline node */
munit_assert_int(CLUSTER_N_SEND(0, RAFT_IO_INSTALL_SNAPSHOT), ==, 0);
munit_assert_int(CLUSTER_N_RECV(2, RAFT_IO_INSTALL_SNAPSHOT), ==, 0);

CLUSTER_RECONNECT(0, 2);
CLUSTER_RECONNECT(2, 0);
CLUSTER_STEP_UNTIL_APPLIED(2, 7, 5000);

/* Assert that the leader has tried sending an InstallSnapshot RPC */
munit_assert_int(CLUSTER_N_SEND(0, RAFT_IO_INSTALL_SNAPSHOT), ==, 1);
munit_assert_int(CLUSTER_N_RECV(2, RAFT_IO_INSTALL_SNAPSHOT), ==, 1);

return MUNIT_OK;
}

/* No snapshots sent to killed nodes */
TEST(snapshot, noSnapshotInstallToKilled, setUp, tearDown, 0, NULL)
{
struct fixture *f = data;
(void)params;

/* Set very low threshold and trailing entries number */
SET_SNAPSHOT_THRESHOLD(3);
SET_SNAPSHOT_TRAILING(1);
SET_SNAPSHOT_TIMEOUT(200);

/* Kill a server */
CLUSTER_KILL(2);

/* Apply a few of entries */
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;

/* Wait a while */
CLUSTER_STEP_UNTIL_ELAPSED(4000);

/* Assert that the leader hasn't sent an InstallSnapshot RPC */
munit_assert_int(CLUSTER_N_SEND(0, RAFT_IO_INSTALL_SNAPSHOT), ==, 0);
return MUNIT_OK;
}

/* Install snapshot times out and leader retries, afterwards AppendEntries resume */
TEST(snapshot, installOneTimeOutAppendAfter, setUp, tearDown, 0, NULL)
{
Expand All @@ -136,21 +257,22 @@ TEST(snapshot, installOneTimeOutAppendAfter, setUp, tearDown, 0, NULL)
SET_SNAPSHOT_THRESHOLD(3);
SET_SNAPSHOT_TRAILING(1);
SET_SNAPSHOT_TIMEOUT(200);
CLUSTER_SATURATE_BOTHWAYS(0, 2);

/* Apply a few of entries, to force a snapshot to be taken. */
/* Apply a few of entries, to force a snapshot to be taken. Drop all network
* traffic between servers 0 and 2 in order for AppendEntries RPCs to not be
* replicated */
CLUSTER_SATURATE_BOTHWAYS(0, 2);
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;

/* Reconnect the leader, but follower cannot reply */
CLUSTER_DESATURATE(0, 2);

/* InstallSnaphot RPC times out */
CLUSTER_STEP_UNTIL_ELAPSED(400);
/* Reconnect both servers and set a high disk latency on server 2 so that
* the InstallSnapshot RPC will time out */
CLUSTER_SET_DISK_LATENCY(2, 300);
CLUSTER_DESATURATE_BOTHWAYS(0, 2);

/* Reconnect the follower */
CLUSTER_DESATURATE(2, 0);
/* Wait for the snapshot to be installed */
CLUSTER_STEP_UNTIL_APPLIED(2, 4, 5000);

/* Append a few entries and check if they are replicated */
CLUSTER_MAKE_PROGRESS;
Expand All @@ -172,29 +294,33 @@ TEST(snapshot, installMultipleTimeOut, setUp, tearDown, 0, NULL)
SET_SNAPSHOT_THRESHOLD(3);
SET_SNAPSHOT_TRAILING(1);
SET_SNAPSHOT_TIMEOUT(200);
CLUSTER_SATURATE_BOTHWAYS(0, 2);

/* Apply a few of entries, to force a snapshot to be taken. */
/* Apply a few of entries, to force a snapshot to be taken. Drop all network
* traffic between servers 0 and 2 in order for AppendEntries RPCs to not be
* replicated */
CLUSTER_SATURATE_BOTHWAYS(0, 2);
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;

/* Reconnect the leader, but follower cannot reply */
CLUSTER_DESATURATE(0, 2);
/* Reconnect both servers and set a high disk latency on server 2 so that
* the InstallSnapshot RPC will time out */
CLUSTER_SET_DISK_LATENCY(2, 300);
CLUSTER_DESATURATE_BOTHWAYS(0, 2);

/* InstallSnaphot RPC times out */
/* Step until the snapshot times out */
CLUSTER_STEP_UNTIL_ELAPSED(400);

/* Apply another few of entries, to force a new snapshot to be taken. */
/* Apply another few of entries, to force a new snapshot to be taken. Drop
* all traffic between servers 0 and 2 in order for AppendEntries RPCs to not be
* replicated */
CLUSTER_SATURATE_BOTHWAYS(0, 2);
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;

/* InstallSnaphot RPC times out */
CLUSTER_STEP_UNTIL_ELAPSED(400);

/* Reconnect the follower */
CLUSTER_DESATURATE(2, 0);
CLUSTER_DESATURATE_BOTHWAYS(0, 2);
CLUSTER_STEP_UNTIL_APPLIED(2, 7, 5000);

/* Assert that the leader has sent multiple InstallSnapshot RPCs */
Expand All @@ -214,30 +340,33 @@ TEST(snapshot, installMultipleTimeOutAppendAfter, setUp, tearDown, 0, NULL)
SET_SNAPSHOT_THRESHOLD(3);
SET_SNAPSHOT_TRAILING(1);
SET_SNAPSHOT_TIMEOUT(200);
CLUSTER_SATURATE_BOTHWAYS(0, 2);

/* Apply a few of entries, to force a snapshot to be taken. */
/* Apply a few of entries, to force a snapshot to be taken. Drop all network
* traffic between servers 0 and 2 in order for AppendEntries RPCs to not be
* replicated */
CLUSTER_SATURATE_BOTHWAYS(0, 2);
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;

/* Reconnect the leader, but follower cannot reply */
CLUSTER_DESATURATE(0, 2);
/* Reconnect both servers and set a high disk latency on server 2 so that
* the InstallSnapshot RPC will time out */
CLUSTER_SET_DISK_LATENCY(2, 300);
CLUSTER_DESATURATE_BOTHWAYS(0, 2);

/* InstallSnaphot RPC times out */
/* Step until the snapshot times out */
CLUSTER_STEP_UNTIL_ELAPSED(400);

/* Apply another few of entries, to force a new snapshot to be taken. */
/* Apply another few of entries, to force a new snapshot to be taken. Drop
* all traffic between servers 0 and 2 in order for AppendEntries RPCs to not be
* replicated */
CLUSTER_SATURATE_BOTHWAYS(0, 2);
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;

/* InstallSnaphot RPC times out */
CLUSTER_STEP_UNTIL_ELAPSED(400);

/* Reconnect the follower */
CLUSTER_DESATURATE(2, 0);

CLUSTER_DESATURATE_BOTHWAYS(0, 2);
/* Append a few entries and make sure the follower catches up */
CLUSTER_MAKE_PROGRESS;
CLUSTER_MAKE_PROGRESS;
Expand Down

0 comments on commit c772738

Please sign in to comment.