diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java b/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java index 97ffb1ba353f..fe15f51c45a4 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java @@ -43,7 +43,6 @@ import javax.inject.Singleton; import java.io.IOException; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -116,21 +115,21 @@ public List analyze(String toAnalyze, String index, String analyzer) thr return messagesAdapter.analyze(toAnalyze, index, analyzer); } - public List bulkIndex(final List> messageList) { + public Set bulkIndex(final List> messageList) { return bulkIndex(messageList, false, null); } - public List bulkIndex(final List> messageList, IndexingListener indexingListener) { + public Set bulkIndex(final List> messageList, IndexingListener indexingListener) { return bulkIndex(messageList, false, indexingListener); } - public List bulkIndex(final List> messageList, boolean isSystemTraffic) { + public Set bulkIndex(final List> messageList, boolean isSystemTraffic) { return bulkIndex(messageList, isSystemTraffic, null); } - public List bulkIndex(final List> messageList, boolean isSystemTraffic, IndexingListener indexingListener) { + public Set bulkIndex(final List> messageList, boolean isSystemTraffic, IndexingListener indexingListener) { if (messageList.isEmpty()) { - return Collections.emptyList(); + return Set.of(); } final List indexingRequestList = messageList.stream() @@ -140,11 +139,11 @@ public List bulkIndex(final List> messageLi return bulkIndexRequests(indexingRequestList, isSystemTraffic, indexingListener); } - public List bulkIndexRequests(List indexingRequestList, boolean isSystemTraffic) { + public Set bulkIndexRequests(List indexingRequestList, boolean isSystemTraffic) { return bulkIndexRequests(indexingRequestList, isSystemTraffic, null); } - public List bulkIndexRequests(List indexingRequestList, boolean isSystemTraffic, IndexingListener indexingListener) { + public Set bulkIndexRequests(List indexingRequestList, boolean isSystemTraffic, IndexingListener indexingListener) { final List indexingErrors = runBulkRequest(indexingRequestList, indexingRequestList.size(), indexingListener); final Set remainingErrors = retryOnlyIndexBlockItemsForever(indexingRequestList, indexingErrors, indexingListener); @@ -268,16 +267,16 @@ private void recordTimestamp(List messageList) { } } - private List propagateFailure(Collection indexingErrors) { + private Set propagateFailure(Collection indexingErrors) { if (indexingErrors.isEmpty()) { - return Collections.emptyList(); + return Set.of(); } failureSubmissionService.submitIndexingErrors(indexingErrors); return indexingErrors.stream() .map(IndexingError::message).map(Indexable::getId) - .collect(Collectors.toList()); + .collect(Collectors.toSet()); } @AutoValue diff --git a/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java b/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java index 01a0735876b7..7ae98b5265fa 100644 --- a/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java +++ b/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java @@ -36,6 +36,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -45,6 +46,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import static com.codahale.metrics.MetricRegistry.name; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -130,11 +132,10 @@ private void flush(List> messages) { activeFlushThreads.get()); } - try (Timer.Context ignored = processTime.time()) { - lastFlushTime.set(System.nanoTime()); - writeMessageEntries(messages); - batchSize.update(messages.size()); - bufferFlushes.mark(); + try { + indexMessageBatch(messages); + // This does not exclude failedMessageIds, because we don't know if ES is ever gonna accept these messages. + acknowledger.acknowledge(messages.stream().map(Map.Entry::getValue).collect(Collectors.toList())); } catch (Exception e) { log.error("Unable to flush message buffer", e); bufferFlushFailures.mark(); @@ -143,6 +144,16 @@ private void flush(List> messages) { log.debug("Flushing {} messages completed", messages.size()); } + protected Set indexMessageBatch(List> messages) throws Exception { + try (Timer.Context ignored = processTime.time()) { + lastFlushTime.set(System.nanoTime()); + final Set failedMessageIds = writeMessageEntries(messages); + batchSize.update(messages.size()); + bufferFlushes.mark(); + return failedMessageIds; + } + } + public void forceFlushIfTimedout() { // if we shouldn't flush at all based on the last flush time, no need to synchronize on this. if (lastFlushTime.get() != 0 && diff --git a/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java b/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java index 31464006dba2..a82a975ed31d 100644 --- a/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java +++ b/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java @@ -36,6 +36,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -55,7 +56,7 @@ public class ElasticSearchOutput implements MessageOutput { private final Messages messages; private final Journal journal; private final AtomicBoolean isRunning = new AtomicBoolean(false); - private final MessageQueueAcknowledger acknowledger; + protected final MessageQueueAcknowledger acknowledger; @Inject public ElasticSearchOutput(MetricRegistry metricRegistry, @@ -87,7 +88,7 @@ public void write(List messageList) throws Exception { throw new UnsupportedOperationException("Method not supported!"); } - public void writeMessageEntries(List> messageList) throws Exception { + public Set writeMessageEntries(List> messageList) throws Exception { if (LOG.isTraceEnabled()) { final String sortedIds = messageList.stream() .map(Map.Entry::getValue) @@ -98,14 +99,13 @@ public void writeMessageEntries(List> messageList) } writes.mark(messageList.size()); - final List failedMessageIds; + final Set failedMessageIds; try (final Timer.Context ignored = processTime.time()) { failedMessageIds = messages.bulkIndex(messageList); } failures.mark(failedMessageIds.size()); - // This does not exclude failedMessageIds, because we don't know if ES is ever gonna accept these messages. - acknowledger.acknowledge(messageList.stream().map(Map.Entry::getValue).collect(Collectors.toList())); + return failedMessageIds; } @Override diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesBatchIT.java b/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesBatchIT.java index fbd08238dd23..260cdd1b9e99 100644 --- a/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesBatchIT.java +++ b/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesBatchIT.java @@ -34,6 +34,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -72,7 +73,7 @@ public void testIfLargeBatchesGetSplitUpOnCircuitBreakerExceptions() throws Exce final int MESSAGECOUNT = 50; // Each Message is about 1 MB final List> largeMessageBatch = createMessageBatch(1024 * 1024, MESSAGECOUNT); - final List failedItems = this.messages.bulkIndex(largeMessageBatch); + final Set failedItems = this.messages.bulkIndex(largeMessageBatch); client().refreshNode(); // wait for ES to finish indexing diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesBulkIndexRetryingTest.java b/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesBulkIndexRetryingTest.java index a692375c6eb4..8fa3fe5bcc9a 100644 --- a/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesBulkIndexRetryingTest.java +++ b/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesBulkIndexRetryingTest.java @@ -33,6 +33,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -60,7 +61,7 @@ void setUp() { @Test public void bulkIndexingShouldNotDoAnythingForEmptyList() throws Exception { - final List result = messages.bulkIndex(Collections.emptyList()); + final Set result = messages.bulkIndex(Collections.emptyList()); assertThat(result).isNotNull() .isEmpty(); @@ -86,7 +87,7 @@ public void bulkIndexingShouldNotRetryForIndexMappingErrors() throws Exception { final List> messageList = messageListWith(mockedMessage); - final List result = messages.bulkIndex(messageList); + final Set result = messages.bulkIndex(messageList); assertThat(result).hasSize(1); @@ -101,7 +102,7 @@ public void bulkIndexingShouldRetry() throws Exception { final List> messageList = messageListWith(mock(Message.class)); - final List result = messages.bulkIndex(messageList); + final Set result = messages.bulkIndex(messageList); assertThat(result).isNotNull().isEmpty(); @@ -119,7 +120,7 @@ public void bulkIndexingShouldRetryIfIndexBlocked() throws IOException { .thenReturn(errorResult) .thenReturn(successResult); - final List result = messages.bulkIndex(messagesWithIds("blocked-id")); + final Set result = messages.bulkIndex(messagesWithIds("blocked-id")); verify(messagesAdapter, times(2)).bulkIndex(any()); assertThat(result).isNotNull().isEmpty(); @@ -137,7 +138,7 @@ public void indexBlockedRetriesShouldOnlyRetryIndexBlockedErrors() throws IOExce .thenReturn(errorResult) .thenReturn(successResult); - final List result = messages.bulkIndex(messagesWithIds("blocked-id", "other-error-id")); + final Set result = messages.bulkIndex(messagesWithIds("blocked-id", "other-error-id")); verify(messagesAdapter, times(2)).bulkIndex(any()); assertThat(result).containsOnly("other-error-id"); @@ -157,7 +158,7 @@ public void retriedIndexBlockErrorsThatFailWithDifferentErrorsAreTreatedAsPersis .thenReturn(errorResult) .thenReturn(secondErrorResult); - final List result = messages.bulkIndex(messagesWithIds("blocked-id", "other-error-id")); + final Set result = messages.bulkIndex(messagesWithIds("blocked-id", "other-error-id")); verify(messagesAdapter, times(2)).bulkIndex(any()); assertThat(result).containsOnly("other-error-id"); diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesIT.java b/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesIT.java index 9723e20d8307..e243971f4be2 100644 --- a/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesIT.java +++ b/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesIT.java @@ -40,6 +40,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -119,7 +120,7 @@ public void testIfTooLargeBatchesGetSplitUp() throws Exception { final int MESSAGECOUNT = 101; // Each Message is about 1 MB final List> largeMessageBatch = createMessageBatch(1024 * 1024, MESSAGECOUNT); - final List failedItems = this.messages.bulkIndex(largeMessageBatch); + final Set failedItems = this.messages.bulkIndex(largeMessageBatch); assertThat(failedItems).isEmpty(); @@ -139,7 +140,7 @@ public void unevenTooLargeBatchesGetSplitUp() throws Exception { final int LARGE_MESSAGECOUNT = 20; final List> messageBatch = createMessageBatch(1024, MESSAGECOUNT); messageBatch.addAll(createMessageBatch(1024 * 1024 * 5, LARGE_MESSAGECOUNT)); - final List failedItems = this.messages.bulkIndex(messageBatch); + final Set failedItems = this.messages.bulkIndex(messageBatch); assertThat(failedItems).isEmpty(); @@ -161,7 +162,7 @@ public void conflictingFieldTypesErrorAreReported() throws Exception { entry(indexSet, message2) ); - final List failedItems = this.messages.bulkIndex(messageBatch); + final Set failedItems = this.messages.bulkIndex(messageBatch); assertThat(failedItems).hasSize(1); @@ -175,13 +176,13 @@ public void retryIndexingMessagesDuringFloodStage() throws Exception { final AtomicBoolean succeeded = new AtomicBoolean(false); final List> messageBatch = createMessageBatch(1024, 50); - final Future> result = background(() -> this.messages.bulkIndex(messageBatch, createIndexingListener(countDownLatch, succeeded))); + final Future> result = background(() -> this.messages.bulkIndex(messageBatch, createIndexingListener(countDownLatch, succeeded))); countDownLatch.await(); resetFloodStage(INDEX_NAME); - final List failedItems = result.get(3, TimeUnit.MINUTES); + final Set failedItems = result.get(3, TimeUnit.MINUTES); assertThat(failedItems).isEmpty(); client().refreshNode(); @@ -220,13 +221,13 @@ public void retryIndexingMessagesIfTargetAliasIsInvalid() throws Exception { final CountDownLatch countDownLatch = new CountDownLatch(1); final AtomicBoolean succeeded = new AtomicBoolean(false); - final Future> result = background(() -> this.messages.bulkIndex(messageBatch, createIndexingListener(countDownLatch, succeeded))); + final Future> result = background(() -> this.messages.bulkIndex(messageBatch, createIndexingListener(countDownLatch, succeeded))); countDownLatch.await(); client().removeAliasMapping(index2, INDEX_NAME); - final List failedItems = result.get(3, TimeUnit.MINUTES); + final Set failedItems = result.get(3, TimeUnit.MINUTES); assertThat(failedItems).isEmpty(); client().refreshNode(); @@ -243,7 +244,7 @@ public void properlySerializesCustomObjectsInMessageField() throws IOException { Maps.immutableEntry(indexSet, message) ); - final List failedItems = this.messages.bulkIndex(messageBatch); + final Set failedItems = this.messages.bulkIndex(messageBatch); assertThat(failedItems).isEmpty(); @@ -254,7 +255,7 @@ public void properlySerializesCustomObjectsInMessageField() throws IOException { assertThat(resultMessage.getMessage().getField("custom_object")).isEqualTo("foo"); } - private Future> background(Callable> task) { + private Future> background(Callable> task) { final ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("messages-it-%d").build()); diff --git a/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesTest.java b/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesTest.java index 53a9a4fa49c1..691d29237b16 100644 --- a/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesTest.java +++ b/graylog2-server/src/test/java/org/graylog2/indexer/messages/MessagesTest.java @@ -39,6 +39,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -76,7 +77,7 @@ public void setUp() throws Exception { @Test public void bulkIndexingShouldNotDoAnythingForEmptyList() throws Exception { - final List result = messages.bulkIndex(Collections.emptyList()); + final Set result = messages.bulkIndex(Collections.emptyList()); assertThat(result).isNotNull() .isEmpty(); @@ -140,7 +141,7 @@ public void bulkIndexRequests_allNonIndexBlockErrorsPropagatedToTheFailureSubmis ); // when - final List failureIds = messages.bulkIndexRequests(indexingRequest, false); + final Set failureIds = messages.bulkIndexRequests(indexingRequest, false); // then assertThat(failureIds).hasSize(2) @@ -180,7 +181,7 @@ public void bulkIndexRequests_nothingPropagatedToFailureSubmissionServiceWhenThe when(messagesAdapter.bulkIndex(indexingRequest)).thenReturn(ImmutableList.of()); // when - final List failureIds = messages.bulkIndexRequests(indexingRequest, false); + final Set failureIds = messages.bulkIndexRequests(indexingRequest, false); // then assertThat(failureIds).isEmpty();