Skip to content

Commit

Permalink
[Fix][Zeta] If Zeta not a TCP discovery, it cannot find other members (
Browse files Browse the repository at this point in the history
  • Loading branch information
loustler authored Sep 28, 2024
1 parent 156b087 commit dff3ef4
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,18 @@ public class KubernetesIT {
private static final String podName = "seatunnel-0";

@Test
public void test()
public void testTcpDiscovery()
throws IOException, XmlPullParserException, ApiException, InterruptedException {
runDiscoveryTest("hazelcast-tcp-discovery.yaml");
}

@Test
public void testKubernetesDiscovery()
throws IOException, XmlPullParserException, ApiException, InterruptedException {
runDiscoveryTest("hazelcast-kubernetes-discovery.yaml");
}

private void runDiscoveryTest(String hazelCastConfigFile)
throws IOException, XmlPullParserException, ApiException, InterruptedException {
ApiClient client = Config.defaultClient();
AppsV1Api appsV1Api = new AppsV1Api(client);
Expand All @@ -82,7 +93,7 @@ public void test()
log.info("Docker's environmental information");
log.info(info.toString());
if (dockerClient.listImagesCmd().withImageNameFilter(tag).exec().isEmpty()) {
copyFileToCurrentResources(targetPath);
copyFileToCurrentResources(hazelCastConfigFile, targetPath);
File file =
new File(
PROJECT_ROOT_PATH
Expand Down Expand Up @@ -153,7 +164,8 @@ public void test()
}
}

private void copyFileToCurrentResources(String targetPath) throws IOException {
private void copyFileToCurrentResources(String hazelCastConfigFile, String targetPath)
throws IOException {
File jarsPath = new File(targetPath + "/jars");
jarsPath.mkdirs();
File binPath = new File(targetPath + "/bin");
Expand All @@ -164,7 +176,7 @@ private void copyFileToCurrentResources(String targetPath) throws IOException {
new File(PROJECT_ROOT_PATH + "/config"), new File(targetPath + "/config"));
// replace hazelcast.yaml and hazelcast-client.yaml
Files.copy(
Paths.get(targetPath + "/custom_config/hazelcast.yaml"),
Paths.get(targetPath + "/custom_config/" + hazelCastConfigFile),
Paths.get(targetPath + "/config/hazelcast.yaml"),
StandardCopyOption.REPLACE_EXISTING);
Files.copy(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

hazelcast:
cluster-name: seatunnel
network:
rest-api:
enabled: true
endpoint-groups:
CLUSTER_WRITE:
enabled: true
DATA:
enabled: true
join:
multicast:
enabled: false
kubernetes:
enabled: true
service-port: 5801
namespace: default
service-name: seatunnel
port:
auto-increment: true
port-count: 100
port: 5801
properties:
hazelcast.invocation.max.retry.count: 100
hazelcast.invocation.retry.pause.millis: 1000
hazelcast.tcp.join.port.try.count: 30
hazelcast.slow.operation.detector.stacktrace.logging.enabled: true
hazelcast.logging.type: log4j2
hazelcast.operation.generic.thread.count: 200
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,10 @@
import com.hazelcast.instance.impl.Node;
import com.hazelcast.instance.impl.NodeExtension;
import com.hazelcast.internal.cluster.Joiner;
import com.hazelcast.internal.config.AliasedDiscoveryConfigUtils;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import static com.hazelcast.config.ConfigAccessor.getActiveMemberNetworkConfig;
import static com.hazelcast.internal.config.AliasedDiscoveryConfigUtils.allUsePublicAddress;
import static com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_ENABLED;
import static com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_PUBLIC_IP_ENABLED;

@Slf4j
public class SeaTunnelNodeContext extends DefaultNodeContext {
Expand All @@ -53,26 +49,11 @@ public Joiner createJoiner(Node node) {
getActiveMemberNetworkConfig(seaTunnelConfig.getHazelcastConfig()).getJoin();
join.verify();

if (node.shouldUseMulticastJoiner(join) && node.multicastService != null) {
super.createJoiner(node);
} else if (join.getTcpIpConfig().isEnabled()) {
if (join.getTcpIpConfig().isEnabled()) {
log.info("Using LiteNodeDropOutTcpIpJoiner TCP/IP discovery");
return new LiteNodeDropOutTcpIpJoiner(node);
} else if (node.getProperties().getBoolean(DISCOVERY_SPI_ENABLED)
|| isAnyAliasedConfigEnabled(join)
|| join.isAutoDetectionEnabled()) {
super.createJoiner(node);
}
return null;
}

private static boolean isAnyAliasedConfigEnabled(JoinConfig join) {
return !AliasedDiscoveryConfigUtils.createDiscoveryStrategyConfigs(join).isEmpty();
}

private boolean usePublicAddress(JoinConfig join, Node node) {
return node.getProperties().getBoolean(DISCOVERY_SPI_PUBLIC_IP_ENABLED)
|| allUsePublicAddress(
AliasedDiscoveryConfigUtils.aliasedDiscoveryConfigsFrom(join));
return super.createJoiner(node);
}
}

0 comments on commit dff3ef4

Please sign in to comment.