From a364f2acad60a2807a3faef14d687ef5254e4166 Mon Sep 17 00:00:00 2001 From: maobaolong Date: Thu, 26 Sep 2024 14:47:45 +0800 Subject: [PATCH] [#2117] feat(server): Introduce memory buffer related metrics(block/buffer/shuffle count) (#2118) ### What changes were proposed in this pull request? Introduce block count in buffer pool metrics. ### Why are the changes needed? Fix: #2117 ### Does this PR introduce _any_ user-facing change? Yes. `block_count_in_buffer_pool` metrics introduced. - "block_count_in_buffer_pool" - "buffer_count_in_buffer_pool" - "shuffle_count_in_buffer_pool" ### How was this patch tested? By dashboard server metrics popup page. --- .../common/metrics/MetricsManager.java | 8 +++++- .../uniffle/common/metrics/SupplierGauge.java | 23 +++++++++++++++-- .../uniffle/server/ShuffleServerMetrics.java | 12 +++++++++ .../server/buffer/ShuffleBufferManager.java | 25 +++++++++++++++++++ 4 files changed, 65 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java index bf6a95bd7f..efa216e0ce 100644 --- a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java +++ b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java @@ -85,6 +85,11 @@ public Gauge.Child addLabeledGauge(String name) { } public void addLabeledGauge(String name, Supplier supplier) { + addLabeledCacheGauge(name, supplier, 0); + } + + public void addLabeledCacheGauge( + String name, Supplier supplier, long updateInterval) { supplierGaugeMap.computeIfAbsent( name, metricName -> @@ -93,7 +98,8 @@ public void addLabeledGauge(String name, Supplier supplier "Gauge " + name, supplier, this.defaultLabelNames, - this.defaultLabelValues) + this.defaultLabelValues, + updateInterval) .register(collectorRegistry)); } diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java b/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java index eb7756ef62..6892a3afac 100644 --- a/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java +++ b/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java @@ -36,27 +36,46 @@ class SupplierGauge extends Collector implements Collector.Des private Supplier supplier; private List labelNames; private List labelValues; + private long updateInterval; + private long lastUpdateTime; + private T lastValue; SupplierGauge( String name, String help, Supplier supplier, String[] labelNames, String[] labelValues) { + this(name, help, supplier, labelNames, labelValues, 0); + } + + SupplierGauge( + String name, + String help, + Supplier supplier, + String[] labelNames, + String[] labelValues, + long updateInterval) { this.name = name; this.help = help; this.supplier = supplier; this.labelNames = Arrays.asList(labelNames); this.labelValues = Arrays.asList(labelValues); + this.updateInterval = updateInterval; + this.lastUpdateTime = 0; } @Override public List collect() { List samples = new ArrayList<>(); - T lastValue = supplier.get(); + long time = System.currentTimeMillis(); + if (time - lastUpdateTime > updateInterval) { + this.lastValue = this.supplier.get(); + this.lastUpdateTime = time; + } if (lastValue == null) { LOG.warn("SupplierGauge {} returned null value.", this.name); return Collections.emptyList(); } samples.add( new MetricFamilySamples.Sample( - this.name, this.labelNames, this.labelValues, lastValue.doubleValue())); + this.name, this.labelNames, this.labelValues, this.lastValue.doubleValue())); MetricFamilySamples mfs = new MetricFamilySamples(this.name, Type.GAUGE, this.help, samples); List mfsList = new ArrayList(1); mfsList.add(mfs); diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java index d22cec0b30..82f784d900 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java @@ -159,6 +159,10 @@ public class ShuffleServerMetrics { public static final String REQUIRE_BUFFER_COUNT = "require_buffer_count"; + public static final String BLOCK_COUNT_IN_BUFFER_POOL = "block_count_in_buffer_pool"; + public static final String BUFFER_COUNT_IN_BUFFER_POOL = "buffer_count_in_buffer_pool"; + public static final String SHUFFLE_COUNT_IN_BUFFER_POOL = "shuffle_count_in_buffer_pool"; + public static Counter.Child counterTotalAppNum; public static Counter.Child counterTotalAppWithHugePartitionNum; public static Counter.Child counterTotalPartitionNum; @@ -518,4 +522,12 @@ private static void setUpMetrics(ShuffleServerConf serverConf) { public static void addLabeledGauge(String name, Supplier supplier) { metricsManager.addLabeledGauge(name, supplier); } + + public static void addLabeledCacheGauge( + String name, Supplier supplier, long updateInterval) { + if (!isRegister) { + return; + } + metricsManager.addLabeledCacheGauge(name, supplier, updateInterval); + } } diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java index 39002a0dcc..18dd94c198 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java @@ -54,6 +54,10 @@ import org.apache.uniffle.server.ShuffleServerMetrics; import org.apache.uniffle.server.ShuffleTaskManager; +import static org.apache.uniffle.server.ShuffleServerMetrics.BLOCK_COUNT_IN_BUFFER_POOL; +import static org.apache.uniffle.server.ShuffleServerMetrics.BUFFER_COUNT_IN_BUFFER_POOL; +import static org.apache.uniffle.server.ShuffleServerMetrics.SHUFFLE_COUNT_IN_BUFFER_POOL; + public class ShuffleBufferManager { private static final Logger LOG = LoggerFactory.getLogger(ShuffleBufferManager.class); @@ -141,6 +145,27 @@ public ShuffleBufferManager( appBlockSizeMetricEnabled = conf.getBoolean(ShuffleServerConf.APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_ENABLED); shuffleBufferType = conf.get(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE); + + ShuffleServerMetrics.addLabeledCacheGauge( + BLOCK_COUNT_IN_BUFFER_POOL, + () -> + bufferPool.values().stream() + .flatMap(innerMap -> innerMap.values().stream()) + .flatMap(rangeMap -> rangeMap.asMapOfRanges().values().stream()) + .mapToInt(shuffleBuffer -> shuffleBuffer.getBlockCount()) + .sum(), + 2 * 60 * 1000L /* 2 minutes */); + ShuffleServerMetrics.addLabeledCacheGauge( + BUFFER_COUNT_IN_BUFFER_POOL, + () -> + bufferPool.values().stream() + .flatMap(innerMap -> innerMap.values().stream()) + .mapToInt(rangeMap -> rangeMap.asMapOfRanges().size()) + .sum(), + 2 * 60 * 1000L /* 2 minutes */); + ShuffleServerMetrics.addLabeledGauge( + SHUFFLE_COUNT_IN_BUFFER_POOL, + () -> bufferPool.values().stream().mapToInt(innerMap -> innerMap.size()).sum()); } public void setShuffleTaskManager(ShuffleTaskManager taskManager) {