Skip to content

Commit

Permalink
Fix tests for OpenSearch and blocked index/cluster after watermark hit (
Browse files Browse the repository at this point in the history
#15420)

* unblocking cluster create index before every test, index unblocking only is not enough

* unblocking cluster create index before every test, index unblocking only is not enough

* proper method + cleanup

* reset possible blocks from other tests

* reset possible blocks from other tests

* reset possible blocks from other tests

* reset possible blocks from other tests

* reset possible blocks from other tests

* removed unnecessary resets

* different mix transient/persistent settings regarding watermarks & reset of said watermark and blocks

* removed unnecessary resets

* adding cluster unblock temporarily

* undo adding cluster unblock temporarily

* temporarily disabling flood stage test

* Moving Test to separate class

* Spinning up a separate container

* removed unnecessary changes in FixtureImporter

* removed unnecessary changes in FixtureImporter

* fixed forbidden call in test

* avoid creating new OpenSearch instance for the MessagesFloodStageTestIT

* Reset cluster block after MessagesIT if memory usage blocks the cluster

* better cluster block reset

* remove obsolete code

* bump to 2.7

* bumping RAM

* bumping to 5g heap

* bump to 2.8

---------

Co-authored-by: Tomas Dvorak <[email protected]>
  • Loading branch information
janheise and todvora authored Jul 17, 2023
1 parent 17ffa5e commit 259fffb
Show file tree
Hide file tree
Showing 10 changed files with 281 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.storage.elasticsearch7;

import org.graylog.storage.elasticsearch7.testing.ElasticsearchInstanceES7;
import org.graylog.testing.elasticsearch.SearchServerInstance;
import org.graylog2.indexer.messages.MessagesFloodStageTestIT;
import org.junit.Rule;

/**
 * Runs the shared flood-stage integration tests against an embedded
 * Elasticsearch 7 instance.
 */
public class MessagesFloodStageTestES7IT extends MessagesFloodStageTestIT {

    // Dedicated ES7 container for this test; lifecycle is managed by JUnit via @Rule.
    @Rule
    public final ElasticsearchInstanceES7 elasticsearch = ElasticsearchInstanceES7.create();

    @Override
    protected SearchServerInstance searchServer() {
        // Hand the base test suite the ES7 instance under test.
        return elasticsearch;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.github.rholder.retry.WaitStrategies;
import com.google.common.collect.Streams;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsRequest;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsResponse;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
Expand Down Expand Up @@ -248,6 +250,14 @@ private String[] existingIndices() {
.toArray(String[]::new);
}


/**
 * Reads a single cluster-wide setting from the Elasticsearch cluster.
 *
 * @param setting fully-qualified setting key, e.g. {@code cluster.blocks.create_index}
 * @return the setting's current value, or {@code null} if it is not set
 */
public String getSetting(String setting) {
    final ClusterGetSettingsRequest request = new ClusterGetSettingsRequest();
    // Fix: error message previously said "OS cluster setting" — this is the ES7 client.
    final ClusterGetSettingsResponse response = client.execute((c, requestOptions) -> c.cluster().getSettings(request, requestOptions),
            "Unable to read ES cluster setting: " + setting);
    return response.getSetting(setting);
}

@Override
public void putSetting(String setting, String value) {
final ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
Expand All @@ -274,6 +284,11 @@ public void resetIndexBlock(String index) {
"Unable to reset index block for " + index);
}

@Override
public void resetClusterBlock() {
    // Intentionally a no-op: resetting a cluster-wide create_index block is only
    // needed for OpenSearch 2.x (see the OS2 client's implementation); ES7 does not
    // set such a block in these tests. The override exists to satisfy the shared
    // client interface.
}

@Override
public void setIndexBlock(String index) {
final UpdateSettingsRequest request = new UpdateSettingsRequest(index)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.storage.opensearch2;

import org.graylog.storage.opensearch2.testing.OpenSearchInstance;
import org.graylog.testing.elasticsearch.SearchServerInstance;
import org.graylog2.indexer.messages.MessagesFloodStageTestIT;
import org.junit.Rule;

/**
 * Runs the shared flood-stage integration tests against an embedded
 * OpenSearch 2.x instance.
 */
public class MessagesFloodStageTestOS2IT extends MessagesFloodStageTestIT {

    // Dedicated OpenSearch container for this test; lifecycle is managed by JUnit via @Rule.
    @Rule
    public final OpenSearchInstance openSearchInstance = OpenSearchInstance.create();

    @Override
    protected SearchServerInstance searchServer() {
        // Hand the base test suite the OpenSearch instance under test.
        return openSearchInstance;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,12 @@
*/
package org.graylog.storage.opensearch2;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.indices.refresh.RefreshRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.index.IndexRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.index.IndexResponse;
import org.graylog.shaded.opensearch2.org.opensearch.client.core.CountRequest;
import org.graylog.shaded.opensearch2.org.opensearch.client.core.CountResponse;
import org.graylog.shaded.opensearch2.org.opensearch.rest.RestStatus;
import org.graylog.storage.opensearch2.testing.OpenSearchInstance;
import org.graylog.testing.elasticsearch.SearchServerInstance;
import org.graylog2.indexer.messages.MessagesIT;
import org.graylog2.shared.bindings.providers.ObjectMapperProvider;
import org.junit.Rule;

import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.github.rholder.retry.WaitStrategies;
import com.google.common.collect.Streams;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.settings.ClusterGetSettingsRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.settings.ClusterGetSettingsResponse;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
Expand Down Expand Up @@ -221,6 +223,7 @@ public void cleanUp() {
deleteIndices(existingIndices());
deleteTemplates(existingTemplates());
refreshNode();
resetClusterBlock();
}

private String[] existingTemplates() {
Expand Down Expand Up @@ -252,7 +255,31 @@ private String[] existingIndices() {
/**
 * Writes a transient cluster setting on the OpenSearch cluster.
 *
 * @param setting fully-qualified setting key
 * @param value   new value, or {@code null} to clear the setting
 */
public void putSetting(String setting, String value) {
    final Settings.Builder settings = Settings.builder();
    if (value != null) {
        settings.put(setting, value);
    } else {
        // putNull() explicitly removes the setting instead of storing the string "null".
        settings.putNull(setting);
    }

    final ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
    request.transientSettings(settings);

    client.execute((c, requestOptions) -> c.cluster().putSettings(request, requestOptions),
            "Unable to update OS cluster setting: " + setting + "=" + value);
}

/**
 * Reads a single cluster-wide setting from the OpenSearch cluster.
 *
 * @param setting fully-qualified setting key, e.g. {@code cluster.blocks.create_index}
 * @return the setting's current value, or {@code null} if it is not set
 */
public String getSetting(String setting) {
    final ClusterGetSettingsRequest request = new ClusterGetSettingsRequest();
    return client.execute((c, requestOptions) -> c.cluster().getSettings(request, requestOptions),
                    "Unable to read OS cluster setting: " + setting)
            .getSetting(setting);
}

public void putPersistentSetting(String setting, String value) {
final ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();

if(value == null) {
request.persistentSettings(Settings.builder().putNull(setting));
} else {
request.persistentSettings(Settings.builder().put(setting, value));
}

client.execute((c, requestOptions) -> c.cluster().putSettings(request, requestOptions),
"Unable to update OS cluster setting: " + setting + "=" + value);
Expand All @@ -274,6 +301,17 @@ public void resetIndexBlock(String index) {
"Unable to reset index block for " + index);
}

@Override
public void resetClusterBlock() {
    // Heavy memory/disk usage in earlier tests can make OpenSearch 2.x place a
    // cluster-wide create_index block; if present, clear it so later tests can
    // create indices again. See https://github.com/opensearch-project/OpenSearch/pull/5852
    final boolean createIndexBlocked = Boolean.parseBoolean(getSetting("cluster.blocks.create_index"));
    if (!createIndexBlocked) {
        return;
    }
    LOG.info("Indexer cluster is blocked after heavy memory/disk usage, resetting the block");
    putPersistentSetting("cluster.blocks.create_index", null);
}

@Override
public void setIndexBlock(String index) {
final UpdateSettingsRequest request = new UpdateSettingsRequest(index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
public class OpenSearchInstance extends TestableSearchServerInstance {
private static final Logger LOG = LoggerFactory.getLogger(OpenSearchInstance.class);

public static final String DEFAULT_HEAP_SIZE = "2g";
public static final String DEFAULT_HEAP_SIZE = "5g";
public static final SearchServer OPENSEARCH_VERSION = SearchServer.DEFAULT_OPENSEARCH_VERSION;

private final OpenSearchClient openSearchClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public enum SearchServer {
ES7(ELASTICSEARCH, "7.10.2"),
OS1(OPENSEARCH, "1.3.1"),
OS2(OPENSEARCH, "2.0.1"),
OS2_LATEST(OPENSEARCH, "2.4.0"),
OS2_LATEST(OPENSEARCH, "2.8.0"),
DATANODE_DEV(DATANODE, "5.2.0");

public static final SearchServer DEFAULT_VERSION = DATANODE_DEV;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,14 @@ default String createRandomIndex(String prefix) {
String fieldType(String testIndexName, String source);

void putSetting(String setting, String value);
String getSetting(String setting);

void waitForIndexBlock(String index);

void resetIndexBlock(String index);

void resetClusterBlock();

void setIndexBlock(String index);

void updateMapping(String index, Map<String, Object> mapping);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.indexer.messages;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.graylog.failure.FailureSubmissionService;
import org.graylog.testing.elasticsearch.ElasticsearchBaseTest;
import org.graylog2.indexer.IndexSet;
import org.graylog2.plugin.Message;
import org.graylog2.system.processing.ProcessingStatusRecorder;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

/**
 * Integration test verifying that bulk message indexing keeps retrying while the
 * search server is in disk flood stage (index write-blocked) and eventually
 * succeeds once the block is lifted. Concrete subclasses supply the backend
 * under test (Elasticsearch 7 / OpenSearch 2.x) via {@code searchServer()}.
 */
public abstract class MessagesFloodStageTestIT extends ElasticsearchBaseTest {
    private static final String INDEX_NAME = "messages_it_deflector";

    protected Messages messages;

    // Shared test index set used for every generated message.
    protected static final IndexSet indexSet = new MessagesTestIndexSet();

    /** Returns the messages adapter of the concrete backend under test. */
    protected MessagesAdapter createMessagesAdapter() {
        return searchServer().adapters().messagesAdapter();
    }

    private final FailureSubmissionService failureSubmissionService = mock(FailureSubmissionService.class);

    @Before
    public void setUp() throws Exception {
        // Start every test from a clean, green index.
        client().deleteIndices(INDEX_NAME);
        client().createIndex(INDEX_NAME);
        client().waitForGreenStatus(INDEX_NAME);
        messages = new Messages(mock(TrafficAccounting.class), createMessagesAdapter(), mock(ProcessingStatusRecorder.class),
                failureSubmissionService);
    }

    @After
    public void tearDown() {
        // The flood-stage watermarks used in this test can leave a cluster-wide
        // create_index block behind (OpenSearch 2.x); reset it so that later
        // tests can create indices again, then clean up indices/templates.
        client().resetClusterBlock();
        client().cleanUp();
    }

    /** Counts all messages in the given index after forcing a node refresh. */
    protected long messageCount(String indexName) {
        searchServer().client().refreshNode();
        return searchServer().adapters().countsAdapter().totalCount(Collections.singletonList(indexName));
    }


    @Test
    public void retryIndexingMessagesDuringFloodStage() throws Exception {
        triggerFloodStage(INDEX_NAME);
        // Released once bulk indexing has been retried (or succeeded) at least once.
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicBoolean succeeded = new AtomicBoolean(false);
        final List<Map.Entry<IndexSet, Message>> messageBatch = createMessageBatch(1024, 50);

        final Future<List<String>> result = background(() -> this.messages.bulkIndex(messageBatch, createIndexingListener(countDownLatch, succeeded)));

        // Wait until indexing has hit the flood-stage block and started retrying.
        countDownLatch.await();

        resetFloodStage(INDEX_NAME);

        final List<String> failedItems = result.get(3, TimeUnit.MINUTES);
        assertThat(failedItems).isEmpty();

        client().refreshNode();

        assertThat(messageCount(INDEX_NAME)).isEqualTo(50);
        assertThat(succeeded.get()).isTrue();
    }

    /**
     * Builds a listener that releases {@code retryLatch} on the first retry and
     * records eventual success in {@code successionFlag}. The latch is also
     * released on success, in case indexing succeeds without ever retrying.
     */
    private Messages.IndexingListener createIndexingListener(CountDownLatch retryLatch, AtomicBoolean successionFlag) {
        return new Messages.IndexingListener() {
            @Override
            public void onRetry(long attemptNumber) {
                retryLatch.countDown();
            }

            @Override
            public void onSuccess(long delaySinceFirstAttempt) {
                if (retryLatch.getCount() > 0) {
                    retryLatch.countDown();
                }
                successionFlag.set(true);
            }
        };
    }

    /** Runs the task on a single background thread and returns its future. */
    private Future<List<String>> background(Callable<List<String>> task) {
        final ExecutorService executor = Executors.newFixedThreadPool(1,
                new ThreadFactoryBuilder().setNameFormat("messages-it-%d").build());
        try {
            return executor.submit(task);
        } finally {
            // Fix: the executor was previously never shut down, leaking its worker
            // thread. shutdown() still lets the already-submitted task run to
            // completion while releasing the thread afterwards.
            executor.shutdown();
        }
    }

    /**
     * Forces the cluster into disk flood stage by setting all disk watermarks
     * to 0%, then waits until the index is write-blocked.
     */
    private void triggerFloodStage(String index) {
        client().putSetting("cluster.routing.allocation.disk.watermark.low", "0%");
        client().putSetting("cluster.routing.allocation.disk.watermark.high", "0%");
        client().putSetting("cluster.routing.allocation.disk.watermark.flood_stage", "0%");

        client().waitForIndexBlock(index);
    }

    /**
     * Restores the default disk watermarks (in reverse order so thresholds stay
     * mutually consistent at every step) and clears the index and cluster blocks
     * left behind by the flood stage.
     */
    private void resetFloodStage(String index) {
        client().putSetting("cluster.routing.allocation.disk.watermark.flood_stage", "95%");
        client().putSetting("cluster.routing.allocation.disk.watermark.high", "90%");
        client().putSetting("cluster.routing.allocation.disk.watermark.low", "85%");

        client().resetIndexBlock(index);
        client().resetClusterBlock();
    }

    /** Current time in UTC, used as the timestamp of generated messages. */
    private DateTime now() {
        return DateTime.now(DateTimeZone.UTC);
    }

    /**
     * Creates {@code count} messages of roughly {@code size} characters each,
     * paired with the shared test index set.
     */
    private ArrayList<Map.Entry<IndexSet, Message>> createMessageBatch(int size, int count) {
        final ArrayList<Map.Entry<IndexSet, Message>> messageList = new ArrayList<>();

        final String message = "A".repeat(size);
        for (int i = 0; i < count; i++) {
            messageList.add(Maps.immutableEntry(indexSet, new Message(i + message, "source", now())));
        }
        return messageList;
    }
}
Loading

0 comments on commit 259fffb

Please sign in to comment.