Skip to content

Commit

Permalink
Add request size metrics channel
Browse files Browse the repository at this point in the history
  • Loading branch information
andybradshaw committed Sep 26, 2023
1 parent 6a10243 commit 6dc9052
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 0 deletions.
1 change: 1 addition & 0 deletions dialogue-clients/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Dialogue-specific metrics that are not necessarily applicable to other client im
- `dialogue.client.request.queued.time` tagged `channel-name` (timer): Time spent waiting in the queue before execution.
- `dialogue.client.request.endpoint.queued.time` tagged `channel-name`, `service-name`, `endpoint` (timer): Time spent waiting in the queue before execution on a specific endpoint due to server QoS.
- `dialogue.client.request.sticky.queued.time` tagged `channel-name` (timer): Time spent waiting in the sticky queue before execution attempt.
- `dialogue.client.requests.size` tagged `service-name`, `endpoint` (histogram): Size of requests
- `dialogue.client.create` tagged `client-name`, `client-type` (meter): Marked every time a new client is created.

### dialogue.concurrencylimiter
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.dialogue.core;

import com.codahale.metrics.Histogram;
import com.google.common.util.concurrent.ListenableFuture;
import com.palantir.conjure.java.client.config.ClientConfiguration;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.EndpointChannel;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.RequestBody;
import com.palantir.dialogue.Response;
import com.palantir.tritium.metrics.registry.TaggedMetricRegistry;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Optional;

final class RequestSizeMetricsChannel implements EndpointChannel {
private final EndpointChannel delegate;
private final Histogram requestSize;

static EndpointChannel create(Config cf, EndpointChannel channel, Endpoint endpoint) {
ClientConfiguration clientConf = cf.clientConf();
return new RequestSizeMetricsChannel(channel, endpoint, clientConf.taggedMetricRegistry());
}

RequestSizeMetricsChannel(EndpointChannel delegate, Endpoint endpoint, TaggedMetricRegistry registry) {
this.delegate = delegate;
DialogueClientMetrics dialogueClientMetrics = DialogueClientMetrics.of(registry);
this.requestSize = dialogueClientMetrics
.requestsSize()
.serviceName(endpoint.serviceName())
.endpoint(endpoint.endpointName())
.build();
}

@Override
public ListenableFuture<Response> execute(Request request) {
Request augmentedRequest = wrap(request);
return delegate.execute(augmentedRequest);
}

private Request wrap(Request request) {
Optional<RequestBody> body = request.body();
if (body.isEmpty()) {
// No need to record empty bodies
return request;
}

return Request.builder()
.from(request)
.body(new RequestSizeRecordingRequestBody(body.get(), this.requestSize))
.build();
}

private class RequestSizeRecordingRequestBody implements RequestBody {
private final RequestBody delegate;
private final Histogram size;
private SizeTrackingOutputStream out;

RequestSizeRecordingRequestBody(RequestBody requestBody, Histogram size) {
this.delegate = requestBody;
this.size = size;
// we'll never actually write to this output stream, but is safe to perform all operations on in case a
// client closes without calling write.
this.out = new SizeTrackingOutputStream(OutputStream.nullOutputStream(), size);
}

@Override
public void writeTo(OutputStream output) throws IOException {
out = new SizeTrackingOutputStream(output, size);
delegate.writeTo(out);
}

@Override
public String contentType() {
return delegate.contentType();
}

@Override
public boolean repeatable() {
return delegate.repeatable();
}

@Override
public void close() {
try {
out.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
delegate.close();
}

/**
* {@link SizeTrackingOutputStream} records the total number of bytes written to the output stream.
*/
private final class SizeTrackingOutputStream extends FilterOutputStream {
private final Histogram size;
private long writes = 0;

SizeTrackingOutputStream(OutputStream delegate, Histogram size) {
super(delegate);
this.size = size;
}

@Override
public void write(byte[] buffer, int off, int len) throws IOException {
writes += len;
out.write(buffer, off, len);
}

@Override
public void write(int value) throws IOException {
writes += 1;
out.write(value);
}

@Override
public void close() throws IOException {
this.size.update(writes);
super.close();
}
}
}
}
4 changes: 4 additions & 0 deletions dialogue-core/src/main/metrics/dialogue-core-metrics.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ namespaces:
type: timer
tags: [ channel-name ]
docs: Time spent waiting in the sticky queue before execution attempt.
requests.size:
type: histogram
tags: [service-name, endpoint]
docs: Size of requests
# Note: the 'dialogue.client.create' metric is also defined in the apache metrics.
create:
type: meter
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* (c) Copyright 2023 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.dialogue.core;

import static org.assertj.core.api.Assertions.assertThat;

import com.codahale.metrics.Snapshot;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.palantir.conjure.java.client.config.ClientConfiguration;
import com.palantir.dialogue.EndpointChannel;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.RequestBody;
import com.palantir.dialogue.Response;
import com.palantir.dialogue.TestConfigurations;
import com.palantir.dialogue.TestEndpoint;
import com.palantir.dialogue.TestResponse;
import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry;
import com.palantir.tritium.metrics.registry.TaggedMetricRegistry;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.OptionalLong;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
public class RequestSizeMetricsChannelTest {
@Mock
DialogueChannelFactory factory;

@Test
public void records_request_size_metrics() throws ExecutionException, InterruptedException {
TaggedMetricRegistry registry = DefaultTaggedMetricRegistry.getDefault();

ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] expected = "test request body".getBytes(StandardCharsets.UTF_8);
Request request = Request.builder()
.body(new RequestBody() {
@Override
public void writeTo(OutputStream output) throws IOException {
output.write(expected);
}

@Override
public String contentType() {
return "text/plain";
}

@Override
public boolean repeatable() {
return true;
}

@Override
public OptionalLong contentLength() {
return OptionalLong.of(expected.length);
}

@Override
public void close() {}
})
.build();

EndpointChannel channel = RequestSizeMetricsChannel.create(
config(ClientConfiguration.builder()
.from(TestConfigurations.create("https://foo:10001"))
.taggedMetricRegistry(registry)
.build()),
r -> {
try {
RequestBody body = r.body().get();
body.writeTo(baos);
body.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
return Futures.immediateFuture(new TestResponse().code(200));
},
TestEndpoint.GET);
ListenableFuture<Response> response = channel.execute(request);

assertThat(response.get().code()).isEqualTo(200);
Snapshot snapshot = DialogueClientMetrics.of(registry)
.requestsSize()
.serviceName("service")
.endpoint("endpoint")
.build()
.getSnapshot();
assertThat(snapshot.size()).isEqualTo(1);
assertThat(snapshot.get99thPercentile()).isEqualTo(expected.length);
}

private ImmutableConfig config(ClientConfiguration rawConfig) {
return ImmutableConfig.builder()
.channelName("channelName")
.channelFactory(factory)
.rawConfig(rawConfig)
.build();
}
}

0 comments on commit 6dc9052

Please sign in to comment.