diff --git a/changelog/@unreleased/pr-2023.v2.yml b/changelog/@unreleased/pr-2023.v2.yml new file mode 100644 index 000000000..2bf5988c3 --- /dev/null +++ b/changelog/@unreleased/pr-2023.v2.yml @@ -0,0 +1,6 @@ +type: feature +feature: + description: Add a request size metric channel, which records the size of payloads + written by the client. + links: + - https://github.com/palantir/dialogue/pull/2023 diff --git a/dialogue-clients/metrics.md b/dialogue-clients/metrics.md index 98b3ab547..d25dce91f 100644 --- a/dialogue-clients/metrics.md +++ b/dialogue-clients/metrics.md @@ -71,6 +71,11 @@ Dialogue-specific metrics that are not necessarily applicable to other client im - `dialogue.client.request.queued.time` tagged `channel-name` (timer): Time spent waiting in the queue before execution. - `dialogue.client.request.endpoint.queued.time` tagged `channel-name`, `service-name`, `endpoint` (timer): Time spent waiting in the queue before execution on a specific endpoint due to server QoS. - `dialogue.client.request.sticky.queued.time` tagged `channel-name` (timer): Time spent waiting in the sticky queue before execution attempt. +- `dialogue.client.requests.size` (histogram): Histogram of the sizes of requests larger than a threshold (1 MiB). + - `repeatable` values (`true`,`false`) + - `channel-name` + - `service-name` + - `endpoint` - `dialogue.client.create` tagged `client-name`, `client-type` (meter): Marked every time a new client is created. ### dialogue.concurrencylimiter diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java index fe06676d2..e39d2a8dd 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java @@ -92,6 +92,7 @@ public Builder clientConfiguration(ClientConfiguration value) { /** * Please use {@link #factory(DialogueChannelFactory)}. + * * @deprecated prefer {@link #factory(DialogueChannelFactory)} */ @Deprecated @@ -185,6 +186,7 @@ public DialogueChannel build() { channel = new RangeAcceptsIdentityEncodingChannel(channel); channel = ContentEncodingChannel.of(channel, endpoint); channel = TracedChannel.create(cf, channel, endpoint); + channel = RequestSizeMetricsChannel.create(cf, channel, endpoint); if (ChannelToEndpointChannel.isConstant(endpoint)) { // Avoid producing metrics for non-constant endpoints which may produce // high cardinality. @@ -207,7 +209,9 @@ public DialogueChannel build() { return new DialogueChannel(cf, channelFactory, stickyChannelSupplier); } - /** Does *not* do any clever live-reloading. */ + /** + * Does *not* do any clever live-reloading. + */ @CheckReturnValue public Channel buildNonLiveReloading() { return build(); diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/RequestSizeMetricsChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/RequestSizeMetricsChannel.java new file mode 100644 index 000000000..3742a70f9 --- /dev/null +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/RequestSizeMetricsChannel.java @@ -0,0 +1,131 @@ +/* + * (c) Copyright 2023 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import com.codahale.metrics.Histogram; +import com.google.common.base.Suppliers; +import com.google.common.io.CountingOutputStream; +import com.google.common.util.concurrent.ListenableFuture; +import com.palantir.conjure.java.client.config.ClientConfiguration; +import com.palantir.dialogue.Endpoint; +import com.palantir.dialogue.EndpointChannel; +import com.palantir.dialogue.Request; +import com.palantir.dialogue.RequestBody; +import com.palantir.dialogue.Response; +import com.palantir.dialogue.core.DialogueClientMetrics.RequestsSize_Repeatable; +import com.palantir.logsafe.logger.SafeLogger; +import com.palantir.logsafe.logger.SafeLoggerFactory; +import com.palantir.tritium.metrics.registry.TaggedMetricRegistry; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.function.Supplier; + +final class RequestSizeMetricsChannel implements EndpointChannel { + private static final SafeLogger log = SafeLoggerFactory.get(RequestSizeMetricsChannel.class); + // MIN_REPORTED_REQUEST_SIZE filters recording small requests to reduce metric cardinality + private static final long MIN_REPORTED_REQUEST_SIZE = 1 << 20; + private final EndpointChannel delegate; + private final Supplier repeatableRequestSize; + private final Supplier unrepeatableRequestSize; + + static EndpointChannel create(Config cf, EndpointChannel channel, Endpoint endpoint) { + ClientConfiguration clientConf = cf.clientConf(); + return new RequestSizeMetricsChannel(channel, cf.channelName(), endpoint, clientConf.taggedMetricRegistry()); + } + + RequestSizeMetricsChannel( + EndpointChannel delegate, String channelName, Endpoint endpoint, TaggedMetricRegistry registry) { + this.delegate = delegate; + DialogueClientMetrics dialogueClientMetrics = DialogueClientMetrics.of(registry); + this.repeatableRequestSize = Suppliers.memoize(() -> dialogueClientMetrics + .requestsSize() + .repeatable(RequestsSize_Repeatable.TRUE) + .channelName(channelName) + .serviceName(endpoint.serviceName()) + .endpoint(endpoint.endpointName()) + .build()); + this.unrepeatableRequestSize = Suppliers.memoize(() -> dialogueClientMetrics + .requestsSize() + .repeatable(RequestsSize_Repeatable.FALSE) + .channelName(channelName) + .serviceName(endpoint.serviceName()) + .endpoint(endpoint.endpointName()) + .build()); + } + + @Override + public ListenableFuture execute(Request request) { + Request augmentedRequest = wrap(request); + return delegate.execute(augmentedRequest); + } + + private Request wrap(Request request) { + Optional body = request.body(); + if (body.isEmpty()) { + // No need to record empty bodies + return request; + } + Supplier requestSizeHistogram = + body.get().repeatable() ? this.repeatableRequestSize : this.unrepeatableRequestSize; + + return Request.builder() + .from(request) + .body(new RequestSizeRecordingRequestBody(body.get(), requestSizeHistogram)) + .build(); + } + + private static class RequestSizeRecordingRequestBody implements RequestBody { + private final RequestBody delegate; + private final Supplier size; + + RequestSizeRecordingRequestBody(RequestBody requestBody, Supplier size) { + this.delegate = requestBody; + this.size = size; + } + + @Override + public void writeTo(OutputStream output) throws IOException { + CountingOutputStream countingOut = new CountingOutputStream(output); + delegate.writeTo(countingOut); + if (countingOut.getCount() > MIN_REPORTED_REQUEST_SIZE) { + size.get().update(countingOut.getCount()); + } + } + + @Override + public String contentType() { + return delegate.contentType(); + } + + @Override + public boolean repeatable() { + return delegate.repeatable(); + } + + @Override + public OptionalLong contentLength() { + return delegate.contentLength(); + } + + @Override + public void close() { + delegate.close(); + } + } +} diff --git a/dialogue-core/src/main/metrics/dialogue-core-metrics.yml b/dialogue-core/src/main/metrics/dialogue-core-metrics.yml index 40cebfb6f..1e1698fbf 100644 --- a/dialogue-core/src/main/metrics/dialogue-core-metrics.yml +++ b/dialogue-core/src/main/metrics/dialogue-core-metrics.yml @@ -56,6 +56,15 @@ namespaces: type: timer tags: [ channel-name ] docs: Time spent waiting in the sticky queue before execution attempt. + requests.size: + type: histogram + tags: + - name: repeatable + values: [ 'true', 'false' ] + - name: channel-name + - name: service-name + - name: endpoint + docs: Histogram of the sizes of requests larger than a threshold (1 MiB). # Note: the 'dialogue.client.create' metric is also defined in the apache metrics. create: type: meter diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/RequestSizeMetricsChannelTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/RequestSizeMetricsChannelTest.java new file mode 100644 index 000000000..4559f56c6 --- /dev/null +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/RequestSizeMetricsChannelTest.java @@ -0,0 +1,178 @@ +/* + * (c) Copyright 2023 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.codahale.metrics.Snapshot; +import com.google.common.io.ByteStreams; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.palantir.conjure.java.client.config.ClientConfiguration; +import com.palantir.dialogue.EndpointChannel; +import com.palantir.dialogue.Request; +import com.palantir.dialogue.RequestBody; +import com.palantir.dialogue.Response; +import com.palantir.dialogue.TestConfigurations; +import com.palantir.dialogue.TestEndpoint; +import com.palantir.dialogue.TestResponse; +import com.palantir.dialogue.core.DialogueClientMetrics.RequestsSize_Repeatable; +import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry; +import com.palantir.tritium.metrics.registry.MetricName; +import com.palantir.tritium.metrics.registry.TaggedMetricRegistry; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.OptionalLong; +import java.util.concurrent.ExecutionException; +import org.junit.jupiter.api.Test; + +public class RequestSizeMetricsChannelTest { + + private static final DialogueChannelFactory STUB_FACTORY = _ignored -> { + throw new AssertionError("DialogueChannelFactory should not be used"); + }; + + @Test + public void records_request_size_metrics() throws ExecutionException, InterruptedException { + TaggedMetricRegistry registry = new DefaultTaggedMetricRegistry(); + + int recordableRequestSize = 2 << 20; + byte[] expected = "a".repeat(recordableRequestSize).getBytes(StandardCharsets.UTF_8); + Request request = Request.builder() + .body(new RequestBody() { + @Override + public void writeTo(OutputStream output) throws IOException { + output.write(expected); + } + + @Override + public String contentType() { + return "text/plain"; + } + + @Override + public boolean repeatable() { + return true; + } + + @Override + public OptionalLong contentLength() { + return OptionalLong.of(expected.length); + } + + @Override + public void close() {} + }) + .build(); + + EndpointChannel channel = RequestSizeMetricsChannel.create( + ImmutableConfig.builder() + .channelName("channelName") + .channelFactory(STUB_FACTORY) + .rawConfig(ClientConfiguration.builder() + .from(TestConfigurations.create("https://foo:10001")) + .taggedMetricRegistry(registry) + .build()) + .build(), + r -> { + try (RequestBody body = r.body().get()) { + body.writeTo(ByteStreams.nullOutputStream()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return Futures.immediateFuture(new TestResponse().code(200)); + }, + TestEndpoint.GET); + ListenableFuture response = channel.execute(request); + + assertThat(response.get().code()).isEqualTo(200); + Snapshot snapshot = DialogueClientMetrics.of(registry) + .requestsSize() + .repeatable(RequestsSize_Repeatable.TRUE) + .channelName("channelName") + .serviceName("service") + .endpoint("endpoint") + .build() + .getSnapshot(); + assertThat(snapshot.size()).isEqualTo(1); + assertThat(snapshot.get99thPercentile()).isEqualTo(expected.length); + } + + @Test + public void small_request_not_recorded() throws ExecutionException, InterruptedException { + TaggedMetricRegistry registry = new DefaultTaggedMetricRegistry(); + + byte[] expected = "test request body".getBytes(StandardCharsets.UTF_8); + Request request = Request.builder() + .body(new RequestBody() { + @Override + public void writeTo(OutputStream output) throws IOException { + output.write(expected); + } + + @Override + public String contentType() { + return "text/plain"; + } + + @Override + public boolean repeatable() { + return true; + } + + @Override + public OptionalLong contentLength() { + return OptionalLong.of(expected.length); + } + + @Override + public void close() {} + }) + .build(); + + EndpointChannel channel = RequestSizeMetricsChannel.create( + ImmutableConfig.builder() + .channelName("smallRequestChannelName") + .channelFactory(STUB_FACTORY) + .rawConfig(ClientConfiguration.builder() + .from(TestConfigurations.create("https://foo:10001")) + .taggedMetricRegistry(registry) + .build()) + .build(), + r -> { + try (RequestBody body = r.body().get()) { + body.writeTo(ByteStreams.nullOutputStream()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return Futures.immediateFuture(new TestResponse().code(200)); + }, + TestEndpoint.GET); + ListenableFuture response = channel.execute(request); + + assertThat(response.get().code()).isEqualTo(200); + MetricName metricName = DialogueClientMetrics.of(registry) + .requestsSize() + .repeatable(RequestsSize_Repeatable.TRUE) + .channelName("smallRequestChannelName") + .serviceName("service") + .endpoint("endpoint") + .buildMetricName(); + assertThat(registry.remove(metricName)).isEmpty(); + } +}