Skip to content

Commit

Permalink
pass in node name to correct shard relocation (#20463)
Browse files Browse the repository at this point in the history
handle stop in state machine
  • Loading branch information
moesterheld authored Sep 19, 2024
1 parent a51d072 commit 9c778cc
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.commons.exec.ExecuteException;
import org.apache.http.client.utils.URIBuilder;
import org.graylog.datanode.Configuration;
import org.graylog.datanode.configuration.DatanodeConfiguration;
import org.graylog2.security.TrustManagerAggregator;
import org.graylog.datanode.configuration.variants.OpensearchSecurityConfiguration;
import org.graylog.datanode.opensearch.cli.OpensearchCommandLineProcess;
import org.graylog.datanode.opensearch.configuration.OpensearchConfiguration;
Expand Down Expand Up @@ -57,6 +57,7 @@
import org.graylog2.datanode.DataNodeLifecycleTrigger;
import org.graylog2.plugin.system.NodeId;
import org.graylog2.security.CustomCAX509TrustManager;
import org.graylog2.security.TrustManagerAggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -118,7 +119,7 @@ public class OpensearchProcessImpl implements OpensearchProcess, ProcessListener
@Inject
OpensearchProcessImpl(DatanodeConfiguration datanodeConfiguration, final CustomCAX509TrustManager trustManager,
final Configuration configuration, final NodeService<DataNodeDto> nodeService,
ObjectMapper objectMapper, OpensearchStateMachine processState, String nodeName, NodeId nodeId, EventBus eventBus) {
ObjectMapper objectMapper, OpensearchStateMachine processState, @Named("node_name") String nodeName, NodeId nodeId, EventBus eventBus) {
this.datanodeConfiguration = datanodeConfiguration;
this.processState = processState;
this.stdout = new CircularFifoQueue<>(datanodeConfiguration.processLogsBufferSize());
Expand Down Expand Up @@ -344,7 +345,7 @@ void checkRemovalStatus() {
final ClusterHealthResponse health = clusterClient
.health(new ClusterHealthRequest(), RequestOptions.DEFAULT);
if (health.getRelocatingShards() == 0) {
stop(); // todo: fire state machine trigger instead of calling stop
onEvent(OpensearchEvent.PROCESS_STOPPED);
executorService.shutdown();
eventBus.post(DataNodeLifecycleEvent.create(nodeId.getNodeId(), DataNodeLifecycleTrigger.REMOVED));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public static OpensearchStateMachine createNew(OpensearchProcess process, Set<St
.permit(OpensearchEvent.PROCESS_STOPPED, OpensearchState.REMOVED);

config.configure(OpensearchState.REMOVED)
.onEntry(process::stop)
.permit(OpensearchEvent.RESET, OpensearchState.WAITING_FOR_CONFIGURATION, process::reset)
.ignore(OpensearchEvent.PROCESS_STOPPED);

Expand Down

0 comments on commit 9c778cc

Please sign in to comment.