diff --git a/build.gradle b/build.gradle index 92d048f..9e89fae 100644 --- a/build.gradle +++ b/build.gradle @@ -73,9 +73,10 @@ dependencies { testImplementation('org.powermock:powermock-api-mockito2:2.0.0') testImplementation('org.powermock:powermock-module-junit4:2.0.0') testImplementation('org.awaitility:awaitility:3.1.6') - + testImplementation('com.datastax.oss.simulacron:simulacron-native-server:0.8.10') + testImplementation('com.datastax.oss.simulacron:simulacron-driver-3x:0.8.10') + testImplementation('com.datastax.oss:java-driver-test-infra:4.4.0') testImplementation('org.scassandra:java-client:1.1.2') - testCompileAndFunctional('org.cassandraunit:cassandra-unit:4.2.2.0-SNAPSHOT') functional('org.slf4j:slf4j-simple:1.7.25') } diff --git a/settings.gradle b/settings.gradle index 700605d..342d6d0 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1 +1 @@ -//rootProject.name = 'cqlmigrate' +rootProject.name = 'cqlmigrate' diff --git a/src/main/java/uk/sky/cqlmigrate/CassandraLockingMechanism.java b/src/main/java/uk/sky/cqlmigrate/CassandraLockingMechanism.java index 94596b5..e262a0b 100644 --- a/src/main/java/uk/sky/cqlmigrate/CassandraLockingMechanism.java +++ b/src/main/java/uk/sky/cqlmigrate/CassandraLockingMechanism.java @@ -13,8 +13,6 @@ import uk.sky.cqlmigrate.exception.CannotReleaseLockException; import static com.datastax.oss.driver.api.core.cql.SimpleStatement.newInstance; -import static com.datastax.oss.driver.api.core.cql.Statement.ASYNC; -import static com.datastax.oss.driver.api.core.cql.Statement.SYNC; class CassandraLockingMechanism extends LockingMechanism { @@ -43,9 +41,9 @@ public void init() throws CannotAcquireLockException { super.init(); try { - insertLockQuery = session.prepare(newInstance("INSERT INTO cqlmigrate.locks (name, client) VALUES (?, ?) IF NOT EXISTS",ASYNC) + insertLockQuery = session.prepare(newInstance("INSERT INTO cqlmigrate.locks (name, client) VALUES (?, ?) IF NOT EXISTS") .setConsistencyLevel(consistencyLevel)); - deleteLockQuery = session.prepare(newInstance("DELETE FROM cqlmigrate.locks WHERE name = ? IF client = ?",ASYNC) + deleteLockQuery = session.prepare(newInstance("DELETE FROM cqlmigrate.locks WHERE name = ? IF client = ?") .setConsistencyLevel(consistencyLevel)); } catch (DriverException e) { diff --git a/src/main/java/uk/sky/cqlmigrate/CqlLoader.java b/src/main/java/uk/sky/cqlmigrate/CqlLoader.java index 87eada7..edd2d25 100644 --- a/src/main/java/uk/sky/cqlmigrate/CqlLoader.java +++ b/src/main/java/uk/sky/cqlmigrate/CqlLoader.java @@ -26,7 +26,7 @@ static void load(SessionContext sessionContext, List cqlStatements) { .map(stringStatement -> SimpleStatement.newInstance(stringStatement).setConsistencyLevel(sessionContext.getWriteConsistencyLevel())) .forEach(statement -> { LOGGER.debug("Executing cql statement {}", statement); - sessionContext.getSession().execute(statement); //TODO not sure if the statements returns a Boolean + sessionContext.getSession().execute(statement); }); } catch (DriverException e) { LOGGER.error("Failed to execute cql statements {}: {}", cqlStatements, e.getMessage()); diff --git a/src/main/java/uk/sky/cqlmigrate/CqlMigratorImpl.java b/src/main/java/uk/sky/cqlmigrate/CqlMigratorImpl.java index c80a275..b3a7d5a 100644 --- a/src/main/java/uk/sky/cqlmigrate/CqlMigratorImpl.java +++ b/src/main/java/uk/sky/cqlmigrate/CqlMigratorImpl.java @@ -134,7 +134,7 @@ public void clean(String[] hosts, int port, String username, String password, St // TODO driver is taking longer than 2 secs to drop schema public void clean(Session session, String keyspace) { session.execute(SimpleStatement.newInstance("DROP KEYSPACE IF EXISTS " + keyspace) - .setTimeout(Duration.ofSeconds(4)) + .setTimeout(Duration.ofSeconds(10)) .setConsistencyLevel(cqlMigratorConfig.getWriteConsistencyLevel()), Statement.SYNC); LOGGER.info("Cleaned {}", keyspace); diff --git a/src/test/java/uk/sky/cqlmigrate/CassandraLockingMechanismSimulacron.java b/src/test/java/uk/sky/cqlmigrate/CassandraLockingMechanismSimulacron.java new file mode 100644 index 0000000..bf02f82 --- /dev/null +++ b/src/test/java/uk/sky/cqlmigrate/CassandraLockingMechanismSimulacron.java @@ -0,0 +1,329 @@ +package uk.sky.cqlmigrate; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.DriverException; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.config.DriverConfigLoader; +import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder; +import com.datastax.oss.driver.api.testinfra.session.SessionUtils; +import com.datastax.oss.driver.api.testinfra.simulacron.SimulacronRule; +import com.datastax.oss.protocol.internal.request.Prepare; +import com.datastax.oss.simulacron.common.cluster.ClusterSpec; +import com.datastax.oss.simulacron.common.cluster.QueryLog; +import com.datastax.oss.simulacron.common.codec.WriteType; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.assertj.core.api.AbstractThrowableAssert; +import org.junit.*; +import uk.sky.cqlmigrate.exception.CannotAcquireLockException; +import uk.sky.cqlmigrate.exception.CannotReleaseLockException; + +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static com.datastax.oss.simulacron.common.stubbing.PrimeDsl.*; +import static java.lang.String.valueOf; +import static org.assertj.core.api.Assertions.*; + +public class CassandraLockingMechanismSimulacron { + + private static final String LOCK_KEYSPACE = "lock-keyspace"; + private static final String CLIENT_ID = UUID.randomUUID().toString(); + private static final String PREPARE_INSERT_QUERY = "INSERT INTO cqlmigrate.locks (name, client) VALUES (?, ?) IF NOT EXISTS"; + private static final String PREPARE_DELETE_QUERY = "DELETE FROM cqlmigrate.locks WHERE name = ? IF client = ?"; + + @ClassRule + public static final SimulacronRule SIMULACRON_RULE = new SimulacronRule(ClusterSpec.builder().withNodes(1)); + private static CqlSession session; + + CassandraLockingMechanism lockingMechanism; + + Predicate prepareQueryPredicate = i -> i.getFrame().message instanceof Prepare; + + @Before + public void baseSetup() throws Exception { + SIMULACRON_RULE.cluster().acceptConnections(); + SIMULACRON_RULE.cluster().clearLogs(); + SIMULACRON_RULE.cluster().clearPrimes(true); + session = newSession(null); + lockingMechanism = new CassandraLockingMechanism(session, LOCK_KEYSPACE, ConsistencyLevel.ALL); + } + + @After + public void baseTearDown() throws Exception { + session.close(); + } + + @Test + public void shouldPrepareInsertLocksQueryWhenInit() throws Throwable { + //when + lockingMechanism.init(); + //then + List queryLogs = SIMULACRON_RULE.cluster().getLogs().getQueryLogs().stream() + .filter(queryLog -> queryLog.getFrame().message.toString().contains(PREPARE_INSERT_QUERY)) + .filter(prepareQueryPredicate) + .collect(Collectors.toList()); + + assertThat(queryLogs.size()).isEqualTo(1); + } + + + @Test + public void shouldPrepareDeleteLocksQueryWhenInit() throws Throwable { + //when + lockingMechanism.init(); + + //then + List queryLogs = SIMULACRON_RULE.cluster().getLogs().getQueryLogs().stream() + .filter(queryLog -> queryLog.getFrame().message.toString().contains(PREPARE_DELETE_QUERY)) + .filter(prepareQueryPredicate).collect(Collectors.toList()); + assertThat(queryLogs.size()).isEqualTo(1); + } + + @Test + public void shouldInsertLockWhenAcquiringLock() throws Exception { + + SIMULACRON_RULE.cluster().prime(primeInsertQuery(LOCK_KEYSPACE, CLIENT_ID, true)); + //when + lockingMechanism.init(); + //then + assertThat(lockingMechanism.acquire(CLIENT_ID)).isTrue(); + + + } + + // +// @Test // same as one above +// public void shouldSuccessfullyAcquireLockWhenInsertIsApplied() throws Exception { +// //when +// boolean acquiredLock = lockingMechanism.acquire(CLIENT_ID); +// +// //then +// assertThat(acquiredLock) +// .describedAs("lock was acquired") +// .isTrue(); +// } +// + @Test + public void shouldUnsuccessfullyAcquireLockWhenInsertIsNotApplied() throws Exception { + //given + SIMULACRON_RULE.cluster().prime(primeInsertQuery(LOCK_KEYSPACE, CLIENT_ID, true). + then(writeTimeout(com.datastax.oss.simulacron.common.codec.ConsistencyLevel.ALL, 1, 1, WriteType.SIMPLE))); + + try (CqlSession session = newSession(null)) { + //when + lockingMechanism = new CassandraLockingMechanism(session, LOCK_KEYSPACE, ConsistencyLevel.ALL); + //when + lockingMechanism.init(); + //then + boolean acquiredLock = lockingMechanism.acquire(CLIENT_ID); + + //then + assertThat(acquiredLock) + .describedAs("lock was not acquired") + .isFalse(); + } + } + + @Test + public void shouldSuccessfullyAcquireLockWhenLockIsAlreadyAcquired() throws Exception { + //given + SIMULACRON_RULE.cluster().prime(primeInsertQuery(LOCK_KEYSPACE, CLIENT_ID, false)); + //when + lockingMechanism.init(); + //then + boolean acquiredLock = lockingMechanism.acquire(CLIENT_ID); + assertThat(acquiredLock) + .describedAs("lock was acquired") + .isTrue(); + } + + @Test + public void shouldUnsuccessfullyAcquireLockWhenWriteTimeoutOccurs() { + //given + SIMULACRON_RULE.cluster().prime(primeInsertQuery(LOCK_KEYSPACE, CLIENT_ID, true). + then(writeTimeout(com.datastax.oss.simulacron.common.codec.ConsistencyLevel.ALL, 1, 1, WriteType.SIMPLE)).ignoreOnPrepare()); + + //when + lockingMechanism.init(); + boolean acquiredLock = lockingMechanism.acquire(CLIENT_ID); + + //then + assertThat(acquiredLock) + .describedAs("lock was not acquired") + .isFalse(); + } + + @Test + public void shouldThrowExceptionIfQueryFailsToExecuteWhenAcquiringLock() throws Exception { + //given + SIMULACRON_RULE.cluster().prime(primeInsertQuery(LOCK_KEYSPACE, CLIENT_ID, true). + then(unavailable(com.datastax.oss.simulacron.common.codec.ConsistencyLevel.ALL, 1, 1))); + lockingMechanism.init(); + //when + Throwable throwable = catchThrowable(() -> lockingMechanism.acquire(CLIENT_ID)); + //then + assertThat(throwable) + .isNotNull() + .isInstanceOf(CannotAcquireLockException.class) + .hasCauseInstanceOf(DriverException.class) + .hasMessage(String.format("Query to acquire lock %s.schema_migration for client %s failed to execute", LOCK_KEYSPACE, CLIENT_ID)); + } + + @Test + public void shouldDeleteLockWhenReleasingLock() throws Exception { + //given + SIMULACRON_RULE.cluster().prime(primeDeleteQuery(LOCK_KEYSPACE, CLIENT_ID, true)); + //when + lockingMechanism.init(); + boolean isLockReleased = lockingMechanism.release(CLIENT_ID); + + //then + assertThat(isLockReleased).isTrue(); + List queryLogs = SIMULACRON_RULE.cluster().getLogs().getQueryLogs().stream() + .filter(queryLog -> queryLog.getFrame().message.toString().contains(PREPARE_DELETE_QUERY)) + .collect(Collectors.toList()); + + assertThat(queryLogs.size()).isEqualTo(1); + + } + + @Test + public void shouldSuccessfullyReleaseLockWhenNoLockFound() throws Exception { + SIMULACRON_RULE.cluster().prime(primeDeleteQuery(LOCK_KEYSPACE, CLIENT_ID, false)); + lockingMechanism.init(); + //when + AbstractThrowableAssert execution = assertThatCode( + () -> lockingMechanism.release(CLIENT_ID) + ); + // then + execution.doesNotThrowAnyException(); + } + + @Test + public void shouldThrowExceptionIfQueryFailsToExecuteWhenReleasingLock() throws Exception { + //given + SIMULACRON_RULE.cluster().prime(primeDeleteQuery(LOCK_KEYSPACE, CLIENT_ID, true). + then(unavailable(com.datastax.oss.simulacron.common.codec.ConsistencyLevel.ALL, 1, 1))); + lockingMechanism.init(); + + //when + Throwable throwable = catchThrowable(() -> lockingMechanism.release(CLIENT_ID)); + + //then + assertThat(throwable) + .isNotNull() + .isInstanceOf(CannotReleaseLockException.class) + .hasCauseInstanceOf(DriverException.class) + .hasMessage("Query failed to execute"); + } + + @Ignore("needs review") + @Test + public void shouldSuccessfullyReleaseLockWhenRetryingAfterWriteTimeOutButDoesNotHoldLockNow() { + //given + SIMULACRON_RULE.cluster().prime(primeDeleteQuery(LOCK_KEYSPACE, CLIENT_ID, true). + then(writeTimeout(com.datastax.oss.simulacron.common.codec.ConsistencyLevel.ALL, 1, 1, WriteType.SIMPLE))); + + //when + lockingMechanism.init(); + lockingMechanism.release(CLIENT_ID); + + //then prime a success + SIMULACRON_RULE.cluster().prime(primeDeleteQuery(LOCK_KEYSPACE, CLIENT_ID, false)); + + //when + boolean result = lockingMechanism.release(CLIENT_ID); + + //then + assertThat(result) + .describedAs("lock was released") + .isTrue(); + } + + @Ignore("needs review") + @Test + public void shouldThrowCannotReleaseLockExceptionWhenLockNotHeldByUs() throws InterruptedException { + //when + String newLockHolder = "new lock holder"; + SIMULACRON_RULE.cluster().prime(primeDeleteQuery(LOCK_KEYSPACE, newLockHolder, false).applyToPrepare()); + //and + PrimeBuilder primeBuilder = primeDeleteQuery(LOCK_KEYSPACE, CLIENT_ID, false); + SIMULACRON_RULE.cluster().prime(primeBuilder.then(noRows())); + lockingMechanism.init(); + + Throwable throwable = catchThrowable(() -> lockingMechanism.release(CLIENT_ID)); + + assertThat(throwable) + .isNotNull() + .isInstanceOf(CannotReleaseLockException.class) + .hasMessage(String.format("Lock %s.schema_migration attempted to be released by a non lock holder (%s). Current lock holder: %s", LOCK_KEYSPACE, CLIENT_ID, newLockHolder)); + } + + @Test + public void shouldReturnFalseIfDeleteNotAppliedButClientIsUs() { + //given + SIMULACRON_RULE.cluster().prime(primeDeleteQuery(LOCK_KEYSPACE, CLIENT_ID, false).applyToPrepare()); + lockingMechanism.init(); + //when + boolean released = lockingMechanism.release(CLIENT_ID); + + //then + assertThat(released) + .describedAs("lock was not released") + .isFalse(); + } + + private CqlSession newSession(ProgrammaticDriverConfigLoaderBuilder loaderBuilder) { + if (loaderBuilder == null) { + loaderBuilder = SessionUtils.configLoaderBuilder(); + loaderBuilder.withString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, "dc1"); + + } + DriverConfigLoader loader = + loaderBuilder + .withDuration(DefaultDriverOption.HEARTBEAT_INTERVAL, Duration.ofSeconds(1)) + .withDuration(DefaultDriverOption.HEARTBEAT_TIMEOUT, Duration.ofMillis(500)) + .withDuration(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, Duration.ofSeconds(2)) + .withDuration(DefaultDriverOption.RECONNECTION_MAX_DELAY, Duration.ofSeconds(1)) + .build(); + return SessionUtils.newSession(SIMULACRON_RULE, loader); + } + + private static PrimeBuilder primeInsertQuery(String lockName, String clientId, boolean lockApplied) { + String prepareInsertQuery = "INSERT INTO cqlmigrate.locks (name, client) VALUES (?, ?) IF NOT EXISTS"; + + PrimeBuilder primeBuilder = when(query( + prepareInsertQuery, + Lists.newArrayList( + com.datastax.oss.simulacron.common.codec.ConsistencyLevel.ONE, + com.datastax.oss.simulacron.common.codec.ConsistencyLevel.ALL), + ImmutableMap.of("name", lockName + ".schema_migration", "client", clientId), + ImmutableMap.of("name", "varchar", "client", "varchar"))) + .then(rows().row( + "[applied]", valueOf(lockApplied), "client", CLIENT_ID).columnTypes("[applied]", "boolean", "clientid", "varchar") + ); + return primeBuilder; + + } + + private static PrimeBuilder primeDeleteQuery(String lockName, String clientId, boolean lockApplied) { + String deleteQuery = "DELETE FROM cqlmigrate.locks WHERE name = ? IF client = ?"; + + PrimeBuilder primeBuilder = when(query( + deleteQuery, + Lists.newArrayList( + com.datastax.oss.simulacron.common.codec.ConsistencyLevel.ONE, + com.datastax.oss.simulacron.common.codec.ConsistencyLevel.ALL), + ImmutableMap.of("name", lockName + ".schema_migration", "client", clientId), + ImmutableMap.of("name", "varchar", "client", "varchar"))) + .then(rows() + .row("[applied]", valueOf(lockApplied), "client", CLIENT_ID).columnTypes("[applied]", "boolean", "clientid", "varchar")); + return primeBuilder; + } + +} diff --git a/src/test/java/uk/sky/cqlmigrate/CassandraNoOpLockingMechanismTest.java b/src/test/java/uk/sky/cqlmigrate/CassandraNoOpLockingMechanismTest.java index 709b10f..1955194 100644 --- a/src/test/java/uk/sky/cqlmigrate/CassandraNoOpLockingMechanismTest.java +++ b/src/test/java/uk/sky/cqlmigrate/CassandraNoOpLockingMechanismTest.java @@ -16,7 +16,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.scassandra.http.client.PrimingRequest.then; import static org.scassandra.http.client.types.ColumnMetadata.column; -@Ignore +@Ignore("have a updated version with Simulacron; can be removed after review") //TODO for 4.x.x public class CassandraNoOpLockingMechanismTest { diff --git a/src/test/java/uk/sky/cqlmigrate/CassandraNoOpLockingMechanismTestSimulacron.java b/src/test/java/uk/sky/cqlmigrate/CassandraNoOpLockingMechanismTestSimulacron.java new file mode 100644 index 0000000..b3a7e5b --- /dev/null +++ b/src/test/java/uk/sky/cqlmigrate/CassandraNoOpLockingMechanismTestSimulacron.java @@ -0,0 +1,74 @@ +package uk.sky.cqlmigrate; + +import com.datastax.oss.driver.api.core.CqlSession; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.concurrent.ExecutorService; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.verifyZeroInteractions; + +@RunWith(MockitoJUnitRunner.class) +public class CassandraNoOpLockingMechanismTestSimulacron { + + + public static final String CLIENT_ID = "CLIENT_ID"; + @Mock + private CqlSession session; + + private CassandraNoOpLockingMechanism lockingMechanism = new CassandraNoOpLockingMechanism(); + private ExecutorService executorService; + + + @Test + public void shouldPrepareNoInsertLocksQueryWhenInit() throws Throwable { + //when + lockingMechanism.init(); + + //then + verifyZeroInteractions(session); + + } + + @Test + public void nothingShouldInsertLockWhenAcquiringLock() throws Exception { + //when + boolean acquire = lockingMechanism.acquire(CLIENT_ID); + + //then + assertThat(acquire).isTrue(); + verifyZeroInteractions(session); + } + + @Test + public void shouldDeleteLockWhenReleasingLock() throws Exception { + //when + boolean release = lockingMechanism.release(CLIENT_ID); + + //then + assertThat(release).isTrue(); + verifyZeroInteractions(session); + } + + @Test + public void shouldAlwaysReleasingLock() throws Exception { + //when + boolean released = lockingMechanism.release(CLIENT_ID); + + //then + assertThat(released).isTrue(); + } + + @Test + public void shouldAlwaysAcquireLock() throws Exception { + //when + boolean acquired = lockingMechanism.acquire(CLIENT_ID); + + //then + assertThat(acquired).isTrue(); + } + +} diff --git a/src/test/java/uk/sky/cqlmigrate/ClusterHealthTest.java b/src/test/java/uk/sky/cqlmigrate/ClusterHealthTest.java index 5d327f9..4254de1 100644 --- a/src/test/java/uk/sky/cqlmigrate/ClusterHealthTest.java +++ b/src/test/java/uk/sky/cqlmigrate/ClusterHealthTest.java @@ -1,71 +1,54 @@ package uk.sky.cqlmigrate; import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.simulacron.common.cluster.ClusterSpec; +import com.datastax.oss.simulacron.common.cluster.DataCenterSpec; +import com.datastax.oss.simulacron.server.BoundCluster; +import com.datastax.oss.simulacron.server.Server; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; -import org.scassandra.Scassandra; -import org.scassandra.ScassandraFactory; -import org.scassandra.http.client.ActivityClient; -import org.scassandra.http.client.PrimingClient; import uk.sky.cqlmigrate.exception.ClusterUnhealthyException; -import uk.sky.cqlmigrate.util.PortScavenger; -import java.net.InetSocketAddress; -import java.util.Collection; +import java.util.UUID; -import static java.util.Collections.singletonList; +import static com.datastax.oss.simulacron.common.stubbing.CloseType.DISCONNECT; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.catchThrowable; import static org.awaitility.Awaitility.await; -@Ignore -//TODO for 4.x.x -public class ClusterHealthTest { - - private static final int BINARY_PORT = 9042; - private static final int ADMIN_PORT = PortScavenger.getFreePort(); - private static final Collection CASSANDRA_HOSTS = singletonList("localhost"); - private static final Collection CASSANDRA_NODES = singletonList(new InetSocketAddress("localhost", BINARY_PORT)); - private static final Scassandra scassandra = ScassandraFactory.createServer("0.0.0.0",BINARY_PORT, "0.0.0.0",ADMIN_PORT); +public class ClusterHealthTest { - private static final PrimingClient primingClient = scassandra.primingClient(); - private static final ActivityClient activityClient = scassandra.activityClient(); - private CqlSession cluster; + private static CqlSession session; private ClusterHealth clusterHealth; + private static Server server = Server.builder().build(); + private ClusterSpec cluster = ClusterSpec.builder().build(); + private BoundCluster bCluster; + @Before public void setUp() { - scassandra.start(); - - String username = "cassandra"; - String password = "cassandra"; - cluster = CqlSession.builder() - .addContactPoints(CASSANDRA_NODES) - .withAuthCredentials(username, password) - .build(); - - clusterHealth = new ClusterHealth(cluster); - - primingClient.clearAllPrimes(); - activityClient.clearAllRecordedActivity(); + DataCenterSpec dc = cluster.addDataCenter().withCassandraVersion("3.8").build(); + dc.addNode().build(); + dc.addNode().withPeerInfo("host_id", UUID.randomUUID()).build(); + bCluster = server.register(cluster); + session = CqlSession.builder().withLocalDatacenter(dc.getName()).build(); + clusterHealth = new ClusterHealth(session); } @After public void tearDown() { - scassandra.stop(); - cluster.close(); + session.close(); } @Test public void shouldThrowExceptionIfHostIsDown() { //given - scassandra.stop(); + bCluster.node(0).closeConnections(DISCONNECT); await().pollInterval(500, MILLISECONDS) .atMost(5, SECONDS) @@ -76,7 +59,7 @@ public void shouldThrowExceptionIfHostIsDown() { //then assertThat(throwable).isNotNull(); assertThat(throwable).isInstanceOf(ClusterUnhealthyException.class); - assertThat(throwable).hasMessage("Cluster not healthy, the following hosts are down: [localhost/127.0.0.1]"); + assertThat(throwable).hasMessage("Cluster not healthy, the following hosts are down: [/127.0.0.1]"); throwable.printStackTrace(); }); }