[fdbserver] Gray failure and simulator improvements related to remote processes #11717

Open · wants to merge 9 commits into base: main
6 changes: 5 additions & 1 deletion fdbclient/ServerKnobs.cpp
@@ -789,7 +789,11 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( CC_ENABLE_ENTIRE_SATELLITE_MONITORING, false );
init( CC_SATELLITE_DEGRADATION_MIN_COMPLAINER, 3 );
init( CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER, 3 );
init( CC_ENABLE_REMOTE_LOG_ROUTER_MONITORING, true );
init( CC_ENABLE_REMOTE_LOG_ROUTER_DEGRADATION_MONITORING, false);
init( CC_ENABLE_REMOTE_LOG_ROUTER_MONITORING, true);
init( CC_ENABLE_REMOTE_TLOG_DEGRADATION_MONITORING, false); if (isSimulated && deterministicRandom()->coinflip()) CC_ENABLE_REMOTE_TLOG_DEGRADATION_MONITORING = true;
init( CC_ENABLE_REMOTE_TLOG_DISCONNECT_MONITORING, false); if (isSimulated && deterministicRandom()->coinflip()) CC_ENABLE_REMOTE_TLOG_DISCONNECT_MONITORING = true;
init( CC_ONLY_CONSIDER_INTRA_DC_LATENCY, false); if (isSimulated && deterministicRandom()->coinflip()) CC_ONLY_CONSIDER_INTRA_DC_LATENCY = true;
init( CC_THROTTLE_SINGLETON_RERECRUIT_INTERVAL, 0.5 );

init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
19 changes: 17 additions & 2 deletions fdbclient/include/fdbclient/ServerKnobs.h
@@ -760,8 +760,23 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl<ServerKno
// be determined as degraded worker.
int CC_SATELLITE_DEGRADATION_MIN_BAD_SERVER; // The minimum amount of degraded server in satellite DC to be
// determined as degraded satellite.
bool CC_ENABLE_REMOTE_LOG_ROUTER_MONITORING; // When enabled, gray failure tries to detect whether the remote log
// router is degraded and may use trigger recovery to recover from it.
bool CC_ENABLE_REMOTE_LOG_ROUTER_DEGRADATION_MONITORING; // When enabled, gray failure tries to detect whether
// remote log routers are experiencing degradation
// (latency) with their peers. Gray failure may trigger
// recovery based on this.
bool CC_ENABLE_REMOTE_LOG_ROUTER_MONITORING; // When enabled, gray failure tries to detect whether
Review comment from the PR author (Collaborator):
The reason for not saying DISCONNECT in the name is to keep backwards compatibility with the previous knob.

// remote log routers are disconnected from their peers. Gray failure
// may trigger recovery based on this.
bool CC_ENABLE_REMOTE_TLOG_DEGRADATION_MONITORING; // When enabled, gray failure tries to detect whether remote
// tlogs are experiencing degradation (latency) with their peers.
// Gray failure may trigger recovery based on this.
bool CC_ENABLE_REMOTE_TLOG_DISCONNECT_MONITORING; // When enabled, gray failure tries to detect whether remote
// tlogs are disconnected from their peers. Gray failure may
// trigger recovery based on this.
bool CC_ONLY_CONSIDER_INTRA_DC_LATENCY; // When enabled, gray failure only considers intra-DC signal for latency
// degradations. For remote processes, this knob is strongly advised to be
// turned on, because inter-DC latency signal is not reliable and it's
// challenging to pick a good latency threshold.
double CC_THROTTLE_SINGLETON_RERECRUIT_INTERVAL; // The interval to prevent re-recruiting the same singleton if a
// recruiting fight between two cluster controllers occurs.
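To make the interaction between these knobs easier to follow, here is a condensed sketch of the decision they drive. The helper function is hypothetical (not part of this PR), but its logic mirrors the change to ClusterControllerData::transactionSystemContainsDegradedServers() later in this diff.

```cpp
// Hypothetical helper, for illustration only: should a degraded or disconnected
// remote process (tlog or log router) count toward triggering a gray-failure recovery?
bool remoteProcessCountsTowardRecovery(bool isTLog, bool isDisconnected, const ServerKnobs& knobs) {
	if (isDisconnected) {
		// Disconnection signals are treated as reliable even across DCs.
		return isTLog ? knobs.CC_ENABLE_REMOTE_TLOG_DISCONNECT_MONITORING
		              : knobs.CC_ENABLE_REMOTE_LOG_ROUTER_MONITORING;
	}
	// Latency degradation on a remote process is only trusted when the detector
	// restricts itself to intra-DC latency measurements.
	return knobs.CC_ONLY_CONSIDER_INTRA_DC_LATENCY &&
	       (isTLog ? knobs.CC_ENABLE_REMOTE_TLOG_DEGRADATION_MONITORING
	               : knobs.CC_ENABLE_REMOTE_LOG_ROUTER_DEGRADATION_MONITORING);
}
```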

118 changes: 94 additions & 24 deletions fdbserver/ClusterController.actor.cpp
@@ -34,6 +34,7 @@
#include "fdbclient/DatabaseContext.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbclient/EncryptKeyProxyInterface.h"
#include "fdbrpc/Locality.h"
#include "fdbserver/BlobGranuleServerCommon.actor.h"
#include "fdbserver/BlobMigratorInterface.h"
#include "fdbserver/Knobs.h"
@@ -94,12 +95,19 @@ ACTOR Future<Optional<Value>> getPreviousCoordinators(ClusterControllerData* sel
}
}

bool ClusterControllerData::processesInSameDC(const NetworkAddress& addr1, const NetworkAddress& addr2) const {
return this->addr_locality.contains(addr1) && this->addr_locality.contains(addr2) &&
this->addr_locality.at(addr1).dcId().present() && this->addr_locality.at(addr2).dcId().present() &&
this->addr_locality.at(addr1).dcId().get() == this->addr_locality.at(addr2).dcId().get();
}
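processesInSameDC() is what lets the cluster controller restrict latency-based complaints to a single DC when CC_ONLY_CONSIDER_INTRA_DC_LATENCY is enabled. The health-evaluation code that consumes it is outside this excerpt; a minimal sketch, assuming a worker health report exposes a degradedPeers map keyed by peer address, might look like this:

```cpp
// Sketch only (not the actual call site): skip cross-DC latency complaints, since
// inter-DC latency is noisy and hard to threshold. Disconnection complaints are
// handled separately and are not filtered here.
for (const auto& [peer, times] : health.degradedPeers) {
	if (SERVER_KNOBS->CC_ONLY_CONSIDER_INTRA_DC_LATENCY && !processesInSameDC(workerAddress, peer)) {
		continue;
	}
	// ... otherwise count `peer` as a degraded link for this worker ...
}
```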

bool ClusterControllerData::transactionSystemContainsDegradedServers() {
const ServerDBInfo& dbi = db.serverInfo->get();
const Reference<ClusterRecoveryData> recoveryData = db.recoveryData;
auto transactionWorkerInList = [&dbi, &recoveryData](const std::unordered_set<NetworkAddress>& serverList,
bool skipSatellite,
bool skipRemote) -> bool {
bool skipRemoteTLog,
bool skipRemoteLogRouter) -> bool {
for (const auto& server : serverList) {
if (dbi.master.addresses().contains(server)) {
return true;
@@ -115,15 +123,19 @@ bool ClusterControllerData::transactionSystemContainsDegradedServers() {
continue;
}

if (skipRemote && !logSet.isLocal) {
continue;
}

if (!logSet.isLocal) {
// Only check log routers in the remote region.
for (const auto& logRouter : logSet.logRouters) {
if (logRouter.present() && logRouter.interf().addresses().contains(server)) {
return true;
if (!skipRemoteTLog) {
for (const auto& tlog : logSet.tLogs) {
if (tlog.present() && tlog.interf().addresses().contains(server)) {
return true;
}
}
}
if (!skipRemoteLogRouter) {
for (const auto& logRouter : logSet.logRouters) {
if (logRouter.present() && logRouter.interf().addresses().contains(server)) {
return true;
}
}
}
} else {
@@ -176,13 +188,23 @@
return false;
};

// Check if transaction system contains degraded/disconnected servers. For satellite and remote regions, we only
// Check if transaction system contains degraded/disconnected servers. For satellite, we only
// check for disconnection since the latency between primary and satellite is across WAN and may not be very
// stable.
return transactionWorkerInList(degradationInfo.degradedServers, /*skipSatellite=*/true, /*skipRemote=*/true) ||
// TODO: Consider adding satellite latency degradation check and rely on
// SERVER_KNOBS->CC_ONLY_CONSIDER_INTRA_DC_LATENCY for accurate health signal
return transactionWorkerInList(degradationInfo.degradedServers,
/*skipSatellite=*/true,
/*skipRemoteTLog=*/
!(SERVER_KNOBS->CC_ONLY_CONSIDER_INTRA_DC_LATENCY &&
SERVER_KNOBS->CC_ENABLE_REMOTE_TLOG_DEGRADATION_MONITORING),
/*skipRemoteLogRouter*/
!(SERVER_KNOBS->CC_ONLY_CONSIDER_INTRA_DC_LATENCY &&
SERVER_KNOBS->CC_ENABLE_REMOTE_LOG_ROUTER_DEGRADATION_MONITORING)) ||
transactionWorkerInList(degradationInfo.disconnectedServers,
/*skipSatellite=*/false,
/*skipRemote=*/!SERVER_KNOBS->CC_ENABLE_REMOTE_LOG_ROUTER_MONITORING);
/*skipRemoteTLog=*/!SERVER_KNOBS->CC_ENABLE_REMOTE_TLOG_DISCONNECT_MONITORING,
/*skipRemoteLogRouter*/ !SERVER_KNOBS->CC_ENABLE_REMOTE_LOG_ROUTER_MONITORING);
}

bool ClusterControllerData::remoteTransactionSystemContainsDegradedServers() {
@@ -914,6 +936,14 @@ ACTOR Future<Void> workerAvailabilityWatch(WorkerInterface worker,
.detail("Address", worker.address());
cluster->removedDBInfoEndpoints.insert(worker.updateServerDBInfo.getEndpoint());
cluster->id_worker.erase(worker.locality.processId());
// Currently, only CC_ONLY_CONSIDER_INTRA_DC_LATENCY feature relies on addr_locality mapping. In the
// future, if needed, we can populate the mapping unconditionally.
if (SERVER_KNOBS->CC_ONLY_CONSIDER_INTRA_DC_LATENCY) {
cluster->addr_locality.erase(worker.address());
if (worker.secondaryAddress().present()) {
cluster->addr_locality.erase(worker.secondaryAddress().get());
}
}
cluster->updateWorkerList.set(worker.locality.processId(), Optional<ProcessData>());
return Void();
}
@@ -1275,6 +1305,23 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
req.degraded,
req.recoveredDiskFiles,
req.issues);
// Currently, only CC_ONLY_CONSIDER_INTRA_DC_LATENCY feature relies on addr_locality mapping. In the future, if
// needed, we can populate the mapping unconditionally.
if (SERVER_KNOBS->CC_ONLY_CONSIDER_INTRA_DC_LATENCY) {
const bool addrDcChanged = self->addr_locality.contains(w.address()) &&
Review comment from the PR author (Collaborator):
Separately, understand why we can not relax this condition.

self->addr_locality[w.address()].dcId() != w.locality.dcId();
if (addrDcChanged) {
TraceEvent(SevWarn, "AddrDcChanged")
.detail("Addr", w.address())
.detail("ExistingLocality", self->addr_locality[w.address()].toString())
.detail("NewLocality", w.locality.toString());
}
ASSERT_WE_THINK(!addrDcChanged);
self->addr_locality[w.address()] = w.locality;
if (w.secondaryAddress().present()) {
self->addr_locality[w.secondaryAddress().get()] = w.locality;
}
}
if (!self->masterProcessId.present() &&
w.locality.processId() == self->db.serverInfo->get().master.locality.processId()) {
self->masterProcessId = w.locality.processId();
@@ -3363,6 +3410,15 @@ ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> connRec

namespace {

void addProcessesToSameDC(ClusterControllerData& self, const std::vector<NetworkAddress>&& processes) {
LocalityData locality;
locality.set(LocalityData::keyDcId, StringRef("1"));
for (const auto& process : processes) {
const bool added = self.addr_locality.insert({ process, locality }).second;
ASSERT(added);
}
}

// Tests `ClusterControllerData::updateWorkerHealth()` can update `ClusterControllerData::workerHealth`
// based on `UpdateWorkerHealth` request correctly.
TEST_CASE("/fdbserver/clustercontroller/updateWorkerHealth") {
@@ -3536,6 +3592,10 @@ TEST_CASE("/fdbserver/clustercontroller/getDegradationInfo") {
NetworkAddress badPeer3(IPAddress(0x04040404), 1);
NetworkAddress badPeer4(IPAddress(0x05050505), 1);

if (SERVER_KNOBS->CC_ONLY_CONSIDER_INTRA_DC_LATENCY) {
addProcessesToSameDC(data, { worker, badPeer1, badPeer2, badPeer3, badPeer4 });
}

// Test that a reported degraded link should stay for sometime before being considered as a degraded
// link by cluster controller.
{
@@ -3797,22 +3857,32 @@ TEST_CASE("/fdbserver/clustercontroller/shouldTriggerRecoveryDueToDegradedServer
data.degradationInfo.disconnectedServers.clear();

// No recovery when remote tlog is degraded.
data.degradationInfo.degradedServers.insert(remoteTlog);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
data.degradationInfo.disconnectedServers.insert(remoteTlog);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.disconnectedServers.clear();
if (!(SERVER_KNOBS->CC_ONLY_CONSIDER_INTRA_DC_LATENCY &&
SERVER_KNOBS->CC_ENABLE_REMOTE_TLOG_DEGRADATION_MONITORING)) {
data.degradationInfo.degradedServers.insert(remoteTlog);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
}
if (!SERVER_KNOBS->CC_ENABLE_REMOTE_TLOG_DISCONNECT_MONITORING) {
data.degradationInfo.disconnectedServers.insert(remoteTlog);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.disconnectedServers.clear();
}

// No recovery when remote log router is degraded.
data.degradationInfo.degradedServers.insert(logRouter);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
if (!(SERVER_KNOBS->CC_ONLY_CONSIDER_INTRA_DC_LATENCY &&
SERVER_KNOBS->CC_ENABLE_REMOTE_LOG_ROUTER_DEGRADATION_MONITORING)) {
data.degradationInfo.degradedServers.insert(logRouter);
ASSERT(!data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.degradedServers.clear();
}

// Trigger recovery when remote log router is disconnected.
data.degradationInfo.disconnectedServers.insert(logRouter);
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.disconnectedServers.clear();
if (SERVER_KNOBS->CC_ENABLE_REMOTE_LOG_ROUTER_MONITORING) {
data.degradationInfo.disconnectedServers.insert(logRouter);
ASSERT(data.shouldTriggerRecoveryDueToDegradedServers());
data.degradationInfo.disconnectedServers.clear();
}

// No recovery when backup worker is degraded.
data.degradationInfo.degradedServers.insert(backup);
53 changes: 44 additions & 9 deletions fdbserver/SimulatedCluster.actor.cpp
@@ -509,6 +509,11 @@ class TestConfig : public BasicTestConfig {
int simulationNormalRunTestsTimeoutSeconds = 5400;
int simulationBuggifyRunTestsTimeoutSeconds = 36000;

// Number of tlogs in the remote region
Optional<int> remoteDesiredTLogCount;
// Number of process classes explicitly set as Stateless in all DCs
Optional<int> statelessProcessClassesPerDC;
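As a sketch of how a simulation test could exercise the two new options, a test's TOML file might set them in its [configuration] table (the key names match the registrations added below; the values are hypothetical):

```toml
[configuration]
# two-region (fearless) setup so that a remote region exists
generateFearless = true
# pin the number of tlogs recruited in the remote region
remoteDesiredTLogCount = 3
# force exactly two processes per DC to the explicit stateless class
statelessProcessClassesPerDC = 2
```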

ConfigDBType getConfigDBType() const { return configDBType; }

bool tomlKeyPresent(const toml::value& data, std::string key) {
@@ -555,6 +560,7 @@ class TestConfig : public BasicTestConfig {
.add("generateFearless", &generateFearless)
.add("datacenters", &datacenters)
.add("desiredTLogCount", &desiredTLogCount)
.add("remoteDesiredTLogCount", &remoteDesiredTLogCount)
.add("commitProxyCount", &commitProxyCount)
.add("grvProxyCount", &grvProxyCount)
.add("resolverCount", &resolverCount)
@@ -581,7 +587,8 @@
.add("defaultTenant", &defaultTenant)
.add("longRunningTest", &longRunningTest)
.add("simulationNormalRunTestsTimeoutSeconds", &simulationNormalRunTestsTimeoutSeconds)
.add("simulationBuggifyRunTestsTimeoutSeconds", &simulationBuggifyRunTestsTimeoutSeconds);
.add("simulationBuggifyRunTestsTimeoutSeconds", &simulationBuggifyRunTestsTimeoutSeconds)
.add("statelessProcessClassesPerDC", &statelessProcessClassesPerDC);
try {
auto file = toml::parse(testFile);
if (file.contains("configuration") && toml::find(file, "configuration").is_table()) {
@@ -1650,6 +1657,9 @@ void SimulationConfig::setSpecificConfig(const TestConfig& testConfig) {
if (testConfig.desiredTLogCount.present()) {
db.desiredTLogCount = testConfig.desiredTLogCount.get();
}
if (testConfig.remoteDesiredTLogCount.present()) {
db.remoteDesiredTLogCount = testConfig.remoteDesiredTLogCount.get();
}
if (testConfig.commitProxyCount.present()) {
db.commitProxyCount = testConfig.commitProxyCount.get();
}
@@ -2129,8 +2139,12 @@ void SimulationConfig::setRegions(const TestConfig& testConfig) {

if (deterministicRandom()->random01() < 0.25)
db.desiredLogRouterCount = deterministicRandom()->randomInt(1, 7);
if (deterministicRandom()->random01() < 0.25)

if (testConfig.remoteDesiredTLogCount.present()) {
db.remoteDesiredTLogCount = testConfig.remoteDesiredTLogCount.get();
} else if (deterministicRandom()->random01() < 0.25) {
db.remoteDesiredTLogCount = deterministicRandom()->randomInt(1, 7);
}

bool useNormalDCsAsSatellites =
datacenters > 4 && testConfig.minimumRegions < 2 && deterministicRandom()->random01() < 0.3;
@@ -2632,13 +2646,6 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
(dc < machineCount % dataCenters); // add remainder of machines to first datacenter
int possible_ss = 0;
int dcCoordinators = coordinatorCount / dataCenters + (dc < coordinatorCount % dataCenters);
printf("Datacenter %d: %d/%d machines, %d/%d coordinators\n",
dc,
machines,
machineCount,
dcCoordinators,
coordinatorCount);
ASSERT_LE(dcCoordinators, machines);

// FIXME: we hardcode some machines to specifically test storage cache and blob workers
// TODO: caching disabled for this merge
@@ -2657,9 +2664,26 @@

int totalMachines =
machines + storageCacheMachines + blobWorkerMachines + simHTTPMachines + extraStorageMachineCount;

printf("Datacenter %d: %d/%d machines, %d/%d coordinators, %d other machines\n",
dc,
machines,
machineCount,
dcCoordinators,
coordinatorCount,
totalMachines - machines);
ASSERT_LE(dcCoordinators, machines);

int useSeedForMachine = deterministicRandom()->randomInt(0, totalMachines);
Standalone<StringRef> zoneId;
Standalone<StringRef> newZoneId;

Optional<int> desiredStatelessClasses;
int actualStatelessClasses = 0;
if (testConfig.statelessProcessClassesPerDC.present()) {
desiredStatelessClasses = testConfig.statelessProcessClassesPerDC.get();
}

for (int machine = 0; machine < totalMachines; machine++) {
Standalone<StringRef> machineId(deterministicRandom()->randomUniqueID().toString());
if (machine == 0 || machineCount - dataCenters <= 4 || assignedMachines != 4 ||
@@ -2687,6 +2711,11 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
}
}

if (desiredStatelessClasses.present() && actualStatelessClasses < desiredStatelessClasses.get()) {
processClass = ProcessClass(ProcessClass::StatelessClass, ProcessClass::CommandLineSource);
actualStatelessClasses++;
}

// FIXME: hack to add machines specifically to test storage cache and blob workers and http server
// `machines` here is the normal (non-temporary) machines that totalMachines comprises of
int processCount = processesPerMachine;
@@ -2781,6 +2810,12 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
assignedMachines++;
}

if (desiredStatelessClasses.present()) {
// If this assertion fails, that means that there were not enough machines in the DC (primary or remote)
// to match desired stateless classes
ASSERT(actualStatelessClasses == desiredStatelessClasses.get());
}

if (possible_ss - simconfig.db.desiredTSSCount / simconfig.db.usableRegions <= simconfig.db.storageTeamSize) {
gradualMigrationPossible = false;
}