Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix that only aggregated metrics for filters which actually matched >0 things are reported #2050

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.thingsearch.model.signals.commands.exceptions;

import java.net.URI;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.NotThreadSafe;

import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.JsonParsableException;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.thingsearch.model.ThingSearchException;

/**
* Thrown if during a custom aggregation metrics gathering multiple "filters" matched at the same time whereas only
* one filter is allowed to match.
*
* @since 3.6.2
*/
@Immutable
@JsonParsableException(errorCode = MultipleAggregationFilterMatchingException.ERROR_CODE)
public class MultipleAggregationFilterMatchingException extends DittoRuntimeException implements ThingSearchException {

/**
* Error code of this exception.
*/
public static final String ERROR_CODE = ERROR_CODE_PREFIX + "multiple.aggregation.filter.matching";

static final String DEFAULT_DESCRIPTION = "Ensure that only one defined 'filter' can match at the same time.";

static final HttpStatus HTTP_STATUS = HttpStatus.BAD_REQUEST;

private static final long serialVersionUID = -6341839112047194476L;

private MultipleAggregationFilterMatchingException(final DittoHeaders dittoHeaders,
@Nullable final String message,
@Nullable final String description,
@Nullable final Throwable cause,
@Nullable final URI href) {

super(ERROR_CODE, HTTP_STATUS, dittoHeaders, message, description, cause, href);
}

/**
* A mutable builder for a {@code MultipleAggregationFilterMatchingException}.
*
* @return the builder.
*/
public static Builder newBuilder() {
return new Builder();
}

/**
* Constructs a new {@code MultipleAggregationFilterMatchingException} object with the exception message extracted from the
* given JSON object.
*
* @param jsonObject the JSON to read the {@link DittoRuntimeException.JsonFields#MESSAGE} field from.
* @param dittoHeaders the headers of the command which resulted in this exception.
* @return the new MultipleAggregationFilterMatchingException.
* @throws NullPointerException if any argument is {@code null}.
* @throws org.eclipse.ditto.json.JsonMissingFieldException if this JsonObject did not contain an error message.
* @throws org.eclipse.ditto.json.JsonParseException if the passed in {@code jsonObject} was not in the expected
* format.
*/
public static MultipleAggregationFilterMatchingException fromJson(final JsonObject jsonObject,
final DittoHeaders dittoHeaders) {
return DittoRuntimeException.fromJson(jsonObject, dittoHeaders, new Builder());
}

@Override
public DittoRuntimeException setDittoHeaders(final DittoHeaders dittoHeaders) {
return new Builder()
.message(getMessage())
.description(getDescription().orElse(null))
.cause(getCause())
.href(getHref().orElse(null))
.dittoHeaders(dittoHeaders)
.build();
}

/**
* A mutable builder with a fluent API for a {@link org.eclipse.ditto.thingsearch.model.signals.commands.exceptions.MultipleAggregationFilterMatchingException}.
*/
@NotThreadSafe
public static final class Builder extends DittoRuntimeExceptionBuilder<MultipleAggregationFilterMatchingException> {

private Builder() {
description(DEFAULT_DESCRIPTION);
}

@Override
protected MultipleAggregationFilterMatchingException doBuild(final DittoHeaders dittoHeaders,
@Nullable final String message,
@Nullable final String description,
@Nullable final Throwable cause,
@Nullable final URI href) {

return new MultipleAggregationFilterMatchingException(dittoHeaders, message, description, cause, href);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
package org.eclipse.ditto.thingsearch.model.signals.commands.query;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand All @@ -38,6 +41,7 @@
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.thingsearch.model.signals.commands.exceptions.MultipleAggregationFilterMatchingException;

/**
* A response to an {@link AggregateThingsMetrics} command.
Expand Down Expand Up @@ -197,11 +201,13 @@ public Map<String, String> getGroupedBy() {
}

/**
* Returns the values for each filter defined in the metric
* Returns the values for the single matched filter defined in the metric.
*
* @return the result of the aggregation.
* @return the result of the aggregation, a single filter name with its count or an empty optional if no filter
* provided a count greater 0.
* @throws MultipleAggregationFilterMatchingException in case multiple filters matched at the same time
*/
public Map<String, Long> getResult() {
public Optional<Map.Entry<String, Long>> getResult() {
return extractFiltersResults(aggregation, filterNames);
}

Expand Down Expand Up @@ -233,19 +239,38 @@ public int hashCode() {

@Override
public String toString() {
return "AggregateThingsMetricsResponse{" +
return getClass().getSimpleName() + "[" +
"metricName='" + metricName + '\'' +
", dittoHeaders=" + dittoHeaders +
", filterNames=" + filterNames +
", aggregation=" + aggregation +
'}';
"]";
}

private Map<String, Long> extractFiltersResults(final JsonObject aggregation, final Set<String> filterNames) {
return filterNames.stream().filter(aggregation::contains).collect(Collectors.toMap(key ->
key, key -> aggregation.getValue(JsonPointer.of(key))
.orElseThrow(getJsonMissingFieldExceptionSupplier(key))
.asLong()));
private Optional<Map.Entry<String, Long>> extractFiltersResults(final JsonObject aggregation,
final Set<String> filterNames) {
final Map<String, Long> filterValues = filterNames.stream()
.filter(aggregation::contains)
.collect(
Collectors.toMap(Function.identity(),
key -> aggregation.getValue(JsonPointer.of(key))
.orElseThrow(getJsonMissingFieldExceptionSupplier(key))
.asLong()
)
);
final List<Map.Entry<String, Long>> filtersWithValueAboveZero = filterValues.entrySet()
.stream()
.filter(filterWithValue -> filterWithValue.getValue() > 0)
.collect(Collectors.toList());

if (filtersWithValueAboveZero.size() > 1) {
throw MultipleAggregationFilterMatchingException.newBuilder()
.message("Multiple filters matched: " + filtersWithValueAboveZero)
.dittoHeaders(dittoHeaders)
.build();
} else {
return filtersWithValueAboveZero.stream().findAny();
}
}

private static Supplier<RuntimeException> getJsonMissingFieldExceptionSupplier(final String field) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private <T> Source<T, NotUsed> processAggregationPersistenceResult(final Source<
final Flow<T, T, NotUsed> logAndFinishPersistenceSegmentFlow =
Flow.fromFunction(result -> {
log.withCorrelationId(dittoHeaders)
.info("aggregation element: {}", result);
.debug("aggregation element: {}", result);
return result;
});
return source.via(logAndFinishPersistenceSegmentFlow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.eclipse.ditto.placeholders.ExpressionResolver;
import org.eclipse.ditto.placeholders.PlaceholderFactory;
import org.eclipse.ditto.placeholders.PlaceholderResolver;
import org.eclipse.ditto.thingsearch.model.signals.commands.exceptions.MultipleAggregationFilterMatchingException;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.AggregateThingsMetrics;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.AggregateThingsMetricsResponse;
import org.eclipse.ditto.thingsearch.service.common.config.CustomAggregationMetricConfig;
Expand Down Expand Up @@ -154,12 +155,24 @@ private void handleGatheringMetrics(final GatherMetricsCommand gatherMetricsComm


private void handleAggregateThingsResponse(final AggregateThingsMetricsResponse response) {
log.withCorrelationId(response).debug("Received aggregate things response: {} thread: {}",
response, Thread.currentThread().getName());
final String metricName = response.getMetricName();
// record by filter name and tags
response.getResult().forEach((filterName, value) -> {
resolveTags(filterName, customSearchMetricConfigMap.get(metricName), response);
final Optional<Map.Entry<String, Long>> result;
try {
result = response.getResult();
log.withCorrelationId(response)
.debug("Received aggregate things response for metric name <{}>: {}, " +
"extracted result: <{}> - in thread: {}",
metricName, response, result, Thread.currentThread().getName());
} catch (final MultipleAggregationFilterMatchingException e) {
log.withCorrelationId(response)
.warning("Could not gather metrics for metric name <{}> from aggregate " +
"things response: {} as multiple filters were matching at the same time: <{}>",
metricName, response, e.getMessage());
return;
}
result.ifPresent(entry -> {
final String filterName = entry.getKey();
final Long value = entry.getValue();
final CustomAggregationMetricConfig customAggregationMetricConfig =
customSearchMetricConfigMap.get(metricName);
final TagSet tagSet = resolveTags(filterName, customAggregationMetricConfig, response);
Expand Down
Loading