Skip to content

Commit

Permalink
Add method for getting just the report count of an aggregate share
Browse files Browse the repository at this point in the history
  • Loading branch information
mendess committed Oct 17, 2024
1 parent b9d31c1 commit bbc8d98
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ super::define_do_binding! {
Merge = "/internal/do/aggregate_store/merge",
MarkCollected = "/internal/do/aggregate_store/mark_collected",
CheckCollected = "/internal/do/aggregate_store/check_collected",
ReportCount = "/internal/do/aggregate_store/report_count"
}

fn name((version, task_id, bucket): (DapVersion, &'n TaskId, &'n DapBatchBucket)) -> ObjectIdFrom {
Expand Down
19 changes: 19 additions & 0 deletions crates/daphne-worker/src/durable/aggregate_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,16 @@ impl AggregateStore {
Ok(())
}

async fn agg_share_report_count(&self) -> Result<u64> {
if let Some(agg_share) = &self.agg_share {
return Ok(agg_share.report_count);
};
Ok(self
.get::<DapAggregateShareMetadata>(METADATA_KEY)
.await?
.map_or(0, |m| m.report_count))
}

async fn get_agg_share(&mut self) -> Result<&mut DapAggregateShare> {
let keys = Self::agg_share_shard_keys();
let agg_share = if let Some(agg_share) = self.agg_share.take() {
Expand Down Expand Up @@ -477,6 +487,15 @@ impl GcDurableObject for AggregateStore {
Response::from_json(&agg_share)
}

// Get the number of reports in this aggregate share.
//
// Idempotent
// Output: `u64`
Some(bindings::AggregateStore::ReportCount) => {
let count = self.agg_share_report_count().await?;
Response::from_json(&count)
}

// Mark this bucket as collected.
//
// Non-idempotent (do not retry)
Expand Down

0 comments on commit bbc8d98

Please sign in to comment.