diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java index 3fcfd120086..f72ffea73e3 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java @@ -54,11 +54,13 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Queue; import java.util.WeakHashMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Consumer; +import java.util.stream.Collectors; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -353,6 +355,14 @@ private void connect( Runnable onSuccess, Consumer onFailure) { assert adminExecutor.inEventLoop(); + + if (LOG.isTraceEnabled()) { + LOG.trace( + "[{}] Control connection candidate nodes: [{}]", + logPrefix, + nodes.stream().map(Objects::toString).collect(Collectors.joining(", "))); + } + Node node = nodes.poll(); if (node == null) { onFailure.accept(AllNodesFailedException.fromErrors(errors)); @@ -368,11 +378,17 @@ private void connect( NodeStateEvent lastStateEvent = lastStateEvents.get(node); if (error != null) { if (closeWasCalled || initFuture.isCancelled()) { + LOG.trace( + "[{}] Error connecting to {} after close called", logPrefix, node); onSuccess.run(); // abort, we don't really care about the result } else { if (error instanceof AuthenticationException) { Loggers.warnWithException( - LOG, "[{}] Authentication error", logPrefix, error); + LOG, + "[{}] Authentication error connecting to {}", + logPrefix, + node, + error); } else { if (config .getDefaultProfile() @@ -399,39 +415,44 @@ private void connect( } } else if (closeWasCalled || initFuture.isCancelled()) { LOG.debug( - "[{}] New channel 
opened ({}) but the control connection was closed, closing it", + "[{}] New channel opened ({}) to {} but the control connection was closed, closing it", logPrefix, - channel); + channel, + node); channel.forceClose(); onSuccess.run(); } else if (lastDistanceEvent != null && lastDistanceEvent.distance == NodeDistance.IGNORED) { LOG.debug( - "[{}] New channel opened ({}) but node became ignored, " + "[{}] New channel opened ({}) to {} but node became ignored, " + "closing and trying next node", logPrefix, - channel); + channel, + node); channel.forceClose(); connect(nodes, errors, onSuccess, onFailure); } else if (lastStateEvent != null && (lastStateEvent.newState == null /*(removed)*/ || lastStateEvent.newState == NodeState.FORCED_DOWN)) { LOG.debug( - "[{}] New channel opened ({}) but node was removed or forced down, " + "[{}] New channel opened ({}) to {} but node was removed or forced down, " + "closing and trying next node", logPrefix, - channel); + channel, + node); channel.forceClose(); connect(nodes, errors, onSuccess, onFailure); } else { - LOG.debug("[{}] New channel opened {}", logPrefix, channel); + LOG.debug("[{}] New channel opened {} to {}", logPrefix, channel, node); DriverChannel previousChannel = ControlConnection.this.channel; ControlConnection.this.channel = channel; if (previousChannel != null) { // We were reconnecting: make sure previous channel gets closed (it may // still be open if reconnection was forced) LOG.debug( - "[{}] Forcefully closing previous channel {}", logPrefix, channel); + "[{}] Forcefully closing previous channel {}", + logPrefix, + previousChannel); previousChannel.forceClose(); } context.getEventBus().fire(ChannelEvent.channelOpened(node)); @@ -538,9 +559,10 @@ private void onDistanceEvent(DistanceEvent event) { && !channel.closeFuture().isDone() && event.node.getEndPoint().equals(channel.getEndPoint())) { LOG.debug( - "[{}] Control node {} became IGNORED, reconnecting to a different node", + "[{}] Control node {} with 
channel {} became IGNORED, reconnecting to a different node", logPrefix, - event.node); + event.node, + channel); reconnectNow(); } } @@ -553,9 +575,10 @@ private void onStateEvent(NodeStateEvent event) { && !channel.closeFuture().isDone() && event.node.getEndPoint().equals(channel.getEndPoint())) { LOG.debug( - "[{}] Control node {} was removed or forced down, reconnecting to a different node", + "[{}] Control node {} with channel {} was removed or forced down, reconnecting to a different node", logPrefix, - event.node); + event.node, + channel); reconnectNow(); } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java index 9e8184879ea..97a1212b075 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java @@ -55,6 +55,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntUnaryOperator; +import java.util.stream.Collectors; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -168,8 +169,12 @@ public void init(@NonNull Map nodes, @NonNull DistanceReporter dista // This includes state == UNKNOWN. If the node turns out to be unreachable, this will be // detected when we try to open a pool to it, it will get marked down and this will be // signaled back to this policy, which will then remove it from the live set. 
+ LOG.debug("[{}] {} added to initial live set", logPrefix, node); liveNodes.add(node); } + if (LOG.isTraceEnabled()) { + logLiveNodesByDc(); + } } } @@ -364,6 +369,9 @@ public void onUp(@NonNull Node node) { } if (distance != NodeDistance.IGNORED && liveNodes.add(node)) { LOG.debug("[{}] {} came back UP, added to live set", logPrefix, node); + if (LOG.isTraceEnabled()) { + logLiveNodesByDc(); + } } } @@ -371,6 +379,9 @@ public void onUp(@NonNull Node node) { public void onDown(@NonNull Node node) { if (liveNodes.remove(node)) { LOG.debug("[{}] {} went DOWN, removed from live set", logPrefix, node); + if (LOG.isTraceEnabled()) { + logLiveNodesByDc(); + } } } @@ -378,6 +389,9 @@ public void onDown(@NonNull Node node) { public void onRemove(@NonNull Node node) { if (liveNodes.remove(node)) { LOG.debug("[{}] {} was removed, removed from live set", logPrefix, node); + if (LOG.isTraceEnabled()) { + logLiveNodesByDc(); + } } } @@ -422,4 +436,25 @@ protected NodeDistance computeNodeDistance(@NonNull Node node) { public void close() { // nothing to do } + + // logs the current list of live nodes, grouped by datacenter (trace level only) + private void logLiveNodesByDc() { + // check trace level enabled just in case + if (!LOG.isTraceEnabled()) { + return; + } + + LOG.trace( + "[{}] Current live nodes by dc: {{}}", + logPrefix, + liveNodes.dcs().stream() + .map( + dc -> + dc + + ": " + + liveNodes.dc(dc).stream() + .map(Objects::toString) + .collect(Collectors.joining(", "))) + .collect(Collectors.joining(", "))); + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/AddNodeRefresh.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/AddNodeRefresh.java index e8806b7651a..7ad33c03312 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/AddNodeRefresh.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/AddNodeRefresh.java @@ -23,10 +23,14 @@ import java.util.Map; import java.util.UUID; import net.jcip.annotations.ThreadSafe; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @ThreadSafe public class AddNodeRefresh extends NodesRefresh { + private static final Logger LOG = LoggerFactory.getLogger(AddNodeRefresh.class); + @VisibleForTesting final NodeInfo newNodeInfo; AddNodeRefresh(NodeInfo newNodeInfo) { @@ -36,6 +40,8 @@ public class AddNodeRefresh extends NodesRefresh { @Override public Result compute( DefaultMetadata oldMetadata, boolean tokenMapEnabled, InternalDriverContext context) { + String logPrefix = context.getSessionName(); + Map oldNodes = oldMetadata.getNodes(); Node existing = oldNodes.get(newNodeInfo.getHostId()); if (existing == null) { @@ -46,6 +52,15 @@ public Result compute( .putAll(oldNodes) .put(newNode.getHostId(), newNode) .build(); + + if (LOG.isDebugEnabled()) { + LOG.debug( + "[{}] Adding new node {} with system-table address {}", + logPrefix, + newNode, + newNodeInfo.getBroadcastRpcAddress().orElse(null)); + } + return new Result( oldMetadata.withNodes(newNodes, tokenMapEnabled, false, null, context), ImmutableList.of(NodeStateEvent.added(newNode))); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNode.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNode.java index 839a4a61231..ef850d9efbb 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNode.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNode.java @@ -186,7 +186,8 @@ public NodeMetricUpdater getMetricUpdater() { public String toString() { // Include the hash code because this class uses reference equality return String.format( - "Node(endPoint=%s, hostId=%s, hashCode=%x)", getEndPoint(), getHostId(), hashCode()); + "Node(endPoint=%s, hostId=%s, hashCode=%x, dc=%s)", + getEndPoint(), getHostId(), hashCode(), getDatacenter()); } /** Note: deliberately not exposed by the public interface. 
*/ diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNodeInfo.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNodeInfo.java index 7b34a33856d..293e42d81ea 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNodeInfo.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNodeInfo.java @@ -127,6 +127,40 @@ public UUID getSchemaVersion() { return schemaVersion; } + @Override + public String toString() { + return "DefaultNodeInfo{" + + "endPoint=" + + endPoint + + ", broadcastRpcAddress=" + + broadcastRpcAddress + + ", broadcastAddress=" + + broadcastAddress + + ", listenAddress=" + + listenAddress + + ", datacenter='" + + datacenter + + '\'' + + ", rack='" + + rack + + '\'' + + ", cassandraVersion='" + + cassandraVersion + + '\'' + + ", partitioner='" + + partitioner + + '\'' + + ", tokens=" + + tokens + + ", extras=" + + extras + + ", hostId=" + + hostId + + ", schemaVersion=" + + schemaVersion + + '}'; + } + @NotThreadSafe public static class Builder { private EndPoint endPoint; diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java index da5fc2115eb..d53d1b10c77 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java @@ -51,6 +51,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.stream.Collectors; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -215,6 +216,14 @@ public CompletionStage> refreshNodeList() { } } } + + if (LOG.isTraceEnabled()) { + LOG.trace( + "[{}] Full system-table node list: [{}]", + 
logPrefix, + nodeInfos.stream().map(Objects::toString).collect(Collectors.joining(", "))); + } + return nodeInfos; }); } @@ -276,8 +285,16 @@ private Optional firstPeerRowAsNodeInfo(AdminResult result, EndPoint l if (isPeerValid(row)) { return Optional.ofNullable(getBroadcastRpcAddress(row, localEndPoint)) .map( - broadcastRpcAddress -> - nodeInfoBuilder(row, broadcastRpcAddress, localEndPoint).build()); + broadcastRpcAddress -> { + DefaultNodeInfo nodeInfo = + nodeInfoBuilder(row, broadcastRpcAddress, localEndPoint).build(); + + if (LOG.isTraceEnabled()) { + LOG.trace("[{}] System-table node entry: {}", logPrefix, nodeInfo); + } + + return nodeInfo; + }); } } return Optional.empty(); @@ -440,7 +457,11 @@ private Optional findInPeers( if (broadcastRpcAddress != null && broadcastRpcAddress.equals(broadcastRpcAddressToFind) && isPeerValid(row)) { - return Optional.of(nodeInfoBuilder(row, broadcastRpcAddress, localEndPoint).build()); + DefaultNodeInfo nodeInfo = nodeInfoBuilder(row, broadcastRpcAddress, localEndPoint).build(); + if (LOG.isTraceEnabled()) { + LOG.trace("[{}] System-table node entry: {}", logPrefix, nodeInfo); + } + return Optional.of(nodeInfo); } } LOG.debug("[{}] Could not find any peer row matching {}", logPrefix, broadcastRpcAddressToFind); @@ -457,8 +478,14 @@ private Optional findInPeers( if (hostId != null && hostId.equals(hostIdToFind) && isPeerValid(row)) { return Optional.ofNullable(getBroadcastRpcAddress(row, localEndPoint)) .map( - broadcastRpcAddress -> - nodeInfoBuilder(row, broadcastRpcAddress, localEndPoint).build()); + broadcastRpcAddress -> { + DefaultNodeInfo nodeInfo = + nodeInfoBuilder(row, broadcastRpcAddress, localEndPoint).build(); + if (LOG.isTraceEnabled()) { + LOG.trace("[{}] System-table node entry: {}", logPrefix, nodeInfo); + } + return nodeInfo; + }); } } LOG.debug("[{}] Could not find any peer row matching {}", logPrefix, hostIdToFind); diff --git 
a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/FullNodeListRefresh.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/FullNodeListRefresh.java index 14496bb3399..ab320dc0ff7 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/FullNodeListRefresh.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/FullNodeListRefresh.java @@ -27,8 +27,10 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +75,11 @@ public Result compute( DefaultNode node = (DefaultNode) oldNodes.get(id); if (node == null) { node = new DefaultNode(nodeInfo.getEndPoint(), context); - LOG.debug("[{}] Adding new node {}", logPrefix, node); + LOG.debug( + "[{}] Adding new node {} with system-table address {}", + logPrefix, + node, + nodeInfo.getBroadcastRpcAddress().orElse(null)); added.put(id, node); } if (tokenFactory == null && nodeInfo.getPartitioner() != null) { @@ -85,6 +91,18 @@ public Result compute( Set removed = Sets.difference(oldNodes.keySet(), seen); + if (!removed.isEmpty()) { + if (LOG.isTraceEnabled()) { + LOG.trace( + "[{}] Removing nodes: [{}]", + logPrefix, + removed.stream() + .map(oldNodes::get) + .map(Objects::toString) + .collect(Collectors.joining(", "))); + } + } + if (added.isEmpty() && removed.isEmpty()) { // The list didn't change if (!oldMetadata.getTokenMap().isPresent() && tokenFactory != null) { // First time we found out what the partitioner is => set the token factory and trigger a diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java index e676d9eb2ee..12b76f45c7b 100644 --- 
a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java @@ -75,9 +75,17 @@ public Result compute( DefaultNode node = findIn(contactPoints, endPoint); if (node == null) { node = new DefaultNode(endPoint, context); - LOG.debug("[{}] Adding new node {}", logPrefix, node); + LOG.debug( + "[{}] Adding new node {} with system-table address {}", + logPrefix, + node, + nodeInfo.getBroadcastRpcAddress().orElse(null)); } else { - LOG.debug("[{}] Copying contact point {}", logPrefix, node); + LOG.debug( + "[{}] Copying contact point {} with system-table address {}", + logPrefix, + node, + nodeInfo.getBroadcastRpcAddress().orElse(null)); } if (tokenMapEnabled && tokenFactory == null && nodeInfo.getPartitioner() != null) { tokenFactory = tokenFactoryRegistry.tokenFactoryFor(nodeInfo.getPartitioner()); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultNodeTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultNodeTest.java index bc9a1b86e2f..7dd8153cd09 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultNodeTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultNodeTest.java @@ -28,16 +28,18 @@ public class DefaultNodeTest { private final String uuidStr = "1e4687e6-f94e-432e-a792-216f89ef265f"; private final UUID hostId = UUID.fromString(uuidStr); private final EndPoint endPoint = new DefaultEndPoint(new InetSocketAddress("localhost", 9042)); + private final String dc = "earth-1"; @Test public void should_have_expected_string_representation() { DefaultNode node = new DefaultNode(endPoint, MockedDriverContextFactory.defaultDriverContext()); node.hostId = hostId; + node.datacenter = dc; String expected = String.format( - "Node(endPoint=localhost/127.0.0.1:9042, hostId=1e4687e6-f94e-432e-a792-216f89ef265f, 
hashCode=%x)", + "Node(endPoint=localhost/127.0.0.1:9042, hostId=1e4687e6-f94e-432e-a792-216f89ef265f, hashCode=%x, dc=earth-1)", node.hashCode()); assertThat(node.toString()).isEqualTo(expected); } @@ -47,10 +49,11 @@ public void should_have_expected_string_representation_if_hostid_is_null() { DefaultNode node = new DefaultNode(endPoint, MockedDriverContextFactory.defaultDriverContext()); node.hostId = null; + node.datacenter = null; String expected = String.format( - "Node(endPoint=localhost/127.0.0.1:9042, hostId=null, hashCode=%x)", node.hashCode()); + "Node(endPoint=localhost/127.0.0.1:9042, hostId=null, hashCode=%x, dc=null)", node.hashCode()); assertThat(node.toString()).isEqualTo(expected); } }