Skip to content

Commit

Permalink
[apache#2117] feat(server): Introduce memory buffer related metrics(b…
Browse files Browse the repository at this point in the history
…lock/buffer/shuffle count) (apache#2118)

### What changes were proposed in this pull request?

Introduce block count, buffer count, and shuffle count metrics for the buffer pool.

### Why are the changes needed?

Fix: apache#2117

### Does this PR introduce _any_ user-facing change?

Yes. The following metrics are introduced:

- "block_count_in_buffer_pool"
- "buffer_count_in_buffer_pool"
- "shuffle_count_in_buffer_pool"

### How was this patch tested?

Verified manually via the server metrics popup page of the dashboard.
  • Loading branch information
maobaolong authored Sep 26, 2024
1 parent faad80d commit a364f2a
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public Gauge.Child addLabeledGauge(String name) {
}

/**
 * Adds a labeled gauge whose value is obtained from {@code supplier}.
 *
 * <p>This is a convenience overload that forwards to
 * {@link #addLabeledCacheGauge(String, Supplier, long)} with an update interval of zero.
 *
 * @param name the gauge name
 * @param supplier the source of the gauge value
 * @param <T> the numeric type produced by the supplier
 */
public <T extends Number> void addLabeledGauge(String name, Supplier<T> supplier) {
  final long zeroInterval = 0L;
  addLabeledCacheGauge(name, supplier, zeroInterval);
}

public <T extends Number> void addLabeledCacheGauge(
String name, Supplier<T> supplier, long updateInterval) {
supplierGaugeMap.computeIfAbsent(
name,
metricName ->
Expand All @@ -93,7 +98,8 @@ public <T extends Number> void addLabeledGauge(String name, Supplier<T> supplier
"Gauge " + name,
supplier,
this.defaultLabelNames,
this.defaultLabelValues)
this.defaultLabelValues,
updateInterval)
.register(collectorRegistry));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,27 +36,46 @@ class SupplierGauge<T extends Number> extends Collector implements Collector.Des
private Supplier<T> supplier;
private List<String> labelNames;
private List<String> labelValues;
private long updateInterval;
private long lastUpdateTime;
private T lastValue;

/**
 * Creates a gauge with an update interval of zero by delegating to the six-argument constructor.
 *
 * @param name the metric name
 * @param help the metric help text
 * @param supplier the source of the gauge value
 * @param labelNames the label names attached to the sample
 * @param labelValues the label values attached to the sample
 */
SupplierGauge(
    String name,
    String help,
    Supplier<T> supplier,
    String[] labelNames,
    String[] labelValues) {
  this(name, help, supplier, labelNames, labelValues, 0L);
}

/**
 * Creates a gauge that reads its value from {@code supplier}.
 *
 * @param name the metric name
 * @param help the metric help text
 * @param supplier the source of the gauge value
 * @param labelNames the label names attached to the sample; stored as a list view of the array
 * @param labelValues the label values attached to the sample; stored as a list view of the array
 * @param updateInterval the interval stored for deciding when the cached value is refreshed
 *     (compared against {@code System.currentTimeMillis()} differences in {@code collect()})
 */
SupplierGauge(
    String name,
    String help,
    Supplier<T> supplier,
    String[] labelNames,
    String[] labelValues,
    long updateInterval) {
  // Refresh bookkeeping: start at zero so the stored "last update" predates any real timestamp.
  this.lastUpdateTime = 0L;
  this.updateInterval = updateInterval;

  // Metric identity and value source.
  this.name = name;
  this.help = help;
  this.supplier = supplier;

  // Labels are kept as fixed-size list views over the supplied arrays.
  this.labelNames = Arrays.asList(labelNames);
  this.labelValues = Arrays.asList(labelValues);
}

@Override
public List<MetricFamilySamples> collect() {
List<MetricFamilySamples.Sample> samples = new ArrayList<>();
T lastValue = supplier.get();
long time = System.currentTimeMillis();
if (time - lastUpdateTime > updateInterval) {
this.lastValue = this.supplier.get();
this.lastUpdateTime = time;
}
if (lastValue == null) {
LOG.warn("SupplierGauge {} returned null value.", this.name);
return Collections.emptyList();
}
samples.add(
new MetricFamilySamples.Sample(
this.name, this.labelNames, this.labelValues, lastValue.doubleValue()));
this.name, this.labelNames, this.labelValues, this.lastValue.doubleValue()));
MetricFamilySamples mfs = new MetricFamilySamples(this.name, Type.GAUGE, this.help, samples);
List<MetricFamilySamples> mfsList = new ArrayList<MetricFamilySamples>(1);
mfsList.add(mfs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ public class ShuffleServerMetrics {

public static final String REQUIRE_BUFFER_COUNT = "require_buffer_count";

public static final String BLOCK_COUNT_IN_BUFFER_POOL = "block_count_in_buffer_pool";
public static final String BUFFER_COUNT_IN_BUFFER_POOL = "buffer_count_in_buffer_pool";
public static final String SHUFFLE_COUNT_IN_BUFFER_POOL = "shuffle_count_in_buffer_pool";

public static Counter.Child counterTotalAppNum;
public static Counter.Child counterTotalAppWithHugePartitionNum;
public static Counter.Child counterTotalPartitionNum;
Expand Down Expand Up @@ -518,4 +522,12 @@ private static void setUpMetrics(ShuffleServerConf serverConf) {
/**
 * Adds a labeled gauge, backed by {@code supplier}, to the metrics manager.
 *
 * <p>Returns without doing anything when {@code isRegister} is false. This mirrors the guard in
 * {@code addLabeledCacheGauge} so that both registration helpers behave the same way when
 * metrics have not been registered, instead of this one unconditionally dereferencing
 * {@code metricsManager}.
 *
 * @param name the gauge name
 * @param supplier the source of the gauge value
 * @param <T> the numeric type produced by the supplier
 */
public static <T extends Number> void addLabeledGauge(String name, Supplier<T> supplier) {
  if (!isRegister) {
    return;
  }
  metricsManager.addLabeledGauge(name, supplier);
}

/**
 * Adds a labeled gauge with the given update interval to the metrics manager.
 *
 * <p>The call is forwarded only when {@code isRegister} is true; otherwise nothing happens.
 *
 * @param name the gauge name
 * @param supplier the source of the gauge value
 * @param updateInterval the update interval passed through to the metrics manager
 * @param <T> the numeric type produced by the supplier
 */
public static <T extends Number> void addLabeledCacheGauge(
    String name, Supplier<T> supplier, long updateInterval) {
  if (isRegister) {
    metricsManager.addLabeledCacheGauge(name, supplier, updateInterval);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
import org.apache.uniffle.server.ShuffleServerMetrics;
import org.apache.uniffle.server.ShuffleTaskManager;

import static org.apache.uniffle.server.ShuffleServerMetrics.BLOCK_COUNT_IN_BUFFER_POOL;
import static org.apache.uniffle.server.ShuffleServerMetrics.BUFFER_COUNT_IN_BUFFER_POOL;
import static org.apache.uniffle.server.ShuffleServerMetrics.SHUFFLE_COUNT_IN_BUFFER_POOL;

public class ShuffleBufferManager {

private static final Logger LOG = LoggerFactory.getLogger(ShuffleBufferManager.class);
Expand Down Expand Up @@ -141,6 +145,27 @@ public ShuffleBufferManager(
appBlockSizeMetricEnabled =
conf.getBoolean(ShuffleServerConf.APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_ENABLED);
shuffleBufferType = conf.get(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE);

ShuffleServerMetrics.addLabeledCacheGauge(
BLOCK_COUNT_IN_BUFFER_POOL,
() ->
bufferPool.values().stream()
.flatMap(innerMap -> innerMap.values().stream())
.flatMap(rangeMap -> rangeMap.asMapOfRanges().values().stream())
.mapToInt(shuffleBuffer -> shuffleBuffer.getBlockCount())
.sum(),
2 * 60 * 1000L /* 2 minutes */);
ShuffleServerMetrics.addLabeledCacheGauge(
BUFFER_COUNT_IN_BUFFER_POOL,
() ->
bufferPool.values().stream()
.flatMap(innerMap -> innerMap.values().stream())
.mapToInt(rangeMap -> rangeMap.asMapOfRanges().size())
.sum(),
2 * 60 * 1000L /* 2 minutes */);
ShuffleServerMetrics.addLabeledGauge(
SHUFFLE_COUNT_IN_BUFFER_POOL,
() -> bufferPool.values().stream().mapToInt(innerMap -> innerMap.size()).sum());
}

public void setShuffleTaskManager(ShuffleTaskManager taskManager) {
Expand Down

0 comments on commit a364f2a

Please sign in to comment.