diff --git a/graylog-storage-elasticsearch7/src/test/java/org/graylog/storage/elasticsearch7/MessagesFloodStageTestES7IT.java b/graylog-storage-elasticsearch7/src/test/java/org/graylog/storage/elasticsearch7/MessagesFloodStageTestES7IT.java deleted file mode 100644 index 18c6fcb8a24d..000000000000 --- a/graylog-storage-elasticsearch7/src/test/java/org/graylog/storage/elasticsearch7/MessagesFloodStageTestES7IT.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (C) 2020 Graylog, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - */ -package org.graylog.storage.elasticsearch7; - -import org.graylog.storage.elasticsearch7.testing.ElasticsearchInstanceES7; -import org.graylog.testing.elasticsearch.SearchServerInstance; -import org.graylog2.indexer.messages.MessagesFloodStageTestIT; -import org.junit.Rule; - -public class MessagesFloodStageTestES7IT extends MessagesFloodStageTestIT { - @Rule - public final ElasticsearchInstanceES7 elasticsearch = ElasticsearchInstanceES7.create(); - - @Override - protected SearchServerInstance searchServer() { - return this.elasticsearch; - } -} diff --git a/graylog-storage-elasticsearch7/src/test/java/org/graylog/storage/elasticsearch7/testing/ClientES7.java b/graylog-storage-elasticsearch7/src/test/java/org/graylog/storage/elasticsearch7/testing/ClientES7.java index 3643c24e2b70..1bcd8824bad4 100644 --- a/graylog-storage-elasticsearch7/src/test/java/org/graylog/storage/elasticsearch7/testing/ClientES7.java +++ b/graylog-storage-elasticsearch7/src/test/java/org/graylog/storage/elasticsearch7/testing/ClientES7.java @@ -24,8 +24,6 @@ import com.github.rholder.retry.WaitStrategies; import com.google.common.collect.Streams; import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsRequest; -import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsResponse; import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions; @@ -250,14 +248,6 @@ private String[] existingIndices() { .toArray(String[]::new); } - - public String getSetting(String setting) { - final ClusterGetSettingsRequest req = new ClusterGetSettingsRequest(); - final ClusterGetSettingsResponse response = client.execute((c, requestOptions) -> c.cluster().getSettings(req, requestOptions), - "Unable to read OS cluster setting: " + setting); - return response.getSetting(setting); - } - @Override public void putSetting(String setting, String value) { final ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest(); @@ -284,11 +274,6 @@ public void resetIndexBlock(String index) { "Unable to reset index block for " + index); } - @Override - public void resetClusterBlock() { - // noop for ES7, needed for OS 2.x - } - @Override public void setIndexBlock(String index) { final UpdateSettingsRequest request = new UpdateSettingsRequest(index) diff --git a/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/MessagesFloodStageTestOS2IT.java b/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/MessagesFloodStageTestOS2IT.java deleted file mode 100644 index 78559d94eda4..000000000000 --- a/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/MessagesFloodStageTestOS2IT.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (C) 2020 Graylog, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - */ -package org.graylog.storage.opensearch2; - -import org.graylog.storage.opensearch2.testing.OpenSearchInstance; -import org.graylog.testing.elasticsearch.SearchServerInstance; -import org.graylog2.indexer.messages.MessagesFloodStageTestIT; -import org.junit.Rule; - -public class MessagesFloodStageTestOS2IT extends MessagesFloodStageTestIT { - @Rule - public final OpenSearchInstance openSearchInstance = OpenSearchInstance.create(); - - @Override - protected SearchServerInstance searchServer() { - return this.openSearchInstance; - } -} diff --git a/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/MessagesOS2IT.java b/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/MessagesOS2IT.java index 40f5a4b5bb73..2fb8114d9762 100644 --- a/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/MessagesOS2IT.java +++ b/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/MessagesOS2IT.java @@ -16,12 +16,17 @@ */ package org.graylog.storage.opensearch2; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.graylog.shaded.opensearch2.org.opensearch.action.admin.indices.refresh.RefreshRequest; import org.graylog.shaded.opensearch2.org.opensearch.action.index.IndexRequest; import org.graylog.shaded.opensearch2.org.opensearch.action.index.IndexResponse; +import org.graylog.shaded.opensearch2.org.opensearch.client.core.CountRequest; +import org.graylog.shaded.opensearch2.org.opensearch.client.core.CountResponse; import org.graylog.shaded.opensearch2.org.opensearch.rest.RestStatus; import org.graylog.storage.opensearch2.testing.OpenSearchInstance; import org.graylog.testing.elasticsearch.SearchServerInstance; import org.graylog2.indexer.messages.MessagesIT; +import org.graylog2.shared.bindings.providers.ObjectMapperProvider; import org.junit.Rule; import java.util.Map; diff --git a/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/testing/ClientOS2.java b/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/testing/ClientOS2.java index 1159c9def6bc..f73e13433384 100644 --- a/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/testing/ClientOS2.java +++ b/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/testing/ClientOS2.java @@ -24,8 +24,6 @@ import com.github.rholder.retry.WaitStrategies; import com.google.common.collect.Streams; import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.health.ClusterHealthRequest; -import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.settings.ClusterGetSettingsRequest; -import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.settings.ClusterGetSettingsResponse; import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.graylog.shaded.opensearch2.org.opensearch.action.admin.indices.alias.IndicesAliasesRequest; import org.graylog.shaded.opensearch2.org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions; @@ -223,7 +221,6 @@ public void cleanUp() { deleteIndices(existingIndices()); deleteTemplates(existingTemplates()); refreshNode(); - resetClusterBlock(); } private String[] existingTemplates() { @@ -255,31 +252,7 @@ private String[] existingIndices() { public void putSetting(String setting, String value) { final ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest(); - if(value == null) { - request.transientSettings(Settings.builder().putNull(setting)); - } else { - request.transientSettings(Settings.builder().put(setting, value)); - } - - client.execute((c, requestOptions) -> c.cluster().putSettings(request, requestOptions), - "Unable to update OS cluster setting: " + setting + "=" + value); - } - - public String getSetting(String setting) { - final ClusterGetSettingsRequest req = new ClusterGetSettingsRequest(); - final ClusterGetSettingsResponse response = client.execute((c, requestOptions) -> c.cluster().getSettings(req, requestOptions), - "Unable to read OS cluster setting: " + setting); - return response.getSetting(setting); - } - - public void putPersistentSetting(String setting, String value) { - final ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest(); - - if(value == null) { - request.persistentSettings(Settings.builder().putNull(setting)); - } else { - request.persistentSettings(Settings.builder().put(setting, value)); - } + request.persistentSettings(Settings.builder().put(setting, value)); client.execute((c, requestOptions) -> c.cluster().putSettings(request, requestOptions), "Unable to update OS cluster setting: " + setting + "=" + value); @@ -301,17 +274,6 @@ public void resetIndexBlock(String index) { "Unable to reset index block for " + index); } - @Override - public void resetClusterBlock() { - final String block = getSetting("cluster.blocks.create_index"); - if(Boolean.parseBoolean(block)) { - // high memory usage in previous tests may cause a cluster block. If that happens, we should reset this block before the next test. - LOG.info("Indexer cluster is blocked after heavy memory/disk usage, resetting the block"); - // reset create_index block for OpenSearch 2.x see https://github.com/opensearch-project/OpenSearch/pull/5852 - putPersistentSetting("cluster.blocks.create_index", null); - } - } - @Override public void setIndexBlock(String index) { final UpdateSettingsRequest request = new UpdateSettingsRequest(index) diff --git a/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/testing/OpenSearchInstance.java b/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/testing/OpenSearchInstance.java index 0459c8c1e739..92a2649d3561 100644 --- a/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/testing/OpenSearchInstance.java +++ b/graylog-storage-opensearch2/src/test/java/org/graylog/storage/opensearch2/testing/OpenSearchInstance.java @@ -46,7 +46,7 @@ public class OpenSearchInstance extends TestableSearchServerInstance { private static final Logger LOG = LoggerFactory.getLogger(OpenSearchInstance.class); - public static final String DEFAULT_HEAP_SIZE = "5g"; + public static final String DEFAULT_HEAP_SIZE = "2g"; public static final SearchServer OPENSEARCH_VERSION = SearchServer.DEFAULT_OPENSEARCH_VERSION; private final OpenSearchClient openSearchClient; diff --git a/graylog2-server/src/test/java/org/graylog/testing/containermatrix/SearchServer.java b/graylog2-server/src/test/java/org/graylog/testing/containermatrix/SearchServer.java index d24cb2e2bdbe..7e004e7dd4d0 100644 --- a/graylog2-server/src/test/java/org/graylog/testing/containermatrix/SearchServer.java +++ b/graylog2-server/src/test/java/org/graylog/testing/containermatrix/SearchServer.java @@ -26,7 +26,7 @@ public enum SearchServer { ES7(ELASTICSEARCH, "7.10.2"), OS1(OPENSEARCH, "1.3.1"), OS2(OPENSEARCH, "2.0.1"), - OS2_LATEST(OPENSEARCH, "2.8.0"), + OS2_LATEST(OPENSEARCH, "2.4.0"), DATANODE_DEV(DATANODE, "5.2.0"); public static final SearchServer DEFAULT_VERSION = DATANODE_DEV; diff --git a/graylog2-server/src/test/java/org/graylog/testing/elasticsearch/Client.java b/graylog2-server/src/test/java/org/graylog/testing/elasticsearch/Client.java index a5e78be8946c..338773ebad08 100644 --- a/graylog2-server/src/test/java/org/graylog/testing/elasticsearch/Client.java +++ b/graylog2-server/src/test/java/org/graylog/testing/elasticsearch/Client.java @@ -61,14 +61,11 @@ default String createRandomIndex(String prefix) { String fieldType(String testIndexName, String source); void putSetting(String setting, String value); - String getSetting(String setting); void waitForIndexBlock(String index); void resetIndexBlock(String index); - void resetClusterBlock(); - void setIndexBlock(String index); void updateMapping(String index, Map mapping); diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesFloodStageTestIT.java b/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesFloodStageTestIT.java deleted file mode 100644 index eca6b748f56f..000000000000 --- a/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesFloodStageTestIT.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright (C) 2020 Graylog, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - */ -package org.graylog2.indexer.messages; - -import com.google.common.collect.Maps; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.graylog.failure.FailureSubmissionService; -import org.graylog.testing.elasticsearch.ElasticsearchBaseTest; -import org.graylog2.indexer.IndexSet; -import org.graylog2.plugin.Message; -import org.graylog2.system.processing.ProcessingStatusRecorder; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; - -public abstract class MessagesFloodStageTestIT extends ElasticsearchBaseTest { - private static final String INDEX_NAME = "messages_it_deflector"; - - protected Messages messages; - - protected static final IndexSet indexSet = new MessagesTestIndexSet(); - - protected MessagesAdapter createMessagesAdapter() { - return searchServer().adapters().messagesAdapter(); - } - - private final FailureSubmissionService failureSubmissionService = mock(FailureSubmissionService.class); - - @Before - public void setUp() throws Exception { - client().deleteIndices(INDEX_NAME); - client().createIndex(INDEX_NAME); - client().waitForGreenStatus(INDEX_NAME); - messages = new Messages(mock(TrafficAccounting.class), createMessagesAdapter(), mock(ProcessingStatusRecorder.class), - failureSubmissionService); - } - - @After - public void tearDown() { - client().resetClusterBlock(); - client().cleanUp(); - } - - protected long messageCount(String indexName) { - searchServer().client().refreshNode(); - return searchServer().adapters().countsAdapter().totalCount(Collections.singletonList(indexName)); - } - - - @Test - public void retryIndexingMessagesDuringFloodStage() throws Exception { - triggerFloodStage(INDEX_NAME); - final CountDownLatch countDownLatch = new CountDownLatch(1); - final AtomicBoolean succeeded = new AtomicBoolean(false); - final List> messageBatch = createMessageBatch(1024, 50); - - final Future> result = background(() -> this.messages.bulkIndex(messageBatch, createIndexingListener(countDownLatch, succeeded))); - - countDownLatch.await(); - - resetFloodStage(INDEX_NAME); - - final List failedItems = result.get(3, TimeUnit.MINUTES); - assertThat(failedItems).isEmpty(); - - client().refreshNode(); - - assertThat(messageCount(INDEX_NAME)).isEqualTo(50); - assertThat(succeeded.get()).isTrue(); - } - - private Messages.IndexingListener createIndexingListener(CountDownLatch retryLatch, AtomicBoolean successionFlag) { - return new Messages.IndexingListener() { - @Override - public void onRetry(long attemptNumber) { - retryLatch.countDown(); - } - - @Override - public void onSuccess(long delaySinceFirstAttempt) { - if (retryLatch.getCount() > 0) { - retryLatch.countDown(); - } - successionFlag.set(true); - } - }; - } - - private Future> background(Callable> task) { - final ExecutorService executor = Executors.newFixedThreadPool(1, - new ThreadFactoryBuilder().setNameFormat("messages-it-%d").build()); - - return executor.submit(task); - } - - private void triggerFloodStage(String index) { - client().putSetting("cluster.routing.allocation.disk.watermark.low", "0%"); - client().putSetting("cluster.routing.allocation.disk.watermark.high", "0%"); - client().putSetting("cluster.routing.allocation.disk.watermark.flood_stage", "0%"); - - client().waitForIndexBlock(index); - } - - private void resetFloodStage(String index) { - client().putSetting("cluster.routing.allocation.disk.watermark.flood_stage", "95%"); - client().putSetting("cluster.routing.allocation.disk.watermark.high", "90%"); - client().putSetting("cluster.routing.allocation.disk.watermark.low", "85%"); - - client().resetIndexBlock(index); - client().resetClusterBlock(); - } - - private DateTime now() { - return DateTime.now(DateTimeZone.UTC); - } - - private ArrayList> createMessageBatch(int size, int count) { - final ArrayList> messageList = new ArrayList<>(); - - final String message = "A".repeat(size); - for (int i = 0; i < count; i++) { - messageList.add(Maps.immutableEntry(indexSet, new Message(i + message, "source", now()))); - } - return messageList; - } -} diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesIT.java b/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesIT.java index 742377ee7284..9723e20d8307 100644 --- a/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesIT.java +++ b/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesIT.java @@ -69,7 +69,6 @@ protected MessagesAdapter createMessagesAdapter() { @Before public void setUp() throws Exception { - client().resetClusterBlock(); // the index may be blocked with delay from a previous test, if yes then reset it. client().deleteIndices(INDEX_NAME); client().createIndex(INDEX_NAME); client().waitForGreenStatus(INDEX_NAME); @@ -169,6 +168,28 @@ public void conflictingFieldTypesErrorAreReported() throws Exception { verify(failureSubmissionService).submitIndexingErrors(argThat(arg -> arg.size() == 1)); } + @Test + public void retryIndexingMessagesDuringFloodStage() throws Exception { + triggerFloodStage(INDEX_NAME); + final CountDownLatch countDownLatch = new CountDownLatch(1); + final AtomicBoolean succeeded = new AtomicBoolean(false); + final List> messageBatch = createMessageBatch(1024, 50); + + final Future> result = background(() -> this.messages.bulkIndex(messageBatch, createIndexingListener(countDownLatch, succeeded))); + + countDownLatch.await(); + + resetFloodStage(INDEX_NAME); + + final List failedItems = result.get(3, TimeUnit.MINUTES); + assertThat(failedItems).isEmpty(); + + client().refreshNode(); + + assertThat(messageCount(INDEX_NAME)).isEqualTo(50); + assertThat(succeeded.get()).isTrue(); + } + private Messages.IndexingListener createIndexingListener(CountDownLatch retryLatch, AtomicBoolean successionFlag) { return new Messages.IndexingListener() { @Override @@ -240,6 +261,22 @@ private Future> background(Callable> task) { return executor.submit(task); } + private void triggerFloodStage(String index) { + client().putSetting("cluster.routing.allocation.disk.watermark.low", "0%"); + client().putSetting("cluster.routing.allocation.disk.watermark.high", "0%"); + client().putSetting("cluster.routing.allocation.disk.watermark.flood_stage", "0%"); + + client().waitForIndexBlock(index); + } + + private void resetFloodStage(String index) { + client().putSetting("cluster.routing.allocation.disk.watermark.flood_stage", "95%"); + client().putSetting("cluster.routing.allocation.disk.watermark.high", "90%"); + client().putSetting("cluster.routing.allocation.disk.watermark.low", "85%"); + + client().resetIndexBlock(index); + } + private Map.Entry entry(IndexSet indexSet, Message message) { return new AbstractMap.SimpleEntry<>(indexSet, message); }