Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor ElasticSearchOutput for Instant Archiving #15952

Merged
merged 2 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import javax.inject.Singleton;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -116,21 +115,21 @@ public List<String> analyze(String toAnalyze, String index, String analyzer) thr
return messagesAdapter.analyze(toAnalyze, index, analyzer);
}

public List<String> bulkIndex(final List<Map.Entry<IndexSet, Message>> messageList) {
public Set<String> bulkIndex(final List<Map.Entry<IndexSet, Message>> messageList) {
return bulkIndex(messageList, false, null);
}

public List<String> bulkIndex(final List<Map.Entry<IndexSet, Message>> messageList, IndexingListener indexingListener) {
public Set<String> bulkIndex(final List<Map.Entry<IndexSet, Message>> messageList, IndexingListener indexingListener) {
return bulkIndex(messageList, false, indexingListener);
}

public List<String> bulkIndex(final List<Map.Entry<IndexSet, Message>> messageList, boolean isSystemTraffic) {
public Set<String> bulkIndex(final List<Map.Entry<IndexSet, Message>> messageList, boolean isSystemTraffic) {
return bulkIndex(messageList, isSystemTraffic, null);
}

public List<String> bulkIndex(final List<Map.Entry<IndexSet, Message>> messageList, boolean isSystemTraffic, IndexingListener indexingListener) {
public Set<String> bulkIndex(final List<Map.Entry<IndexSet, Message>> messageList, boolean isSystemTraffic, IndexingListener indexingListener) {
if (messageList.isEmpty()) {
return Collections.emptyList();
return Set.of();
}

final List<IndexingRequest> indexingRequestList = messageList.stream()
Expand All @@ -140,11 +139,11 @@ public List<String> bulkIndex(final List<Map.Entry<IndexSet, Message>> messageLi
return bulkIndexRequests(indexingRequestList, isSystemTraffic, indexingListener);
}

public List<String> bulkIndexRequests(List<IndexingRequest> indexingRequestList, boolean isSystemTraffic) {
public Set<String> bulkIndexRequests(List<IndexingRequest> indexingRequestList, boolean isSystemTraffic) {
return bulkIndexRequests(indexingRequestList, isSystemTraffic, null);
}

public List<String> bulkIndexRequests(List<IndexingRequest> indexingRequestList, boolean isSystemTraffic, IndexingListener indexingListener) {
public Set<String> bulkIndexRequests(List<IndexingRequest> indexingRequestList, boolean isSystemTraffic, IndexingListener indexingListener) {
final List<IndexingError> indexingErrors = runBulkRequest(indexingRequestList, indexingRequestList.size(), indexingListener);

final Set<IndexingError> remainingErrors = retryOnlyIndexBlockItemsForever(indexingRequestList, indexingErrors, indexingListener);
Expand Down Expand Up @@ -268,16 +267,16 @@ private void recordTimestamp(List<IndexingRequest> messageList) {
}
}

private List<String> propagateFailure(Collection<IndexingError> indexingErrors) {
private Set<String> propagateFailure(Collection<IndexingError> indexingErrors) {
if (indexingErrors.isEmpty()) {
return Collections.emptyList();
return Set.of();
}

failureSubmissionService.submitIndexingErrors(indexingErrors);

return indexingErrors.stream()
.map(IndexingError::message).map(Indexable::getId)
.collect(Collectors.toList());
.collect(Collectors.toSet());
}

@AutoValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -45,6 +46,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static com.codahale.metrics.MetricRegistry.name;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
Expand Down Expand Up @@ -130,11 +132,10 @@ private void flush(List<Map.Entry<IndexSet, Message>> messages) {
activeFlushThreads.get());
}

try (Timer.Context ignored = processTime.time()) {
lastFlushTime.set(System.nanoTime());
writeMessageEntries(messages);
batchSize.update(messages.size());
bufferFlushes.mark();
try {
indexMessageBatch(messages);
// This does not exclude failedMessageIds, because we don't know if ES is ever gonna accept these messages.
acknowledger.acknowledge(messages.stream().map(Map.Entry::getValue).collect(Collectors.toList()));
} catch (Exception e) {
log.error("Unable to flush message buffer", e);
bufferFlushFailures.mark();
Expand All @@ -143,6 +144,16 @@ private void flush(List<Map.Entry<IndexSet, Message>> messages) {
log.debug("Flushing {} messages completed", messages.size());
}

protected Set<String> indexMessageBatch(List<Map.Entry<IndexSet, Message>> messages) throws Exception {
try (Timer.Context ignored = processTime.time()) {
lastFlushTime.set(System.nanoTime());
final Set<String> failedMessageIds = writeMessageEntries(messages);
batchSize.update(messages.size());
bufferFlushes.mark();
return failedMessageIds;
}
}

public void forceFlushIfTimedout() {
// if we shouldn't flush at all based on the last flush time, no need to synchronize on this.
if (lastFlushTime.get() != 0 &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

Expand All @@ -55,7 +56,7 @@ public class ElasticSearchOutput implements MessageOutput {
private final Messages messages;
private final Journal journal;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
private final MessageQueueAcknowledger acknowledger;
protected final MessageQueueAcknowledger acknowledger;

@Inject
public ElasticSearchOutput(MetricRegistry metricRegistry,
Expand Down Expand Up @@ -87,7 +88,7 @@ public void write(List<Message> messageList) throws Exception {
throw new UnsupportedOperationException("Method not supported!");
}

public void writeMessageEntries(List<Map.Entry<IndexSet, Message>> messageList) throws Exception {
public Set<String> writeMessageEntries(List<Map.Entry<IndexSet, Message>> messageList) throws Exception {
if (LOG.isTraceEnabled()) {
final String sortedIds = messageList.stream()
.map(Map.Entry::getValue)
Expand All @@ -98,14 +99,13 @@ public void writeMessageEntries(List<Map.Entry<IndexSet, Message>> messageList)
}

writes.mark(messageList.size());
final List<String> failedMessageIds;
final Set<String> failedMessageIds;
try (final Timer.Context ignored = processTime.time()) {
failedMessageIds = messages.bulkIndex(messageList);
}
failures.mark(failedMessageIds.size());

// This does not exclude failedMessageIds, because we don't know if ES is ever gonna accept these messages.
acknowledger.acknowledge(messageList.stream().map(Map.Entry::getValue).collect(Collectors.toList()));
return failedMessageIds;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -72,7 +73,7 @@ public void testIfLargeBatchesGetSplitUpOnCircuitBreakerExceptions() throws Exce
final int MESSAGECOUNT = 50;
// Each Message is about 1 MB
final List<Map.Entry<IndexSet, Message>> largeMessageBatch = createMessageBatch(1024 * 1024, MESSAGECOUNT);
final List<String> failedItems = this.messages.bulkIndex(largeMessageBatch);
final Set<String> failedItems = this.messages.bulkIndex(largeMessageBatch);

client().refreshNode(); // wait for ES to finish indexing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -60,7 +61,7 @@ void setUp() {

@Test
public void bulkIndexingShouldNotDoAnythingForEmptyList() throws Exception {
final List<String> result = messages.bulkIndex(Collections.emptyList());
final Set<String> result = messages.bulkIndex(Collections.emptyList());

assertThat(result).isNotNull()
.isEmpty();
Expand All @@ -86,7 +87,7 @@ public void bulkIndexingShouldNotRetryForIndexMappingErrors() throws Exception {

final List<Map.Entry<IndexSet, Message>> messageList = messageListWith(mockedMessage);

final List<String> result = messages.bulkIndex(messageList);
final Set<String> result = messages.bulkIndex(messageList);

assertThat(result).hasSize(1);

Expand All @@ -101,7 +102,7 @@ public void bulkIndexingShouldRetry() throws Exception {

final List<Map.Entry<IndexSet, Message>> messageList = messageListWith(mock(Message.class));

final List<String> result = messages.bulkIndex(messageList);
final Set<String> result = messages.bulkIndex(messageList);

assertThat(result).isNotNull().isEmpty();

Expand All @@ -119,7 +120,7 @@ public void bulkIndexingShouldRetryIfIndexBlocked() throws IOException {
.thenReturn(errorResult)
.thenReturn(successResult);

final List<String> result = messages.bulkIndex(messagesWithIds("blocked-id"));
final Set<String> result = messages.bulkIndex(messagesWithIds("blocked-id"));

verify(messagesAdapter, times(2)).bulkIndex(any());
assertThat(result).isNotNull().isEmpty();
Expand All @@ -137,7 +138,7 @@ public void indexBlockedRetriesShouldOnlyRetryIndexBlockedErrors() throws IOExce
.thenReturn(errorResult)
.thenReturn(successResult);

final List<String> result = messages.bulkIndex(messagesWithIds("blocked-id", "other-error-id"));
final Set<String> result = messages.bulkIndex(messagesWithIds("blocked-id", "other-error-id"));

verify(messagesAdapter, times(2)).bulkIndex(any());
assertThat(result).containsOnly("other-error-id");
Expand All @@ -157,7 +158,7 @@ public void retriedIndexBlockErrorsThatFailWithDifferentErrorsAreTreatedAsPersis
.thenReturn(errorResult)
.thenReturn(secondErrorResult);

final List<String> result = messages.bulkIndex(messagesWithIds("blocked-id", "other-error-id"));
final Set<String> result = messages.bulkIndex(messagesWithIds("blocked-id", "other-error-id"));

verify(messagesAdapter, times(2)).bulkIndex(any());
assertThat(result).containsOnly("other-error-id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -119,7 +120,7 @@ public void testIfTooLargeBatchesGetSplitUp() throws Exception {
final int MESSAGECOUNT = 101;
// Each Message is about 1 MB
final List<Map.Entry<IndexSet, Message>> largeMessageBatch = createMessageBatch(1024 * 1024, MESSAGECOUNT);
final List<String> failedItems = this.messages.bulkIndex(largeMessageBatch);
final Set<String> failedItems = this.messages.bulkIndex(largeMessageBatch);

assertThat(failedItems).isEmpty();

Expand All @@ -139,7 +140,7 @@ public void unevenTooLargeBatchesGetSplitUp() throws Exception {
final int LARGE_MESSAGECOUNT = 20;
final List<Map.Entry<IndexSet, Message>> messageBatch = createMessageBatch(1024, MESSAGECOUNT);
messageBatch.addAll(createMessageBatch(1024 * 1024 * 5, LARGE_MESSAGECOUNT));
final List<String> failedItems = this.messages.bulkIndex(messageBatch);
final Set<String> failedItems = this.messages.bulkIndex(messageBatch);

assertThat(failedItems).isEmpty();

Expand All @@ -161,7 +162,7 @@ public void conflictingFieldTypesErrorAreReported() throws Exception {
entry(indexSet, message2)
);

final List<String> failedItems = this.messages.bulkIndex(messageBatch);
final Set<String> failedItems = this.messages.bulkIndex(messageBatch);

assertThat(failedItems).hasSize(1);

Expand All @@ -175,13 +176,13 @@ public void retryIndexingMessagesDuringFloodStage() throws Exception {
final AtomicBoolean succeeded = new AtomicBoolean(false);
final List<Map.Entry<IndexSet, Message>> messageBatch = createMessageBatch(1024, 50);

final Future<List<String>> result = background(() -> this.messages.bulkIndex(messageBatch, createIndexingListener(countDownLatch, succeeded)));
final Future<Set<String>> result = background(() -> this.messages.bulkIndex(messageBatch, createIndexingListener(countDownLatch, succeeded)));

countDownLatch.await();

resetFloodStage(INDEX_NAME);

final List<String> failedItems = result.get(3, TimeUnit.MINUTES);
final Set<String> failedItems = result.get(3, TimeUnit.MINUTES);
assertThat(failedItems).isEmpty();

client().refreshNode();
Expand Down Expand Up @@ -220,13 +221,13 @@ public void retryIndexingMessagesIfTargetAliasIsInvalid() throws Exception {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final AtomicBoolean succeeded = new AtomicBoolean(false);

final Future<List<String>> result = background(() -> this.messages.bulkIndex(messageBatch, createIndexingListener(countDownLatch, succeeded)));
final Future<Set<String>> result = background(() -> this.messages.bulkIndex(messageBatch, createIndexingListener(countDownLatch, succeeded)));

countDownLatch.await();

client().removeAliasMapping(index2, INDEX_NAME);

final List<String> failedItems = result.get(3, TimeUnit.MINUTES);
final Set<String> failedItems = result.get(3, TimeUnit.MINUTES);
assertThat(failedItems).isEmpty();

client().refreshNode();
Expand All @@ -243,7 +244,7 @@ public void properlySerializesCustomObjectsInMessageField() throws IOException {
Maps.immutableEntry(indexSet, message)
);

final List<String> failedItems = this.messages.bulkIndex(messageBatch);
final Set<String> failedItems = this.messages.bulkIndex(messageBatch);

assertThat(failedItems).isEmpty();

Expand All @@ -254,7 +255,7 @@ public void properlySerializesCustomObjectsInMessageField() throws IOException {
assertThat(resultMessage.getMessage().getField("custom_object")).isEqualTo("foo");
}

private Future<List<String>> background(Callable<List<String>> task) {
private Future<Set<String>> background(Callable<Set<String>> task) {
final ExecutorService executor = Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder().setNameFormat("messages-it-%d").build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -76,7 +77,7 @@ public void setUp() throws Exception {

@Test
public void bulkIndexingShouldNotDoAnythingForEmptyList() throws Exception {
final List<String> result = messages.bulkIndex(Collections.emptyList());
final Set<String> result = messages.bulkIndex(Collections.emptyList());

assertThat(result).isNotNull()
.isEmpty();
Expand Down Expand Up @@ -140,7 +141,7 @@ public void bulkIndexRequests_allNonIndexBlockErrorsPropagatedToTheFailureSubmis
);

// when
final List<String> failureIds = messages.bulkIndexRequests(indexingRequest, false);
final Set<String> failureIds = messages.bulkIndexRequests(indexingRequest, false);

// then
assertThat(failureIds).hasSize(2)
Expand Down Expand Up @@ -180,7 +181,7 @@ public void bulkIndexRequests_nothingPropagatedToFailureSubmissionServiceWhenThe
when(messagesAdapter.bulkIndex(indexingRequest)).thenReturn(ImmutableList.of());

// when
final List<String> failureIds = messages.bulkIndexRequests(indexingRequest, false);
final Set<String> failureIds = messages.bulkIndexRequests(indexingRequest, false);

// then
assertThat(failureIds).isEmpty();
Expand Down
Loading