From 54f3288209aacc7b6baf1dd19250ae5e8686fb00 Mon Sep 17 00:00:00 2001 From: mendess Date: Tue, 8 Aug 2023 17:37:00 +0100 Subject: [PATCH] Improve storage access patterns in ReportsProcessed DO Processed report IDs are now stored as an array of bytes, with the 16-byte-wide IDs laid out contiguously in memory. --- Cargo.lock | 83 +++++------ Cargo.toml | 3 +- daphne/src/messages/mod.rs | 1 + .../src/durable/reports_processed.rs | 130 +++++++++++------- 4 files changed, 122 insertions(+), 95 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e7bc298a2..b648470a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -323,9 +323,9 @@ dependencies = [ [[package]] name = "chrono-tz" -version = "0.6.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29c39203181991a7dd4343b8005bd804e7a9a37afb8ac070e43771e8c820bbde" +checksum = "f1369bc6b9e9a7dfdae2055f6ec151fe9c554a9d23d357c0237cee2e25eaabb7" dependencies = [ "chrono", "chrono-tz-build", @@ -334,9 +334,9 @@ dependencies = [ [[package]] name = "chrono-tz-build" -version = "0.0.3" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f509c3a87b33437b05e2458750a0700e5bdd6956176773e6c7d6dd15a283a0c" +checksum = "e2f5ebdc942f57ed96d560a6d1a459bae5851102a25d5bf89dc04ae453e31ecf" dependencies = [ "parse-zoneinfo", "phf", @@ -1336,9 +1336,9 @@ checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" [[package]] name = "js-sys" -version = "0.3.61" +version = "0.3.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "445dde2150c55e483f3d8416706b97ec8e8237c307e5b7b4b8dd15e6af2a0730" +checksum = "2f37a4a5928311ac501dee68b3c7613a1037d0edb30c8e5427bd832d55d1b790" dependencies = [ "wasm-bindgen", ] @@ -1680,27 +1680,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1fb5f6f826b772a8d4c0394209441e7d37cbbb967ae9c7e0e8134365c9ee676" dependencies = [ "siphasher", - "uncased", 
] [[package]] name = "pin-project" -version = "1.0.12" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.0.12" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.28", ] [[package]] @@ -2636,15 +2635,6 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" -[[package]] -name = "uncased" -version = "0.9.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09b01702b0fd0b3fadcf98e098780badda8742d4f4a7676615cad90e8ac73622" -dependencies = [ - "version_check", -] - [[package]] name = "unicode-bidi" version = "0.3.13" @@ -2746,9 +2736,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.84" +version = "0.2.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31f8dcbc21f30d9b8f2ea926ecb58f6b91192c17e9d33594b3df58b2007ca53b" +checksum = "5bba0e8cb82ba49ff4e229459ff22a191bbe9a1cb3a341610c9c33efc27ddf73" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -2756,24 +2746,24 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.84" +version = "0.2.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95ce90fd5bcc06af55a641a86428ee4229e44e07033963a2290a8e241607ccb9" +checksum = "19b04bc93f9d6bdee709f6bd2118f57dd6679cf1176a1af464fca3ab0d66d8fb" 
dependencies = [ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.28", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-futures" -version = "0.4.34" +version = "0.4.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f219e0d211ba40266969f6dbdd90636da12f75bee4fc9d6c23d1260dadb51454" +checksum = "2d1985d03709c53167ce907ff394f5316aa22cb4e12761295c5dc57dacb6297e" dependencies = [ "cfg-if", "js-sys", @@ -2783,9 +2773,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.84" +version = "0.2.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c21f77c0bedc37fd5dc21f897894a5ca01e7bb159884559461862ae90c0b4c5" +checksum = "14d6b024f1a526bb0234f52840389927257beb670610081360e5a03c5df9c258" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2793,28 +2783,28 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.84" +version = "0.2.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2aff81306fcac3c7515ad4e177f521b5c9a15f2b08f4e32d823066102f35a5f6" +checksum = "e128beba882dd1eb6200e1dc92ae6c5dbaa4311aa7bb211ca035779e5efc39f8" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.28", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.84" +version = "0.2.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d" +checksum = "ed9d5b4305409d1fc9482fee2d7f9bcbf24b3972bf59817ef757e23982242a93" [[package]] name = "wasm-streams" -version = "0.2.3" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bbae3363c08332cadccd13b67db371814cd214c2524020932f0804b8cf7c078" +checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7" dependencies = [ "futures-util", "js-sys", @@ -2825,9 
+2815,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.61" +version = "0.3.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e33b99f4b23ba3eec1a53ac264e35a755f00e966e0065077d6027c0f575b0b97" +checksum = "3bdd9ef4e984da1187bf8110c5cf5b845fbc87a23602cdf912386a76fcd3a7c2" dependencies = [ "js-sys", "wasm-bindgen", @@ -3050,9 +3040,9 @@ dependencies = [ [[package]] name = "worker" -version = "0.0.17" +version = "0.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c64a08abf27a129f60be182b26635bbd281fe1c4e4d4b4375383cb1a4ef9e2c2" +checksum = "9cd7ad167392bdd707a963356f3478844019c74fc89f6af0dfc656914b30af24" dependencies = [ "async-trait", "chrono", @@ -3066,6 +3056,7 @@ dependencies = [ "serde", "serde-wasm-bindgen", "serde_json", + "tokio", "url", "wasm-bindgen", "wasm-bindgen-futures", @@ -3093,14 +3084,14 @@ dependencies = [ [[package]] name = "worker-macros" -version = "0.0.9" +version = "0.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e052efe546b1571f03abaafac252a8103a4a698c2bfa8d5c48ca634f1817323" +checksum = "306c6b6fc316ce129de9cc393dc614b244afb37d43d8ae7a4dccf45d6f8a5ff5" dependencies = [ "async-trait", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.28", "wasm-bindgen", "wasm-bindgen-futures", "wasm-bindgen-macro-support", @@ -3109,9 +3100,9 @@ dependencies = [ [[package]] name = "worker-sys" -version = "0.0.9" +version = "0.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ad510995e943256afb8b7524366014bd4a2f6a4ffef00b8121db97a8056b4c3" +checksum = "8f5db3bd0e45980dbcefe567c978b4930e4526e864cd9e70482252a60229ddd7" dependencies = [ "cfg-if", "js-sys", diff --git a/Cargo.toml b/Cargo.toml index b0233bc9f..e878b208b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ # SPDX-License-Identifier: BSD-3-Clause [workspace] +resolver = "2" members = [ "daphne", @@ -38,4 +39,4 @@ thiserror = "1.0.44" tokio 
= { version = "1.29.1", features = ["macros", "rt-multi-thread"] } tracing = "0.1.37" url = { version = "2.4.0", features = ["serde"] } -worker = "0.0.17" +worker = "0.0.18" diff --git a/daphne/src/messages/mod.rs b/daphne/src/messages/mod.rs index bff55117a..aefc77c7d 100644 --- a/daphne/src/messages/mod.rs +++ b/daphne/src/messages/mod.rs @@ -40,6 +40,7 @@ macro_rules! id_struct { ($sname:ident, $len:expr, $doc:expr) => { #[doc=$doc] #[derive(Clone, Debug, Default, Deserialize, Hash, PartialEq, Eq, Serialize)] + #[repr(transparent)] pub struct $sname(#[serde(with = "hex")] pub [u8; $len]); impl $sname { diff --git a/daphne_worker/src/durable/reports_processed.rs b/daphne_worker/src/durable/reports_processed.rs index f03cef2a6..b20b293b2 100644 --- a/daphne_worker/src/durable/reports_processed.rs +++ b/daphne_worker/src/durable/reports_processed.rs @@ -3,9 +3,7 @@ use crate::{ config::DaphneWorkerConfig, - durable::{ - create_span_from_request, state_get, state_set_if_not_exists, BINDING_DAP_REPORTS_PROCESSED, - }, + durable::{create_span_from_request, state_get, BINDING_DAP_REPORTS_PROCESSED}, initialize_tracing, int_err, }; use daphne::{ @@ -16,12 +14,12 @@ use daphne::{ }, DapError, VdafConfig, }; -use futures::future::try_join_all; -use prio::codec::{CodecError, ParameterizedDecode}; +use futures::{future::ready, StreamExt, TryStreamExt}; +use prio::codec::{CodecError, Decode, Encode, ParameterizedDecode}; use serde::{Deserialize, Serialize}; use std::{borrow::Cow, collections::HashSet, ops::ControlFlow, time::Duration}; use tracing::Instrument; -use worker::*; +use worker::{js_sys::Uint8Array, *}; use super::{req_parse, Alarmed, DapDurableObject, GarbageCollectable}; @@ -45,27 +43,76 @@ pub(crate) const DURABLE_REPORTS_PROCESSED_MARK_AGGREGATED: &str = /// where `<report_id>` is the hex-encoded report ID. 
#[durable_object] pub struct ReportsProcessed { - #[allow(dead_code)] state: State, env: Env, config: DaphneWorkerConfig, touched: bool, alarmed: bool, + reports_processed: Option>, } +// This is the maximum size of a value in durable object storage. +const MAX_DURABLE_OBJECT_VALUE_SIZE: usize = 131_072; + impl ReportsProcessed { - /// Check if the report has been processed. If not, return None; otherwise, return the ID. - async fn to_checked(&self, report_id: ReportId) -> Result> { - let key = format!("processed/{}", report_id.to_hex()); - let processed: bool = state_set_if_not_exists(&self.state, &key, &true) - .await? - .unwrap_or(false); - if processed { - Ok(Some(report_id)) - } else { - Ok(None) + async fn load_processed_reports(&mut self) -> Result<&mut HashSet> { + let reps = &mut self.reports_processed; + match reps { + Some(p) => Ok(p), + None => { + let processed = futures::stream::iter(0..) + .then(|i| { + let state_ref = &self.state; + async move { + state_get::>(state_ref, &format!("processed_reports/{i}")) + .await + .transpose() + } + }) + .take_while(|reports| ready(reports.is_some())) + .map(|reports| reports.unwrap()) + .try_fold(HashSet::new(), |mut set, reports| async move { + reports + .chunks(std::mem::size_of::()) + .map(ReportId::get_decoded) + .try_for_each(|r| { + set.insert(r?); + Ok(()) + }) + .map_err(|e: CodecError| { + Error::RustError(format!("failed to deserialize report id: {e:?}")) + }) + .map(|_| set) + }) + .await?; + + Ok(reps.insert(processed)) + } } } + + async fn store_processed_reports(&self) -> Result<()> { + let Some(reports) = &self.reports_processed else { + return Ok(()) + }; + let mut encoded_reports = + Vec::with_capacity(reports.len() * std::mem::size_of::()); + for r in reports { + r.encode(&mut encoded_reports); + } + for (i, chunk) in encoded_reports + .chunks(MAX_DURABLE_OBJECT_VALUE_SIZE) + .enumerate() + { + let array = Uint8Array::new_with_length(chunk.len() as _); + array.copy_from(chunk); + self.state + 
.storage() + .put_raw(&format!("processed_reports/{i}"), array) + .await?; + } + Ok(()) + } } #[durable_object] @@ -80,6 +127,7 @@ impl DurableObject for ReportsProcessed { config, touched: false, alarmed: false, + reports_processed: None, } } @@ -124,33 +172,18 @@ impl ReportsProcessed { // Output: `ReportsProcessedResp` (DURABLE_REPORTS_PROCESSED_INITIALIZE, Method::Post) => { let reports_processed_request: ReportsProcessedReq = req_parse(&mut req).await?; - let result = try_join_all( - reports_processed_request - .consumed_reports - .iter() - .filter(|consumed_report| consumed_report.is_ready()) - .map(|consumed_report| async { - if let Some(replayed) = state_get::( - &self.state, - &format!("processed/{}", consumed_report.metadata().id.to_hex()), - ) - .await? - { - if replayed { - return Result::Ok(Some(consumed_report.metadata().id.clone())); - } - } - Ok(None) - }), - ) - .await?; - let replayed_reports = result.into_iter().flatten().collect::>(); + + let seen = self.load_processed_reports().await?; + + let is_replay = |report: &EarlyReportStateConsumed| { + !report.is_ready() || seen.contains(&report.metadata().id) + }; let initialized_reports = reports_processed_request .consumed_reports .into_iter() .map(|consumed_report| { - if replayed_reports.contains(&consumed_report.metadata().id) { + if is_replay(&consumed_report) { Ok(EarlyReportStateInitialized::Rejected { metadata: Cow::Owned(consumed_report.metadata().clone()), failure: TransitionFailure::ReportReplayed, @@ -185,15 +218,16 @@ impl ReportsProcessed { // Output: `Vec` (DURABLE_REPORTS_PROCESSED_MARK_AGGREGATED, Method::Post) => { let report_ids: Vec = req_parse(&mut req).await?; - let replayed_reports = try_join_all( - report_ids - .into_iter() - .map(|report_id| self.to_checked(report_id)), - ) - .await? 
- .into_iter() - .flatten() - .collect::>(); + let seen = self.load_processed_reports().await?; + + let replayed_reports = report_ids.into_iter().fold(vec![], |mut replayed, id| { + if let Some(replayed_id) = seen.replace(id) { + replayed.push(replayed_id) + } + replayed + }); + + self.store_processed_reports().await?; Response::from_json(&replayed_reports) }