From bbc8d9820c864061639f9ee50d50dfbe715ed1f2 Mon Sep 17 00:00:00 2001 From: pmendes Date: Tue, 15 Oct 2024 16:29:58 +0100 Subject: [PATCH] Add method for getting just the report count of an aggregate share --- .../bindings/aggregate_store.rs | 1 + .../src/durable/aggregate_store.rs | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/crates/daphne-service-utils/src/durable_requests/bindings/aggregate_store.rs b/crates/daphne-service-utils/src/durable_requests/bindings/aggregate_store.rs index 2a27b97a..4cfbdb04 100644 --- a/crates/daphne-service-utils/src/durable_requests/bindings/aggregate_store.rs +++ b/crates/daphne-service-utils/src/durable_requests/bindings/aggregate_store.rs @@ -25,6 +25,7 @@ super::define_do_binding! { Merge = "/internal/do/aggregate_store/merge", MarkCollected = "/internal/do/aggregate_store/mark_collected", CheckCollected = "/internal/do/aggregate_store/check_collected", + ReportCount = "/internal/do/aggregate_store/report_count" } fn name((version, task_id, bucket): (DapVersion, &'n TaskId, &'n DapBatchBucket)) -> ObjectIdFrom { diff --git a/crates/daphne-worker/src/durable/aggregate_store.rs b/crates/daphne-worker/src/durable/aggregate_store.rs index ff71f5b1..b6f3fbfe 100644 --- a/crates/daphne-worker/src/durable/aggregate_store.rs +++ b/crates/daphne-worker/src/durable/aggregate_store.rs @@ -234,6 +234,16 @@ impl AggregateStore { Ok(()) } + async fn agg_share_report_count(&self) -> Result { + if let Some(agg_share) = &self.agg_share { + return Ok(agg_share.report_count); + }; + Ok(self + .get::(METADATA_KEY) + .await? + .map_or(0, |m| m.report_count)) + } + async fn get_agg_share(&mut self) -> Result<&mut DapAggregateShare> { let keys = Self::agg_share_shard_keys(); let agg_share = if let Some(agg_share) = self.agg_share.take() { @@ -477,6 +487,15 @@ impl GcDurableObject for AggregateStore { Response::from_json(&agg_share) } + // Get the number of reports in this aggregate share. + // + // Idempotent + // Output: `u64` + Some(bindings::AggregateStore::ReportCount) => { + let count = self.agg_share_report_count().await?; + Response::from_json(&count) + } + // Mark this bucket as collected. // // Non-idempotent (do not retry)