Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][misc] PIP-264: Copy OpenTelemetry resource attributes to Prometheus labels #23005

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
import io.opentelemetry.instrumentation.runtimemetrics.java17.RuntimeMetrics;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
Expand Down Expand Up @@ -97,6 +98,20 @@ public OpenTelemetryService(String clusterName,
return resource.merge(resourceBuilder.build());
});

sdkBuilder.addMetricReaderCustomizer((metricReader, configProperties) -> {
if (metricReader instanceof PrometheusHttpServer prometheusHttpServer) {
// At this point, the server is already started. We need to close it and create a new one with the
// correct resource attributes filter.
prometheusHttpServer.close();

// Allow all resource attributes to be exposed.
return prometheusHttpServer.toBuilder()
.setAllowedResourceAttributesFilter(s -> true)
.build();
}
return metricReader;
});

if (builderCustomizer != null) {
builderCustomizer.accept(sdkBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.tests.integration.metrics;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.awaitility.Awaitility.waitAtMost;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand All @@ -37,7 +39,6 @@
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.apache.pulsar.tests.integration.topologies.PulsarTestBase;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

public class OpenTelemetrySanityTest {
Expand Down Expand Up @@ -71,17 +72,17 @@ public void testOpenTelemetryMetricsOtlpExport() throws Exception {
// TODO: Validate cluster name and service version are present once
// https://github.com/open-telemetry/opentelemetry-java/issues/6108 is solved.
var metricName = "queueSize_ratio"; // Sent automatically by the OpenTelemetry SDK.
Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(
openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT);
return !metrics.findByNameAndLabels(metricName, "job", PulsarBrokerOpenTelemetry.SERVICE_NAME).isEmpty();
});
Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(
openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT);
return !metrics.findByNameAndLabels(metricName, "job", PulsarProxyOpenTelemetry.SERVICE_NAME).isEmpty();
});
Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(
openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT);
return !metrics.findByNameAndLabels(metricName, "job", PulsarWorkerOpenTelemetry.SERVICE_NAME).isEmpty();
Expand Down Expand Up @@ -120,30 +121,34 @@ public void testOpenTelemetryMetricsPrometheusExport() throws Exception {
pulsarCluster.start();
pulsarCluster.setupFunctionWorkers(PulsarTestBase.randomName(), FunctionRuntimeType.PROCESS, 1);

var metricName = "target_info"; // Sent automatically by the OpenTelemetry SDK.
Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(pulsarCluster.getBroker(0), prometheusExporterPort);
return !metrics.findByNameAndLabels(metricName,
var targetInfoMetricName = "target_info"; // Sent automatically by the OpenTelemetry SDK.
var cpuCountMetricName = "jvm_cpu_count"; // Configured by the OpenTelemetryService.
waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> {
var expectedMetrics = new String[] {targetInfoMetricName, cpuCountMetricName, "pulsar_broker_topic_producer_count"};
var actualMetrics = getMetricsFromPrometheus(pulsarCluster.getBroker(0), prometheusExporterPort);
assertThat(expectedMetrics).allMatch(expectedMetric -> !actualMetrics.findByNameAndLabels(expectedMetric,
Pair.of("pulsar_cluster", clusterName),
Pair.of("service_name", PulsarBrokerOpenTelemetry.SERVICE_NAME),
Pair.of("service_version", PulsarVersion.getVersion()),
Pair.of("host_name", pulsarCluster.getBroker(0).getHostname())).isEmpty();
Pair.of("host_name", pulsarCluster.getBroker(0).getHostname())).isEmpty());
});
Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(pulsarCluster.getProxy(), prometheusExporterPort);
return !metrics.findByNameAndLabels(metricName,
waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> {
var expectedMetrics = new String[] {targetInfoMetricName, cpuCountMetricName};
var actualMetrics = getMetricsFromPrometheus(pulsarCluster.getProxy(), prometheusExporterPort);
assertThat(expectedMetrics).allMatch(expectedMetric -> !actualMetrics.findByNameAndLabels(expectedMetric,
Pair.of("pulsar_cluster", clusterName),
Pair.of("service_name", PulsarProxyOpenTelemetry.SERVICE_NAME),
Pair.of("service_version", PulsarVersion.getVersion()),
Pair.of("host_name", pulsarCluster.getProxy().getHostname())).isEmpty();
Pair.of("host_name", pulsarCluster.getProxy().getHostname())).isEmpty());
});
Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(pulsarCluster.getAnyWorker(), prometheusExporterPort);
return !metrics.findByNameAndLabels(metricName,
waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> {
var expectedMetrics = new String[] {targetInfoMetricName, cpuCountMetricName};
var actualMetrics = getMetricsFromPrometheus(pulsarCluster.getAnyWorker(), prometheusExporterPort);
assertThat(expectedMetrics).allMatch(expectedMetric -> !actualMetrics.findByNameAndLabels(expectedMetric,
Pair.of("pulsar_cluster", clusterName),
Pair.of("service_name", PulsarWorkerOpenTelemetry.SERVICE_NAME),
Pair.of("service_version", PulsarVersion.getVersion()),
Pair.of("host_name", pulsarCluster.getAnyWorker().getHostname())).isEmpty();
Pair.of("host_name", pulsarCluster.getAnyWorker().getHostname())).isEmpty());
});
}

Expand Down
Loading