diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto index 13c87bc1130ce..97685b552fa99 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto @@ -187,6 +187,17 @@ message MonitoringInfoSpecs { }] }]; + // Represents a set of strings seen across bundles. + USER_STRING_SET = 21 [(monitoring_info_spec) = { + urn: "beam:metric:user:string_set:v1", + type: "beam:metrics:string_set:v1", + required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"], + annotations: [{ + key: "description", + value: "URN utilized to report user metric." + }] + }]; + // General monitored state information which contains structured information // which does not fit into a typical metric format. See MonitoringTableData // for more details. diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java index a2f6511d5129a..2c8cd46126960 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java @@ -19,13 +19,16 @@ import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE; +import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.STRING_SET_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge; +import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge; +import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import java.io.Serializable; @@ -331,6 +334,28 @@ public MetricUpdates getUpdates() { return builder.build(); } + /** @return The MonitoringInfo metadata from the string set metric. */ + private @Nullable SimpleMonitoringInfoBuilder stringSetToMonitoringMetadata(MetricKey metricKey) { + return metricToMonitoringMetadata( + metricKey, + MonitoringInfoConstants.TypeUrns.STRING_SET_TYPE, + MonitoringInfoConstants.Urns.USER_STRING_SET); + } + + /** + * @param metricUpdate + * @return The MonitoringInfo generated from the string set metricUpdate. + */ + private @Nullable MonitoringInfo stringSetUpdateToMonitoringInfo( + MetricUpdate metricUpdate) { + SimpleMonitoringInfoBuilder builder = stringSetToMonitoringMetadata(metricUpdate.getKey()); + if (builder == null) { + return null; + } + builder.setStringSetValue(metricUpdate.getUpdate()); + return builder.build(); + } + /** Return the cumulative values for any metrics in this container as MonitoringInfos. */ @Override public Iterable getMonitoringInfos() { @@ -358,6 +383,13 @@ public Iterable getMonitoringInfos() { monitoringInfos.add(mi); } } + + for (MetricUpdate metricUpdate : metricUpdates.stringSetUpdates()) { + MonitoringInfo mi = stringSetUpdateToMonitoringInfo(metricUpdate); + if (mi != null) { + monitoringInfos.add(mi); + } + } return monitoringInfos; } @@ -391,6 +423,15 @@ public Map getMonitoringData(ShortIdMap shortIds) { } } }); + stringSets.forEach( + (metricName, gaugeCell) -> { + if (gaugeCell.getDirty().beforeCommit()) { + String shortId = getShortId(metricName, this::stringSetToMonitoringMetadata, shortIds); + if (shortId != null) { + builder.put(shortId, encodeStringSet(gaugeCell.getCumulative())); + } + } + }); return builder.build(); } @@ -418,7 +459,7 @@ private String getShortId( } /** - * Mark all the updates that were retrieved with the latest call to {@link #getUpdates()} as + * Mark all of the updates that were retrieved with the latest call to {@link #getUpdates()} as * committed. */ public void commitUpdates() { @@ -480,6 +521,12 @@ private void updateForLatestInt64Type(MonitoringInfo monitoringInfo) { gauge.update(decodeInt64Gauge(monitoringInfo.getPayload())); } + private void updateForStringSetType(MonitoringInfo monitoringInfo) { + MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo); + StringSetCell stringSet = getStringSet(metricName); + stringSet.update(decodeStringSet(monitoringInfo.getPayload())); + } + /** Update values of this {@link MetricsContainerImpl} by reading from {@code monitoringInfos}. */ public void update(Iterable monitoringInfos) { for (MonitoringInfo monitoringInfo : monitoringInfos) { @@ -500,6 +547,10 @@ public void update(Iterable monitoringInfos) { updateForLatestInt64Type(monitoringInfo); break; + case STRING_SET_TYPE: + updateForStringSetType(monitoringInfo); + break; + default: LOG.warn("Unsupported metric type {}", monitoringInfo.getType()); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java index 697fc8487c6a5..bb512b54d82c3 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java @@ -52,6 +52,8 @@ public static final class Urns { extractUrn(MonitoringInfoSpecs.Enum.USER_DISTRIBUTION_INT64); public static final String USER_DISTRIBUTION_DOUBLE = extractUrn(MonitoringInfoSpecs.Enum.USER_DISTRIBUTION_DOUBLE); + public static final String USER_STRING_SET = + extractUrn(MonitoringInfoSpecs.Enum.USER_STRING_SET); public static final String SAMPLED_BYTE_SIZE = extractUrn(MonitoringInfoSpecs.Enum.SAMPLED_BYTE_SIZE); public static final String WORK_COMPLETED = extractUrn(MonitoringInfoSpecs.Enum.WORK_COMPLETED); @@ -162,7 +164,7 @@ public static final class TypeUrns { public static final String BOTTOM_N_INT64_TYPE = "beam:metrics:bottom_n_int64:v1"; public static final String BOTTOM_N_DOUBLE_TYPE = "beam:metrics:bottom_n_double:v1"; public static final String PROGRESS_TYPE = "beam:metrics:progress:v1"; - public static final String SET_STRING_TYPE = "beam:metrics:set_string:v1"; + public static final String STRING_SET_TYPE = "beam:metrics:string_set:v1"; static { // Validate that compile time constants match the values stored in the protos. @@ -188,7 +190,7 @@ public static final class TypeUrns { checkArgument( BOTTOM_N_DOUBLE_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.BOTTOM_N_DOUBLE_TYPE))); checkArgument(PROGRESS_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.PROGRESS_TYPE))); - checkArgument(SET_STRING_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.SET_STRING_TYPE))); + checkArgument(STRING_SET_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.SET_STRING_TYPE))); } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java index c44a2621ee6c7..b87df0237f8dd 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java @@ -23,6 +23,7 @@ import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge; +import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import java.util.HashMap; @@ -148,6 +149,16 @@ public SimpleMonitoringInfoBuilder setDoubleDistributionValue( return this; } + /** + * Encodes the value and sets the type to {@link + * MonitoringInfoConstants.TypeUrns#STRING_SET_TYPE}. + */ + public SimpleMonitoringInfoBuilder setStringSetValue(StringSetData value) { + this.builder.setPayload(encodeStringSet(value)); + this.builder.setType(MonitoringInfoConstants.TypeUrns.STRING_SET_TYPE); + return this; + } + /** Sets the MonitoringInfo label to the given name and value. */ public SimpleMonitoringInfoBuilder setLabel(String labelName, String labelValue) { this.builder.putLabels(labelName, labelValue); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java index 809919f611b45..4a1c3d5e9117f 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java @@ -270,6 +270,38 @@ public void testMonitoringInfosArePopulatedForUserGauges() { assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build())); } + @Test + public void testMonitoringInfosArePopulatedForUserStringSets() { + MetricsContainerImpl testObject = new MetricsContainerImpl("step1"); + StringSetCell stringSetCellA = testObject.getStringSet(MetricName.named("ns", "nameA")); + StringSetCell stringSetCellB = testObject.getStringSet(MetricName.named("ns", "nameB")); + stringSetCellA.add("A"); + stringSetCellB.add("BBB"); + + SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder(); + builder1 + .setUrn(MonitoringInfoConstants.Urns.USER_STRING_SET) + .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns") + .setLabel(MonitoringInfoConstants.Labels.NAME, "nameA") + .setStringSetValue(stringSetCellA.getCumulative()) + .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1"); + + SimpleMonitoringInfoBuilder builder2 = new SimpleMonitoringInfoBuilder(); + builder2 + .setUrn(MonitoringInfoConstants.Urns.USER_STRING_SET) + .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns") + .setLabel(MonitoringInfoConstants.Labels.NAME, "nameB") + .setStringSetValue(stringSetCellB.getCumulative()) + .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1"); + + List actualMonitoringInfos = new ArrayList<>(); + for (MonitoringInfo mi : testObject.getMonitoringInfos()) { + actualMonitoringInfos.add(mi); + } + + assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build())); + } + @Test public void testMonitoringInfosArePopulatedForSystemDistributions() { MetricsContainerImpl testObject = new MetricsContainerImpl("step1");