Skip to content

Commit

Permalink
Fix cassandra discovery for k8s
Browse files Browse the repository at this point in the history
  • Loading branch information
Vincent Royer committed Aug 8, 2018
1 parent 0964f68 commit 4eac694
Showing 1 changed file with 28 additions and 15 deletions.
43 changes: 28 additions & 15 deletions core/src/main/java/org/elassandra/discovery/CassandraDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ protected void doStart() {
for(InetAddress endpoint : StorageService.instance.getTokenMetadata().getAllEndpoints()) {
if (!this.localAddress.equals(endpoint) && this.localDc.equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint))) {
String hostId = StorageService.instance.getHostId(endpoint).toString();
UntypedResultSet.Row row = executeInternal("SELECT preferred_ip, rpc_address from system." + SystemKeyspace.PEERS+" WHERE peer = ?", endpoint).one();
if (row != null) {
UntypedResultSet rs = executeInternal("SELECT preferred_ip, rpc_address from system." + SystemKeyspace.PEERS+" WHERE peer = ?", endpoint);
if (!rs.isEmpty()) {
UntypedResultSet.Row row = rs.one();
EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
clusterGroup.update(hostId,
endpoint,
Expand Down Expand Up @@ -656,7 +657,7 @@ public void onRemove(InetAddress endpoint) {
}
} else if (isMember(endpoint)) {
DiscoveryNode removedNode = this.nodes().findByInetAddress(endpoint);
if (removedNode != null) {
if (removedNode != null && !this.localNode().getId().equals(removedNode.getId())) {
logger.warn("Removing node ip={} node={} => disconnecting", endpoint, removedNode);
if (this.metaDataVersionAckListener.get() != null) {
notifyMetaDataVersionAckListener(Gossiper.instance.getEndpointStateForEndpoint(endpoint));
Expand Down Expand Up @@ -824,18 +825,30 @@ public synchronized boolean update(String hostId, InetAddress endpoint, InetAddr
}
*/
return true;
} else if (!dn.getName().equals(buildNodeName(endpoint)) || !dn.getInetAddress().equals(Boolean.getBoolean("es.use_internal_address") ? internalIp : rpcAddress)) {
DiscoveryNode dn2 = new DiscoveryNode(buildNodeName(endpoint),
hostId,
new InetSocketTransportAddress(Boolean.getBoolean("es.use_internal_address") ? internalIp : rpcAddress, publishPort()),
dn.getAttributes(),
CASSANDRA_ROLES,
Version.CURRENT,
status);
members.replace(hostId, dn, dn2);
logger.debug("Update node host_id={} endpoint={} internal_ip={}, rpc_address={}, status={}",
hostId, NetworkAddress.format(endpoint), NetworkAddress.format(internalIp), NetworkAddress.format(rpcAddress), status);
return true;
}
if (localNode().getId().equals(hostId)) {
// ignore GOSSIP update related to our self node.
logger.debug("Ignoring GOSSIP update for node id={} ip={} because it's mine", hostId, endpoint);
return false;
}
if (!dn.getName().equals(buildNodeName(endpoint)) || !dn.getInetAddress().equals(Boolean.getBoolean("es.use_internal_address") ? internalIp : rpcAddress)) {
if (status.equals(DiscoveryNodeStatus.ALIVE)) {
DiscoveryNode dn2 = new DiscoveryNode(buildNodeName(endpoint),
hostId,
new InetSocketTransportAddress(Boolean.getBoolean("es.use_internal_address") ? internalIp : rpcAddress, publishPort()),
dn.getAttributes(),
CASSANDRA_ROLES,
Version.CURRENT,
status);
members.replace(hostId, dn, dn2);
logger.debug("Update node host_id={} endpoint={} internal_ip={}, rpc_address={}, status={}",
hostId, NetworkAddress.format(endpoint), NetworkAddress.format(internalIp), NetworkAddress.format(rpcAddress), status);
return true;
} else {
logger.debug("Ignoring node host_id={} endpoint={} internal_ip={}, rpc_address={}, status={}",
hostId, NetworkAddress.format(endpoint), NetworkAddress.format(internalIp), NetworkAddress.format(rpcAddress), status);
return false;
}
} else if (!dn.getStatus().equals(status)) {
dn.status(status);
logger.debug("Update node host_id={} endpoint={} internal_ip={} rpc_address={}, status={}",
Expand Down

0 comments on commit 4eac694

Please sign in to comment.