diff --git a/thingsearch/model/src/main/java/org/eclipse/ditto/thingsearch/model/signals/commands/exceptions/MultipleAggregationFilterMatchingException.java b/thingsearch/model/src/main/java/org/eclipse/ditto/thingsearch/model/signals/commands/exceptions/MultipleAggregationFilterMatchingException.java new file mode 100755 index 0000000000..5f1ebe660d --- /dev/null +++ b/thingsearch/model/src/main/java/org/eclipse/ditto/thingsearch/model/signals/commands/exceptions/MultipleAggregationFilterMatchingException.java @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.thingsearch.model.signals.commands.exceptions; + +import java.net.URI; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; +import javax.annotation.concurrent.NotThreadSafe; + +import org.eclipse.ditto.base.model.common.HttpStatus; +import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; +import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder; +import org.eclipse.ditto.base.model.headers.DittoHeaders; +import org.eclipse.ditto.base.model.json.JsonParsableException; +import org.eclipse.ditto.json.JsonObject; +import org.eclipse.ditto.thingsearch.model.ThingSearchException; + +/** + * Thrown if during a custom aggregation metrics gathering multiple "filters" matched at the same time whereas only + * one filter is allowed to match. + * + * @since 3.6.2 + */ +@Immutable +@JsonParsableException(errorCode = MultipleAggregationFilterMatchingException.ERROR_CODE) +public class MultipleAggregationFilterMatchingException extends DittoRuntimeException implements ThingSearchException { + + /** + * Error code of this exception. + */ + public static final String ERROR_CODE = ERROR_CODE_PREFIX + "multiple.aggregation.filter.matching"; + + static final String DEFAULT_DESCRIPTION = "Ensure that only one defined 'filter' can match at the same time."; + + static final HttpStatus HTTP_STATUS = HttpStatus.BAD_REQUEST; + + private static final long serialVersionUID = -6341839112047194476L; + + private MultipleAggregationFilterMatchingException(final DittoHeaders dittoHeaders, + @Nullable final String message, + @Nullable final String description, + @Nullable final Throwable cause, + @Nullable final URI href) { + + super(ERROR_CODE, HTTP_STATUS, dittoHeaders, message, description, cause, href); + } + + /** + * A mutable builder for a {@code MultipleAggregationFilterMatchingException}. + * + * @return the builder. + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Constructs a new {@code MultipleAggregationFilterMatchingException} object with the exception message extracted from the + * given JSON object. + * + * @param jsonObject the JSON to read the {@link DittoRuntimeException.JsonFields#MESSAGE} field from. + * @param dittoHeaders the headers of the command which resulted in this exception. + * @return the new MultipleAggregationFilterMatchingException. + * @throws NullPointerException if any argument is {@code null}. + * @throws org.eclipse.ditto.json.JsonMissingFieldException if this JsonObject did not contain an error message. + * @throws org.eclipse.ditto.json.JsonParseException if the passed in {@code jsonObject} was not in the expected + * format. + */ + public static MultipleAggregationFilterMatchingException fromJson(final JsonObject jsonObject, + final DittoHeaders dittoHeaders) { + return DittoRuntimeException.fromJson(jsonObject, dittoHeaders, new Builder()); + } + + @Override + public DittoRuntimeException setDittoHeaders(final DittoHeaders dittoHeaders) { + return new Builder() + .message(getMessage()) + .description(getDescription().orElse(null)) + .cause(getCause()) + .href(getHref().orElse(null)) + .dittoHeaders(dittoHeaders) + .build(); + } + + /** + * A mutable builder with a fluent API for a {@link org.eclipse.ditto.thingsearch.model.signals.commands.exceptions.MultipleAggregationFilterMatchingException}. + */ + @NotThreadSafe + public static final class Builder extends DittoRuntimeExceptionBuilder { + + private Builder() { + description(DEFAULT_DESCRIPTION); + } + + @Override + protected MultipleAggregationFilterMatchingException doBuild(final DittoHeaders dittoHeaders, + @Nullable final String message, + @Nullable final String description, + @Nullable final Throwable cause, + @Nullable final URI href) { + + return new MultipleAggregationFilterMatchingException(dittoHeaders, message, description, cause, href); + } + + } + +} diff --git a/thingsearch/model/src/main/java/org/eclipse/ditto/thingsearch/model/signals/commands/query/AggregateThingsMetricsResponse.java b/thingsearch/model/src/main/java/org/eclipse/ditto/thingsearch/model/signals/commands/query/AggregateThingsMetricsResponse.java index a5c8987f48..ef1f64fa21 100644 --- a/thingsearch/model/src/main/java/org/eclipse/ditto/thingsearch/model/signals/commands/query/AggregateThingsMetricsResponse.java +++ b/thingsearch/model/src/main/java/org/eclipse/ditto/thingsearch/model/signals/commands/query/AggregateThingsMetricsResponse.java @@ -15,9 +15,12 @@ package org.eclipse.ditto.thingsearch.model.signals.commands.query; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -38,6 +41,7 @@ import org.eclipse.ditto.json.JsonObjectBuilder; import org.eclipse.ditto.json.JsonPointer; import org.eclipse.ditto.json.JsonValue; +import org.eclipse.ditto.thingsearch.model.signals.commands.exceptions.MultipleAggregationFilterMatchingException; /** * A response to an {@link AggregateThingsMetrics} command. @@ -197,11 +201,13 @@ public Map getGroupedBy() { } /** - * Returns the values for each filter defined in the metric + * Returns the values for the single matched filter defined in the metric. * - * @return the result of the aggregation. + * @return the result of the aggregation, a single filter name with its count or an empty optional if no filter + * provided a count greater 0. + * @throws MultipleAggregationFilterMatchingException in case multiple filters matched at the same time */ - public Map getResult() { + public Optional> getResult() { return extractFiltersResults(aggregation, filterNames); } @@ -233,19 +239,38 @@ public int hashCode() { @Override public String toString() { - return "AggregateThingsMetricsResponse{" + + return getClass().getSimpleName() + "[" + "metricName='" + metricName + '\'' + ", dittoHeaders=" + dittoHeaders + ", filterNames=" + filterNames + ", aggregation=" + aggregation + - '}'; + "]"; } - private Map extractFiltersResults(final JsonObject aggregation, final Set filterNames) { - return filterNames.stream().filter(aggregation::contains).collect(Collectors.toMap(key -> - key, key -> aggregation.getValue(JsonPointer.of(key)) - .orElseThrow(getJsonMissingFieldExceptionSupplier(key)) - .asLong())); + private Optional> extractFiltersResults(final JsonObject aggregation, + final Set filterNames) { + final Map filterValues = filterNames.stream() + .filter(aggregation::contains) + .collect( + Collectors.toMap(Function.identity(), + key -> aggregation.getValue(JsonPointer.of(key)) + .orElseThrow(getJsonMissingFieldExceptionSupplier(key)) + .asLong() + ) + ); + final List> filtersWithValueAboveZero = filterValues.entrySet() + .stream() + .filter(filterWithValue -> filterWithValue.getValue() > 0) + .collect(Collectors.toList()); + + if (filtersWithValueAboveZero.size() > 1) { + throw MultipleAggregationFilterMatchingException.newBuilder() + .message("Multiple filters matched: " + filtersWithValueAboveZero) + .dittoHeaders(dittoHeaders) + .build(); + } else { + return filtersWithValueAboveZero.stream().findAny(); + } } private static Supplier getJsonMissingFieldExceptionSupplier(final String field) { diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/AggregateThingsMetricsActor.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/AggregateThingsMetricsActor.java index 07f6872657..fb499eb595 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/AggregateThingsMetricsActor.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/AggregateThingsMetricsActor.java @@ -110,7 +110,7 @@ private Source processAggregationPersistenceResult(final Source< final Flow logAndFinishPersistenceSegmentFlow = Flow.fromFunction(result -> { log.withCorrelationId(dittoHeaders) - .info("aggregation element: {}", result); + .debug("aggregation element: {}", result); return result; }); return source.via(logAndFinishPersistenceSegmentFlow); diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/OperatorAggregateMetricsProviderActor.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/OperatorAggregateMetricsProviderActor.java index 375a891c6a..f44ce2ff31 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/OperatorAggregateMetricsProviderActor.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/OperatorAggregateMetricsProviderActor.java @@ -47,6 +47,7 @@ import org.eclipse.ditto.placeholders.ExpressionResolver; import org.eclipse.ditto.placeholders.PlaceholderFactory; import org.eclipse.ditto.placeholders.PlaceholderResolver; +import org.eclipse.ditto.thingsearch.model.signals.commands.exceptions.MultipleAggregationFilterMatchingException; import org.eclipse.ditto.thingsearch.model.signals.commands.query.AggregateThingsMetrics; import org.eclipse.ditto.thingsearch.model.signals.commands.query.AggregateThingsMetricsResponse; import org.eclipse.ditto.thingsearch.service.common.config.CustomAggregationMetricConfig; @@ -154,12 +155,24 @@ private void handleGatheringMetrics(final GatherMetricsCommand gatherMetricsComm private void handleAggregateThingsResponse(final AggregateThingsMetricsResponse response) { - log.withCorrelationId(response).debug("Received aggregate things response: {} thread: {}", - response, Thread.currentThread().getName()); final String metricName = response.getMetricName(); - // record by filter name and tags - response.getResult().forEach((filterName, value) -> { - resolveTags(filterName, customSearchMetricConfigMap.get(metricName), response); + final Optional> result; + try { + result = response.getResult(); + log.withCorrelationId(response) + .debug("Received aggregate things response for metric name <{}>: {}, " + + "extracted result: <{}> - in thread: {}", + metricName, response, result, Thread.currentThread().getName()); + } catch (final MultipleAggregationFilterMatchingException e) { + log.withCorrelationId(response) + .warning("Could not gather metrics for metric name <{}> from aggregate " + + "things response: {} as multiple filters were matching at the same time: <{}>", + metricName, response, e.getMessage()); + return; + } + result.ifPresent(entry -> { + final String filterName = entry.getKey(); + final Long value = entry.getValue(); final CustomAggregationMetricConfig customAggregationMetricConfig = customSearchMetricConfigMap.get(metricName); final TagSet tagSet = resolveTags(filterName, customAggregationMetricConfig, response);