Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix tests for OpenSearch and blocked index/cluster after watermark hit #15420

Merged
merged 31 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c115877
unblocking cluster create index before every test, index unblocking o…
janheise May 8, 2023
d6ab39c
unblocking cluster create index before every test, index unblocking o…
janheise May 8, 2023
acf461e
proper method + cleanup
janheise May 8, 2023
62d1110
reset possible blocks from other tests
janheise May 8, 2023
d5d3ea7
reset possible blocks from other tests
janheise May 8, 2023
f5ce81a
reset possible blocks from other tests
janheise May 8, 2023
393eab4
reset possible blocks from other tests
janheise May 8, 2023
4bec227
reset possible blocks from other tests
janheise May 8, 2023
167f149
removed unnecessary resets
janheise May 8, 2023
1f9e8c2
different mix transient/persistent settings regarding watermarks & re…
janheise May 8, 2023
62fa69d
removed unnecessary resets
janheise May 8, 2023
0dedaba
adding cluster unblock temporarily
janheise May 8, 2023
0ee6700
undo adding cluster unblock temporarily
janheise May 9, 2023
c468f63
temporarily disabling flood stage test
janheise May 9, 2023
454662d
Moving Test to separate class
janheise May 9, 2023
97e5fcb
Spinning up a separate container
janheise May 9, 2023
c557ed3
removed unnecessary changes in FixtureImporter
janheise May 9, 2023
d48fccc
removed unnecessary changes in FixtureImporter
janheise May 9, 2023
c889347
merge master
todvora Jun 20, 2023
c7b58db
fixed forbidden call in test
todvora Jun 20, 2023
b4b350a
Merge branch 'master' into fix/opensearch2_6_0_support
todvora Jul 11, 2023
751eccf
avoid creating new Opensearch instance for the MessagesFloodStageTestIT
todvora Jul 11, 2023
45c388d
Reset cluster block after MessagesIT if memory usage blocks the cluster
todvora Jul 11, 2023
e730ac9
better cluster block reset
todvora Jul 11, 2023
81cc88e
Merge branch 'master' into fix/opensearch2_6_0_support
todvora Jul 11, 2023
1f49ad1
Merge branch 'master' into fix/opensearch2_6_0_support
janheise Jul 13, 2023
f446b88
remove obsolete code
janheise Jul 13, 2023
8ffaf7a
bump to 2.7
janheise Jul 13, 2023
b000991
bumping RAM
janheise Jul 14, 2023
b09d4fc
bumping to 5g heap
janheise Jul 14, 2023
0da6e20
bump to 2.8
janheise Jul 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.storage.elasticsearch7;

import org.graylog.storage.elasticsearch7.testing.ElasticsearchInstanceES7;
import org.graylog.testing.elasticsearch.SearchServerInstance;
import org.graylog2.indexer.messages.MessagesFloodStageTestIT;
import org.junit.Rule;

public class MessagesFloodStageTestES7IT extends MessagesFloodStageTestIT {
@Rule
public final ElasticsearchInstanceES7 elasticsearch = ElasticsearchInstanceES7.create();

@Override
protected SearchServerInstance searchServer() {
return this.elasticsearch;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.github.rholder.retry.WaitStrategies;
import com.google.common.collect.Streams;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsRequest;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsResponse;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
Expand Down Expand Up @@ -248,6 +250,14 @@ private String[] existingIndices() {
.toArray(String[]::new);
}


public String getSetting(String setting) {
final ClusterGetSettingsRequest req = new ClusterGetSettingsRequest();
final ClusterGetSettingsResponse response = client.execute((c, requestOptions) -> c.cluster().getSettings(req, requestOptions),
"Unable to read OS cluster setting: " + setting);
return response.getSetting(setting);
}

@Override
public void putSetting(String setting, String value) {
final ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
Expand All @@ -274,6 +284,11 @@ public void resetIndexBlock(String index) {
"Unable to reset index block for " + index);
}

@Override
public void resetClusterBlock() {
// noop for ES7, needed for OS 2.x
}

@Override
public void setIndexBlock(String index) {
final UpdateSettingsRequest request = new UpdateSettingsRequest(index)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.storage.opensearch2;

import org.graylog.storage.opensearch2.testing.OpenSearchInstance;
import org.graylog.testing.elasticsearch.SearchServerInstance;
import org.graylog2.indexer.messages.MessagesFloodStageTestIT;
import org.junit.Rule;

public class MessagesFloodStageTestOS2IT extends MessagesFloodStageTestIT {
@Rule
public final OpenSearchInstance openSearchInstance = OpenSearchInstance.create();

@Override
protected SearchServerInstance searchServer() {
return this.openSearchInstance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,12 @@
*/
package org.graylog.storage.opensearch2;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.indices.refresh.RefreshRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.index.IndexRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.index.IndexResponse;
import org.graylog.shaded.opensearch2.org.opensearch.client.core.CountRequest;
import org.graylog.shaded.opensearch2.org.opensearch.client.core.CountResponse;
import org.graylog.shaded.opensearch2.org.opensearch.rest.RestStatus;
import org.graylog.storage.opensearch2.testing.OpenSearchInstance;
import org.graylog.testing.elasticsearch.SearchServerInstance;
import org.graylog2.indexer.messages.MessagesIT;
import org.graylog2.shared.bindings.providers.ObjectMapperProvider;
import org.junit.Rule;

import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.github.rholder.retry.WaitStrategies;
import com.google.common.collect.Streams;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.settings.ClusterGetSettingsRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.settings.ClusterGetSettingsResponse;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
Expand Down Expand Up @@ -221,6 +223,7 @@ public void cleanUp() {
deleteIndices(existingIndices());
deleteTemplates(existingTemplates());
refreshNode();
resetClusterBlock();
}

private String[] existingTemplates() {
Expand Down Expand Up @@ -252,7 +255,31 @@ private String[] existingIndices() {
public void putSetting(String setting, String value) {
final ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();

request.persistentSettings(Settings.builder().put(setting, value));
if(value == null) {
request.transientSettings(Settings.builder().putNull(setting));
} else {
request.transientSettings(Settings.builder().put(setting, value));
}

client.execute((c, requestOptions) -> c.cluster().putSettings(request, requestOptions),
"Unable to update OS cluster setting: " + setting + "=" + value);
}

public String getSetting(String setting) {
final ClusterGetSettingsRequest req = new ClusterGetSettingsRequest();
final ClusterGetSettingsResponse response = client.execute((c, requestOptions) -> c.cluster().getSettings(req, requestOptions),
"Unable to read OS cluster setting: " + setting);
return response.getSetting(setting);
}

public void putPersistentSetting(String setting, String value) {
final ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();

if(value == null) {
request.persistentSettings(Settings.builder().putNull(setting));
} else {
request.persistentSettings(Settings.builder().put(setting, value));
}

client.execute((c, requestOptions) -> c.cluster().putSettings(request, requestOptions),
"Unable to update OS cluster setting: " + setting + "=" + value);
Expand All @@ -274,6 +301,17 @@ public void resetIndexBlock(String index) {
"Unable to reset index block for " + index);
}

@Override
public void resetClusterBlock() {
final String block = getSetting("cluster.blocks.create_index");
if(Boolean.parseBoolean(block)) {
// high memory usage in previous tests may cause a cluster block. If that happens, we should reset this block before the next test.
LOG.info("Indexer cluster is blocked after heavy memory/disk usage, resetting the block");
// reset create_index block for OpenSearch 2.x see https://github.com/opensearch-project/OpenSearch/pull/5852
putPersistentSetting("cluster.blocks.create_index", null);
}
}

@Override
public void setIndexBlock(String index) {
final UpdateSettingsRequest request = new UpdateSettingsRequest(index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
public class OpenSearchInstance extends TestableSearchServerInstance {
private static final Logger LOG = LoggerFactory.getLogger(OpenSearchInstance.class);

public static final String DEFAULT_HEAP_SIZE = "2g";
public static final String DEFAULT_HEAP_SIZE = "5g";
public static final SearchServer OPENSEARCH_VERSION = SearchServer.DEFAULT_OPENSEARCH_VERSION;

private final OpenSearchClient openSearchClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public enum SearchServer {
ES7(ELASTICSEARCH, "7.10.2"),
OS1(OPENSEARCH, "1.3.1"),
OS2(OPENSEARCH, "2.0.1"),
OS2_LATEST(OPENSEARCH, "2.4.0"),
OS2_LATEST(OPENSEARCH, "2.8.0"),
DATANODE_DEV(DATANODE, "5.2.0");

public static final SearchServer DEFAULT_VERSION = DATANODE_DEV;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,14 @@ default String createRandomIndex(String prefix) {
String fieldType(String testIndexName, String source);

void putSetting(String setting, String value);
String getSetting(String setting);

void waitForIndexBlock(String index);

void resetIndexBlock(String index);

void resetClusterBlock();

void setIndexBlock(String index);

void updateMapping(String index, Map<String, Object> mapping);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.indexer.messages;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.graylog.failure.FailureSubmissionService;
import org.graylog.testing.elasticsearch.ElasticsearchBaseTest;
import org.graylog2.indexer.IndexSet;
import org.graylog2.plugin.Message;
import org.graylog2.system.processing.ProcessingStatusRecorder;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

public abstract class MessagesFloodStageTestIT extends ElasticsearchBaseTest {
private static final String INDEX_NAME = "messages_it_deflector";

protected Messages messages;

protected static final IndexSet indexSet = new MessagesTestIndexSet();

protected MessagesAdapter createMessagesAdapter() {
return searchServer().adapters().messagesAdapter();
}

private final FailureSubmissionService failureSubmissionService = mock(FailureSubmissionService.class);

@Before
public void setUp() throws Exception {
client().deleteIndices(INDEX_NAME);
client().createIndex(INDEX_NAME);
client().waitForGreenStatus(INDEX_NAME);
messages = new Messages(mock(TrafficAccounting.class), createMessagesAdapter(), mock(ProcessingStatusRecorder.class),
failureSubmissionService);
}

@After
public void tearDown() {
client().resetClusterBlock();
client().cleanUp();
}

protected long messageCount(String indexName) {
searchServer().client().refreshNode();
return searchServer().adapters().countsAdapter().totalCount(Collections.singletonList(indexName));
}


@Test
public void retryIndexingMessagesDuringFloodStage() throws Exception {
triggerFloodStage(INDEX_NAME);
final CountDownLatch countDownLatch = new CountDownLatch(1);
final AtomicBoolean succeeded = new AtomicBoolean(false);
final List<Map.Entry<IndexSet, Message>> messageBatch = createMessageBatch(1024, 50);

final Future<List<String>> result = background(() -> this.messages.bulkIndex(messageBatch, createIndexingListener(countDownLatch, succeeded)));

countDownLatch.await();

resetFloodStage(INDEX_NAME);

final List<String> failedItems = result.get(3, TimeUnit.MINUTES);
assertThat(failedItems).isEmpty();

client().refreshNode();

assertThat(messageCount(INDEX_NAME)).isEqualTo(50);
assertThat(succeeded.get()).isTrue();
}

private Messages.IndexingListener createIndexingListener(CountDownLatch retryLatch, AtomicBoolean successionFlag) {
return new Messages.IndexingListener() {
@Override
public void onRetry(long attemptNumber) {
retryLatch.countDown();
}

@Override
public void onSuccess(long delaySinceFirstAttempt) {
if (retryLatch.getCount() > 0) {
retryLatch.countDown();
}
successionFlag.set(true);
}
};
}

private Future<List<String>> background(Callable<List<String>> task) {
final ExecutorService executor = Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder().setNameFormat("messages-it-%d").build());

return executor.submit(task);
}

private void triggerFloodStage(String index) {
client().putSetting("cluster.routing.allocation.disk.watermark.low", "0%");
client().putSetting("cluster.routing.allocation.disk.watermark.high", "0%");
client().putSetting("cluster.routing.allocation.disk.watermark.flood_stage", "0%");

client().waitForIndexBlock(index);
}

private void resetFloodStage(String index) {
client().putSetting("cluster.routing.allocation.disk.watermark.flood_stage", "95%");
client().putSetting("cluster.routing.allocation.disk.watermark.high", "90%");
client().putSetting("cluster.routing.allocation.disk.watermark.low", "85%");

client().resetIndexBlock(index);
client().resetClusterBlock();
}

private DateTime now() {
return DateTime.now(DateTimeZone.UTC);
}

private ArrayList<Map.Entry<IndexSet, Message>> createMessageBatch(int size, int count) {
final ArrayList<Map.Entry<IndexSet, Message>> messageList = new ArrayList<>();

final String message = "A".repeat(size);
for (int i = 0; i < count; i++) {
messageList.add(Maps.immutableEntry(indexSet, new Message(i + message, "source", now())));
}
return messageList;
}
}
Loading
Loading