diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index 4aa3226a4dc2b..f6345e7b9ec5b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -877,4 +877,12 @@ default boolean periodicRollover() { return false; } + /** + * Get the attributes associated with the cursor. + * + * @return the attributes associated with the cursor + */ + default ManagedCursorAttributes getManagedCursorAttributes() { + return new ManagedCursorAttributes(this); + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorAttributes.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorAttributes.java new file mode 100644 index 0000000000000..6c06e68d75e24 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorAttributes.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +import io.opentelemetry.api.common.Attributes; +import lombok.Getter; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ManagedCursorOperationStatus; + +@Getter +public class ManagedCursorAttributes { + + private final Attributes attributes; + private final Attributes attributesOperationSucceed; + private final Attributes attributesOperationFailure; + + public ManagedCursorAttributes(ManagedCursor cursor) { + var mlName = cursor.getManagedLedger().getName(); + var topicName = TopicName.get(TopicName.fromPersistenceNamingEncoding(mlName)); + attributes = Attributes.of( + OpenTelemetryAttributes.ML_CURSOR_NAME, cursor.getName(), + OpenTelemetryAttributes.ML_LEDGER_NAME, mlName, + OpenTelemetryAttributes.PULSAR_NAMESPACE, topicName.getNamespace() + ); + attributesOperationSucceed = Attributes.builder() + .putAll(attributes) + .putAll(ManagedCursorOperationStatus.SUCCESS.attributes) + .build(); + attributesOperationFailure = Attributes.builder() + .putAll(attributes) + .putAll(ManagedCursorOperationStatus.FAILURE.attributes) + .build(); + } +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 98ba722ba1c9b..4ef9678f3e180 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -77,6 +77,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedCursorAttributes; import org.apache.bookkeeper.mledger.ManagedCursorMXBean; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -286,6 +287,11 @@ public enum State { protected final ManagedCursorMXBean mbean; + private volatile ManagedCursorAttributes managedCursorAttributes; + private static final AtomicReferenceFieldUpdater ATTRIBUTES_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, ManagedCursorAttributes.class, + "managedCursorAttributes"); + @SuppressWarnings("checkstyle:javadoctype") public interface VoidCallback { void operationComplete(); @@ -3719,4 +3725,12 @@ public ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) thro } return newNonDurableCursor; } + + @Override + public ManagedCursorAttributes getManagedCursorAttributes() { + if (managedCursorAttributes != null) { + return managedCursorAttributes; + } + return ATTRIBUTES_UPDATER.updateAndGet(this, old -> old != null ? old : new ManagedCursorAttributes(this)); + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index b1939f40e9358..00afb85a9d486 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -122,6 +122,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats; private final OpenTelemetryManagedLedgerStats openTelemetryManagedLedgerStats; + private final OpenTelemetryManagedCursorStats openTelemetryManagedCursorStats; //indicate whether shutdown() is called. private volatile boolean closed; @@ -231,6 +232,7 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, openTelemetryCacheStats = new OpenTelemetryManagedLedgerCacheStats(openTelemetry, this); openTelemetryManagedLedgerStats = new OpenTelemetryManagedLedgerStats(openTelemetry, this); + openTelemetryManagedCursorStats = new OpenTelemetryManagedCursorStats(openTelemetry, this); } static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy { @@ -622,6 +624,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { })); }).thenAcceptAsync(__ -> { //wait for tasks in scheduledExecutor executed. + openTelemetryManagedCursorStats.close(); openTelemetryManagedLedgerStats.close(); openTelemetryCacheStats.close(); scheduledExecutor.shutdownNow(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java new file mode 100644 index 0000000000000..93a749d4aef51 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import com.google.common.collect.Streams; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.metrics.BatchCallback; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.pulsar.opentelemetry.Constants; + +public class OpenTelemetryManagedCursorStats implements AutoCloseable { + + // Replaces ['pulsar_ml_cursor_persistLedgerSucceed', 'pulsar_ml_cursor_persistLedgerErrors'] + public static final String PERSIST_OPERATION_COUNTER = "pulsar.broker.managed_ledger.persist.operation.count"; + private final ObservableLongMeasurement persistOperationCounter; + + // Replaces ['pulsar_ml_cursor_persistZookeeperSucceed', 'pulsar_ml_cursor_persistZookeeperErrors'] + public static final String PERSIST_OPERATION_METADATA_STORE_COUNTER = + "pulsar.broker.managed_ledger.persist.mds.operation.count"; + private final ObservableLongMeasurement persistOperationMetadataStoreCounter; + + // Replaces pulsar_ml_cursor_nonContiguousDeletedMessagesRange + public static final String NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER = + "pulsar.broker.managed_ledger.message_range.count"; + private final ObservableLongMeasurement nonContiguousMessageRangeCounter; + + // Replaces pulsar_ml_cursor_writeLedgerSize + public static final String OUTGOING_BYTE_COUNTER = "pulsar.broker.managed_ledger.cursor.outgoing.size"; + private final ObservableLongMeasurement outgoingByteCounter; + + // Replaces pulsar_ml_cursor_writeLedgerLogicalSize + public static final String OUTGOING_BYTE_LOGICAL_COUNTER = + "pulsar.broker.managed_ledger.cursor.outgoing.logical.size"; + private final ObservableLongMeasurement outgoingByteLogicalCounter; + + // Replaces pulsar_ml_cursor_readLedgerSize + public static final String INCOMING_BYTE_COUNTER = "pulsar.broker.managed_ledger.cursor.incoming.size"; + private final ObservableLongMeasurement incomingByteCounter; + + private final BatchCallback batchCallback; + + public OpenTelemetryManagedCursorStats(OpenTelemetry openTelemetry, ManagedLedgerFactoryImpl factory) { + var meter = openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME); + + persistOperationCounter = meter + .counterBuilder(PERSIST_OPERATION_COUNTER) + .setUnit("{operation}") + .setDescription("The number of acknowledgment operations on the ledger.") + .buildObserver(); + + persistOperationMetadataStoreCounter = meter + .counterBuilder(PERSIST_OPERATION_METADATA_STORE_COUNTER) + .setUnit("{operation}") + .setDescription("The number of acknowledgment operations in the metadata store.") + .buildObserver(); + + nonContiguousMessageRangeCounter = meter + .upDownCounterBuilder(NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER) + .setUnit("{range}") + .setDescription("The number of non-contiguous deleted messages ranges.") + .buildObserver(); + + outgoingByteCounter = meter + .counterBuilder(OUTGOING_BYTE_COUNTER) + .setUnit("{By}") + .setDescription("The total amount of data written to the ledger.") + .buildObserver(); + + outgoingByteLogicalCounter = meter + .counterBuilder(OUTGOING_BYTE_LOGICAL_COUNTER) + .setUnit("{By}") + .setDescription("The total amount of data written to the ledger, not including replicas.") + .buildObserver(); + + incomingByteCounter = meter + .counterBuilder(INCOMING_BYTE_COUNTER) + .setUnit("{By}") + .setDescription("The total amount of data read from the ledger.") + .buildObserver(); + + batchCallback = meter.batchCallback(() -> factory.getManagedLedgers() + .values() + .stream() + .map(ManagedLedgerImpl::getCursors) + .flatMap(Streams::stream) + .forEach(this::recordMetrics), + persistOperationCounter, + persistOperationMetadataStoreCounter, + nonContiguousMessageRangeCounter, + outgoingByteCounter, + outgoingByteLogicalCounter, + incomingByteCounter); + } + + @Override + public void close() { + batchCallback.close(); + } + + private void recordMetrics(ManagedCursor cursor) { + var stats = cursor.getStats(); + var cursorAttributesSet = cursor.getManagedCursorAttributes(); + var attributes = cursorAttributesSet.getAttributes(); + var attributesSucceed = cursorAttributesSet.getAttributesOperationSucceed(); + var attributesFailed = cursorAttributesSet.getAttributesOperationFailure(); + + persistOperationCounter.record(stats.getPersistLedgerSucceed(), attributesSucceed); + persistOperationCounter.record(stats.getPersistLedgerErrors(), attributesFailed); + + persistOperationMetadataStoreCounter.record(stats.getPersistZookeeperSucceed(), attributesSucceed); + persistOperationMetadataStoreCounter.record(stats.getPersistZookeeperErrors(), attributesFailed); + + nonContiguousMessageRangeCounter.record(cursor.getTotalNonContiguousDeletedMessagesRange(), attributes); + + outgoingByteCounter.record(stats.getWriteCursorLedgerSize(), attributes); + outgoingByteLogicalCounter.record(stats.getWriteCursorLedgerLogicalSize(), attributes); + incomingByteCounter.record(stats.getReadCursorLedgerSize(), attributes); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java index baa4bea570155..8ddb5320588da 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java @@ -18,20 +18,24 @@ */ package org.apache.pulsar.broker.stats; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; +import static org.assertj.core.api.Assertions.assertThat; import java.util.ArrayList; import java.util.List; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedCursorMXBean; -import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.ManagedCursorAttributes; +import org.apache.bookkeeper.mledger.impl.OpenTelemetryManagedCursorStats; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; @@ -80,6 +84,12 @@ protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws return PulsarTestClient.create(clientBuilder); } + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder pulsarTestContextBuilder) { + super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder); + pulsarTestContextBuilder.enableOpenTelemetry(true); + } + /*** * This method has overridden these case: * brk_ml_cursor_persistLedgerSucceed @@ -115,10 +125,7 @@ public void testManagedCursorMetrics() throws Exception { .topic(topicName) .enableBatching(false) .create(); - final PersistentSubscription persistentSubscription = - (PersistentSubscription) pulsar.getBrokerService() - .getTopic(topicName, false).get().get().getSubscription(subName); - final ManagedCursorImpl managedCursor = (ManagedCursorImpl) persistentSubscription.getCursor(); + var managedCursor = getManagedCursor(topicName, subName); ManagedCursorMXBean managedCursorMXBean = managedCursor.getStats(); // Assert. metricsList = metrics.generate(); @@ -128,6 +135,19 @@ public void testManagedCursorMetrics() throws Exception { Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L); Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L); Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L); + // Validate OpenTelemetry metrics as well + var attributesSet = new ManagedCursorAttributes(managedCursor); + var otelMetrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER, + attributesSet.getAttributesOperationSucceed(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER, + attributesSet.getAttributesOperationFailure(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER, + attributesSet.getAttributesOperationSucceed(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER, + attributesSet.getAttributesOperationFailure(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER, + attributesSet.getAttributes(), 0); /** * 1. Send many messages, and only ack half. After the cursor data is written to BK, * verify "brk_ml_cursor_persistLedgerSucceed" and "brk_ml_cursor_nonContiguousDeletedMessagesRange". @@ -156,6 +176,17 @@ public void testManagedCursorMetrics() throws Exception { Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L); Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L); + otelMetrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER, + attributesSet.getAttributesOperationSucceed(), value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER, + attributesSet.getAttributesOperationFailure(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER, + attributesSet.getAttributesOperationSucceed(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER, + attributesSet.getAttributesOperationFailure(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER, + attributesSet.getAttributes(), value -> assertThat(value).isPositive()); // Ack another half. for (MessageId messageId : keepsMessageIdList){ consumer.acknowledge(messageId); @@ -171,6 +202,17 @@ public void testManagedCursorMetrics() throws Exception { Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L); Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L); Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L); + otelMetrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER, + attributesSet.getAttributesOperationSucceed(), value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER, + attributesSet.getAttributesOperationFailure(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER, + attributesSet.getAttributesOperationSucceed(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER, + attributesSet.getAttributesOperationFailure(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER, + attributesSet.getAttributes(), 0); /** * Make BK error, and send many message, then wait cursor persistent finish. * After the cursor data is written to ZK, verify "brk_ml_cursor_persistLedgerErrors" and @@ -196,6 +238,17 @@ public void testManagedCursorMetrics() throws Exception { Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L); Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L); Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L); + otelMetrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER, + attributesSet.getAttributesOperationSucceed(), value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER, + attributesSet.getAttributesOperationFailure(), value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER, + attributesSet.getAttributesOperationSucceed(), value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER, + attributesSet.getAttributesOperationFailure(), 0); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER, + attributesSet.getAttributes(), 0); /** * TODO verify "brk_ml_cursor_persistZookeeperErrors". * This is not easy to implement, we can use {@link #mockZooKeeper} to fail ZK, but we cannot identify whether @@ -210,13 +263,16 @@ public void testManagedCursorMetrics() throws Exception { admin.topics().delete(topicName, true); } - private ManagedCursorMXBean getManagedCursorMXBean(String topicName, String subscriptionName) - throws ExecutionException, InterruptedException { + private ManagedCursorMXBean getManagedCursorMXBean(String topicName, String subscriptionName) throws Exception { + var managedCursor = getManagedCursor(topicName, subscriptionName); + return managedCursor.getStats(); + } + + private ManagedCursor getManagedCursor(String topicName, String subscriptionName) throws Exception { final PersistentSubscription persistentSubscription = (PersistentSubscription) pulsar.getBrokerService() .getTopic(topicName, false).get().get().getSubscription(subscriptionName); - final ManagedCursorImpl managedCursor = (ManagedCursorImpl) persistentSubscription.getCursor(); - return managedCursor.getStats(); + return persistentSubscription.getCursor(); } @Test @@ -265,9 +321,11 @@ public void testCursorReadWriteMetrics() throws Exception { } } + var managedCursor1 = getManagedCursor(topicName, subName1); + var cursorMXBean1 = managedCursor1.getStats(); + var managedCursor2 = getManagedCursor(topicName, subName2); + var cursorMXBean2 = managedCursor2.getStats(); // Wait for persistent cursor meta. - ManagedCursorMXBean cursorMXBean1 = getManagedCursorMXBean(topicName, subName1); - ManagedCursorMXBean cursorMXBean2 = getManagedCursorMXBean(topicName, subName2); Awaitility.await().until(() -> cursorMXBean1.getWriteCursorLedgerLogicalSize() > 0); Awaitility.await().until(() -> cursorMXBean2.getWriteCursorLedgerLogicalSize() > 0); @@ -281,6 +339,22 @@ public void testCursorReadWriteMetrics() throws Exception { Assert.assertNotEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 0L); Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L); + var otelMetrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + var attributes1 = new ManagedCursorAttributes(managedCursor1).getAttributes(); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.OUTGOING_BYTE_COUNTER, + attributes1, value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.OUTGOING_BYTE_LOGICAL_COUNTER, + attributes1, value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.INCOMING_BYTE_COUNTER, + attributes1, 0); + + var attributes2 = new ManagedCursorAttributes(managedCursor2).getAttributes(); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.OUTGOING_BYTE_COUNTER, + attributes2, value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.OUTGOING_BYTE_LOGICAL_COUNTER, + attributes2, value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedCursorStats.INCOMING_BYTE_COUNTER, + attributes2, 0); // cleanup. consumer.close(); consumer2.close(); diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java index 24dd1be8509bf..41358a72c0d90 100644 --- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java @@ -116,6 +116,7 @@ public interface OpenTelemetryAttributes { * The status of the Pulsar transaction. */ AttributeKey PULSAR_TRANSACTION_STATUS = AttributeKey.stringKey("pulsar.transaction.status"); + enum TransactionStatus { ABORTED, ACTIVE, @@ -174,6 +175,28 @@ enum BacklogQuotaType { public final Attributes attributes = Attributes.of(PULSAR_BACKLOG_QUOTA_TYPE, name().toLowerCase()); } + // Managed Ledger Attributes + /** + * The name of the managed ledger. + */ + AttributeKey ML_LEDGER_NAME = AttributeKey.stringKey("pulsar.managed_ledger.name"); + + /** + * The name of the managed cursor. + */ + AttributeKey ML_CURSOR_NAME = AttributeKey.stringKey("pulsar.managed_ledger.cursor.name"); + + /** + * The status of the managed cursor operation. + */ + AttributeKey ML_CURSOR_OPERATION_STATUS = + AttributeKey.stringKey("pulsar.managed_ledger.cursor.operation.status"); + enum ManagedCursorOperationStatus { + SUCCESS, + FAILURE; + public final Attributes attributes = Attributes.of(ML_CURSOR_OPERATION_STATUS, name().toLowerCase()); + } + /** * The name of the remote cluster for a Pulsar replicator. */