diff --git a/graylog-storage-elasticsearch7/src/test/java/org/graylog/storage/elasticsearch7/MessagesFloodStageTestES7IT.java b/graylog-storage-elasticsearch7/src/test/java/org/graylog/storage/elasticsearch7/MessagesFloodStageTestES7IT.java new file mode 100644 index 000000000000..18c6fcb8a24d --- /dev/null +++ b/graylog-storage-elasticsearch7/src/test/java/org/graylog/storage/elasticsearch7/MessagesFloodStageTestES7IT.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog.storage.elasticsearch7; + +import org.graylog.storage.elasticsearch7.testing.ElasticsearchInstanceES7; +import org.graylog.testing.elasticsearch.SearchServerInstance; +import org.graylog2.indexer.messages.MessagesFloodStageTestIT; +import org.junit.Rule; + +public class MessagesFloodStageTestES7IT extends MessagesFloodStageTestIT { + @Rule + public final ElasticsearchInstanceES7 elasticsearch = ElasticsearchInstanceES7.create(); + + @Override + protected SearchServerInstance searchServer() { + return this.elasticsearch; + } +} diff --git a/graylog-storage-elasticsearch7/src/test/java/org/graylog/storage/elasticsearch7/testing/ClientES7.java b/graylog-storage-elasticsearch7/src/test/java/org/graylog/storage/elasticsearch7/testing/ClientES7.java index 1bcd8824bad4..3643c24e2b70 100644 --- a/graylog-storage-elasticsearch7/src/test/java/org/graylog/storage/elasticsearch7/testing/ClientES7.java +++ b/graylog-storage-elasticsearch7/src/test/java/org/graylog/storage/elasticsearch7/testing/ClientES7.java @@ -24,6 +24,8 @@ import com.github.rholder.retry.WaitStrategies; import com.google.common.collect.Streams; import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsRequest; +import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsResponse; import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions; @@ -248,6 +250,14 @@ private String[] existingIndices() { .toArray(String[]::new); } + + public String getSetting(String setting) { + final ClusterGetSettingsRequest req = new ClusterGetSettingsRequest(); + final ClusterGetSettingsResponse response = client.execute((c, requestOptions) -> c.cluster().getSettings(req, requestOptions), + "Unable to read OS cluster setting: " + setting); + return response.getSetting(setting); + } + @Override public void putSetting(String setting, String value) { final ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest(); @@ -274,6 +284,11 @@ public void resetIndexBlock(String index) { "Unable to reset index block for " + index); } + @Override + public void resetClusterBlock() { + // noop for ES7, needed for OS 2.x + } + @Override public void setIndexBlock(String index) { final UpdateSettingsRequest request = new UpdateSettingsRequest(index) diff --git a/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/MessagesFloodStageTestOS2IT.java b/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/MessagesFloodStageTestOS2IT.java new file mode 100644 index 000000000000..78559d94eda4 --- /dev/null +++ b/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/MessagesFloodStageTestOS2IT.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog.storage.opensearch2; + +import org.graylog.storage.opensearch2.testing.OpenSearchInstance; +import org.graylog.testing.elasticsearch.SearchServerInstance; +import org.graylog2.indexer.messages.MessagesFloodStageTestIT; +import org.junit.Rule; + +public class MessagesFloodStageTestOS2IT extends MessagesFloodStageTestIT { + @Rule + public final OpenSearchInstance openSearchInstance = OpenSearchInstance.create(); + + @Override + protected SearchServerInstance searchServer() { + return this.openSearchInstance; + } +} diff --git a/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/MessagesOS2IT.java b/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/MessagesOS2IT.java index 2fb8114d9762..40f5a4b5bb73 100644 --- a/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/MessagesOS2IT.java +++ b/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/MessagesOS2IT.java @@ -16,17 +16,12 @@ */ package org.graylog.storage.opensearch2; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.graylog.shaded.opensearch2.org.opensearch.action.admin.indices.refresh.RefreshRequest; import org.graylog.shaded.opensearch2.org.opensearch.action.index.IndexRequest; import org.graylog.shaded.opensearch2.org.opensearch.action.index.IndexResponse; -import org.graylog.shaded.opensearch2.org.opensearch.client.core.CountRequest; -import org.graylog.shaded.opensearch2.org.opensearch.client.core.CountResponse; import org.graylog.shaded.opensearch2.org.opensearch.rest.RestStatus; import org.graylog.storage.opensearch2.testing.OpenSearchInstance; import org.graylog.testing.elasticsearch.SearchServerInstance; import org.graylog2.indexer.messages.MessagesIT; -import org.graylog2.shared.bindings.providers.ObjectMapperProvider; import org.junit.Rule; import java.util.Map; diff --git a/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/testing/ClientOS2.java b/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/testing/ClientOS2.java index f73e13433384..1159c9def6bc 100644 --- a/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/testing/ClientOS2.java +++ b/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/testing/ClientOS2.java @@ -24,6 +24,8 @@ import com.github.rholder.retry.WaitStrategies; import com.google.common.collect.Streams; import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.health.ClusterHealthRequest; +import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.settings.ClusterGetSettingsRequest; +import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.settings.ClusterGetSettingsResponse; import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.graylog.shaded.opensearch2.org.opensearch.action.admin.indices.alias.IndicesAliasesRequest; import org.graylog.shaded.opensearch2.org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions; @@ -221,6 +223,7 @@ public void cleanUp() { deleteIndices(existingIndices()); deleteTemplates(existingTemplates()); refreshNode(); + resetClusterBlock(); } private String[] existingTemplates() { @@ -252,7 +255,31 @@ private String[] existingIndices() { public void putSetting(String setting, String value) { final ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest(); - request.persistentSettings(Settings.builder().put(setting, value)); + if(value == null) { + request.transientSettings(Settings.builder().putNull(setting)); + } else { + request.transientSettings(Settings.builder().put(setting, value)); + } + + client.execute((c, requestOptions) -> c.cluster().putSettings(request, requestOptions), + "Unable to update OS cluster setting: " + setting + "=" + value); + } + + public String getSetting(String setting) { + final ClusterGetSettingsRequest req = new ClusterGetSettingsRequest(); + final ClusterGetSettingsResponse response = client.execute((c, requestOptions) -> c.cluster().getSettings(req, requestOptions), + "Unable to read OS cluster setting: " + setting); + return response.getSetting(setting); + } + + public void putPersistentSetting(String setting, String value) { + final ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest(); + + if(value == null) { + request.persistentSettings(Settings.builder().putNull(setting)); + } else { + request.persistentSettings(Settings.builder().put(setting, value)); + } client.execute((c, requestOptions) -> c.cluster().putSettings(request, requestOptions), "Unable to update OS cluster setting: " + setting + "=" + value); @@ -274,6 +301,17 @@ public void resetIndexBlock(String index) { "Unable to reset index block for " + index); } + @Override + public void resetClusterBlock() { + final String block = getSetting("cluster.blocks.create_index"); + if(Boolean.parseBoolean(block)) { + // high memory usage in previous tests may cause a cluster block. If that happens, we should reset this block before the next test. + LOG.info("Indexer cluster is blocked after heavy memory/disk usage, resetting the block"); + // reset create_index block for OpenSearch 2.x see https://github.com/opensearch-project/OpenSearch/pull/5852 + putPersistentSetting("cluster.blocks.create_index", null); + } + } + @Override public void setIndexBlock(String index) { final UpdateSettingsRequest request = new UpdateSettingsRequest(index) diff --git a/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/testing/OpenSearchInstance.java b/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/testing/OpenSearchInstance.java index 92a2649d3561..0459c8c1e739 100644 --- a/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/testing/OpenSearchInstance.java +++ b/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/testing/OpenSearchInstance.java @@ -46,7 +46,7 @@ public class OpenSearchInstance extends TestableSearchServerInstance { private static final Logger LOG = LoggerFactory.getLogger(OpenSearchInstance.class); - public static final String DEFAULT_HEAP_SIZE = "2g"; + public static final String DEFAULT_HEAP_SIZE = "5g"; public static final SearchServer OPENSEARCH_VERSION = SearchServer.DEFAULT_OPENSEARCH_VERSION; private final OpenSearchClient openSearchClient; diff --git a/graylog2-server/src/test/java/org/graylog/testing/containermatrix/SearchServer.java b/graylog2-server/src/test/java/org/graylog/testing/containermatrix/SearchServer.java index 7e004e7dd4d0..d24cb2e2bdbe 100644 --- a/graylog2-server/src/test/java/org/graylog/testing/containermatrix/SearchServer.java +++ b/graylog2-server/src/test/java/org/graylog/testing/containermatrix/SearchServer.java @@ -26,7 +26,7 @@ public enum SearchServer { ES7(ELASTICSEARCH, "7.10.2"), OS1(OPENSEARCH, "1.3.1"), OS2(OPENSEARCH, "2.0.1"), - OS2_LATEST(OPENSEARCH, "2.4.0"), + OS2_LATEST(OPENSEARCH, "2.8.0"), DATANODE_DEV(DATANODE, "5.2.0"); public static final SearchServer DEFAULT_VERSION = DATANODE_DEV; diff --git a/graylog2-server/src/test/java/org/graylog/testing/elasticsearch/Client.java b/graylog2-server/src/test/java/org/graylog/testing/elasticsearch/Client.java index 338773ebad08..a5e78be8946c 100644 --- a/graylog2-server/src/test/java/org/graylog/testing/elasticsearch/Client.java +++ b/graylog2-server/src/test/java/org/graylog/testing/elasticsearch/Client.java @@ -61,11 +61,14 @@ default String createRandomIndex(String prefix) { String fieldType(String testIndexName, String source); void putSetting(String setting, String value); + String getSetting(String setting); void waitForIndexBlock(String index); void resetIndexBlock(String index); + void resetClusterBlock(); + void setIndexBlock(String index); void updateMapping(String index, Map mapping); diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesFloodStageTestIT.java b/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesFloodStageTestIT.java new file mode 100644 index 000000000000..eca6b748f56f --- /dev/null +++ b/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesFloodStageTestIT.java @@ -0,0 +1,157 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.indexer.messages; + +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.graylog.failure.FailureSubmissionService; +import org.graylog.testing.elasticsearch.ElasticsearchBaseTest; +import org.graylog2.indexer.IndexSet; +import org.graylog2.plugin.Message; +import org.graylog2.system.processing.ProcessingStatusRecorder; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +public abstract class MessagesFloodStageTestIT extends ElasticsearchBaseTest { + private static final String INDEX_NAME = "messages_it_deflector"; + + protected Messages messages; + + protected static final IndexSet indexSet = new MessagesTestIndexSet(); + + protected MessagesAdapter createMessagesAdapter() { + return searchServer().adapters().messagesAdapter(); + } + + private final FailureSubmissionService failureSubmissionService = mock(FailureSubmissionService.class); + + @Before + public void setUp() throws Exception { + client().deleteIndices(INDEX_NAME); + client().createIndex(INDEX_NAME); + client().waitForGreenStatus(INDEX_NAME); + messages = new Messages(mock(TrafficAccounting.class), createMessagesAdapter(), mock(ProcessingStatusRecorder.class), + failureSubmissionService); + } + + @After + public void tearDown() { + client().resetClusterBlock(); + client().cleanUp(); + } + + protected long messageCount(String indexName) { + searchServer().client().refreshNode(); + return searchServer().adapters().countsAdapter().totalCount(Collections.singletonList(indexName)); + } + + + @Test + public void retryIndexingMessagesDuringFloodStage() throws Exception { + triggerFloodStage(INDEX_NAME); + final CountDownLatch countDownLatch = new CountDownLatch(1); + final AtomicBoolean succeeded = new AtomicBoolean(false); + final List> messageBatch = createMessageBatch(1024, 50); + + final Future> result = background(() -> this.messages.bulkIndex(messageBatch, createIndexingListener(countDownLatch, succeeded))); + + countDownLatch.await(); + + resetFloodStage(INDEX_NAME); + + final List failedItems = result.get(3, TimeUnit.MINUTES); + assertThat(failedItems).isEmpty(); + + client().refreshNode(); + + assertThat(messageCount(INDEX_NAME)).isEqualTo(50); + assertThat(succeeded.get()).isTrue(); + } + + private Messages.IndexingListener createIndexingListener(CountDownLatch retryLatch, AtomicBoolean successionFlag) { + return new Messages.IndexingListener() { + @Override + public void onRetry(long attemptNumber) { + retryLatch.countDown(); + } + + @Override + public void onSuccess(long delaySinceFirstAttempt) { + if (retryLatch.getCount() > 0) { + retryLatch.countDown(); + } + successionFlag.set(true); + } + }; + } + + private Future> background(Callable> task) { + final ExecutorService executor = Executors.newFixedThreadPool(1, + new ThreadFactoryBuilder().setNameFormat("messages-it-%d").build()); + + return executor.submit(task); + } + + private void triggerFloodStage(String index) { + client().putSetting("cluster.routing.allocation.disk.watermark.low", "0%"); + client().putSetting("cluster.routing.allocation.disk.watermark.high", "0%"); + client().putSetting("cluster.routing.allocation.disk.watermark.flood_stage", "0%"); + + client().waitForIndexBlock(index); + } + + private void resetFloodStage(String index) { + client().putSetting("cluster.routing.allocation.disk.watermark.flood_stage", "95%"); + client().putSetting("cluster.routing.allocation.disk.watermark.high", "90%"); + client().putSetting("cluster.routing.allocation.disk.watermark.low", "85%"); + + client().resetIndexBlock(index); + client().resetClusterBlock(); + } + + private DateTime now() { + return DateTime.now(DateTimeZone.UTC); + } + + private ArrayList> createMessageBatch(int size, int count) { + final ArrayList> messageList = new ArrayList<>(); + + final String message = "A".repeat(size); + for (int i = 0; i < count; i++) { + messageList.add(Maps.immutableEntry(indexSet, new Message(i + message, "source", now()))); + } + return messageList; + } +} diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesIT.java b/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesIT.java index 9723e20d8307..742377ee7284 100644 --- a/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesIT.java +++ b/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesIT.java @@ -69,6 +69,7 @@ protected MessagesAdapter createMessagesAdapter() { @Before public void setUp() throws Exception { + client().resetClusterBlock(); // the index may be blocked with delay from a previous test, if yes then reset it. client().deleteIndices(INDEX_NAME); client().createIndex(INDEX_NAME); client().waitForGreenStatus(INDEX_NAME); @@ -168,28 +169,6 @@ public void conflictingFieldTypesErrorAreReported() throws Exception { verify(failureSubmissionService).submitIndexingErrors(argThat(arg -> arg.size() == 1)); } - @Test - public void retryIndexingMessagesDuringFloodStage() throws Exception { - triggerFloodStage(INDEX_NAME); - final CountDownLatch countDownLatch = new CountDownLatch(1); - final AtomicBoolean succeeded = new AtomicBoolean(false); - final List> messageBatch = createMessageBatch(1024, 50); - - final Future> result = background(() -> this.messages.bulkIndex(messageBatch, createIndexingListener(countDownLatch, succeeded))); - - countDownLatch.await(); - - resetFloodStage(INDEX_NAME); - - final List failedItems = result.get(3, TimeUnit.MINUTES); - assertThat(failedItems).isEmpty(); - - client().refreshNode(); - - assertThat(messageCount(INDEX_NAME)).isEqualTo(50); - assertThat(succeeded.get()).isTrue(); - } - private Messages.IndexingListener createIndexingListener(CountDownLatch retryLatch, AtomicBoolean successionFlag) { return new Messages.IndexingListener() { @Override @@ -261,22 +240,6 @@ private Future> background(Callable> task) { return executor.submit(task); } - private void triggerFloodStage(String index) { - client().putSetting("cluster.routing.allocation.disk.watermark.low", "0%"); - client().putSetting("cluster.routing.allocation.disk.watermark.high", "0%"); - client().putSetting("cluster.routing.allocation.disk.watermark.flood_stage", "0%"); - - client().waitForIndexBlock(index); - } - - private void resetFloodStage(String index) { - client().putSetting("cluster.routing.allocation.disk.watermark.flood_stage", "95%"); - client().putSetting("cluster.routing.allocation.disk.watermark.high", "90%"); - client().putSetting("cluster.routing.allocation.disk.watermark.low", "85%"); - - client().resetIndexBlock(index); - } - private Map.Entry entry(IndexSet indexSet, Message message) { return new AbstractMap.SimpleEntry<>(indexSet, message); }