From 20241b97df350a9eb0a6ce60c13dd8147978116e Mon Sep 17 00:00:00 2001 From: pmendes Date: Tue, 15 Oct 2024 16:30:27 +0100 Subject: [PATCH] Use AggregateStore::ReportCount to check if a batch exists instead of loading the entire aggregate share --- crates/daphne-server/src/roles/aggregator.rs | 37 +++++++++++++++----- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/crates/daphne-server/src/roles/aggregator.rs b/crates/daphne-server/src/roles/aggregator.rs index 1d4168c9..a66e33b5 100644 --- a/crates/daphne-server/src/roles/aggregator.rs +++ b/crates/daphne-server/src/roles/aggregator.rs @@ -19,7 +19,7 @@ use daphne::{ use daphne_service_utils::durable_requests::bindings::{ self, AggregateStoreMergeOptions, AggregateStoreMergeReq, AggregateStoreMergeResp, }; -use futures::{future::try_join_all, StreamExt, TryStreamExt}; +use futures::{future::try_join_all, StreamExt, TryFutureExt, TryStreamExt}; use mappable_rc::Marc; use rayon::prelude::{IntoParallelIterator, ParallelIterator}; @@ -316,15 +316,34 @@ impl DapAggregator for crate::App { futures::stream::iter(agg_span) .map(|bucket| async move { - Ok::( - !self - .durable() - .request(bindings::AggregateStore::Get, (version, task_id, &bucket)) + let durable = self.durable(); + let params = (version, task_id, &bucket); + + let get_report_count = || { + durable + .request(bindings::AggregateStore::ReportCount, params) + .send::() + }; + + // TODO: remove this after the worker has this feature deployed. + let backwards_compat_get_report_count = || { + durable + .request(bindings::AggregateStore::Get, params) .send::() - .await - .map_err(|e| fatal_error!(err = ?e, "failed to get an agg share"))? - .empty(), - ) + .map_ok(|r| r.report_count) + }; + + let count = get_report_count() + .or_else(|_| backwards_compat_get_report_count()) + .await + .map_err(|e| { + fatal_error!( + err = ?e, + params = ?params, + "failed fetching report count of an agg share" + ) + })?; + Ok(count > 0) }) .buffer_unordered(usize::MAX) .collect::>()