Use bincode for DO request serialization
mendess committed Aug 7, 2023
1 parent 891a30c commit 8017041
Showing 11 changed files with 61 additions and 34 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions daphne_worker/Cargo.toml
@@ -37,6 +37,7 @@ tracing-core = "0.1.31"
 tracing-subscriber = { version = "0.3.17", features = ["env-filter", "json"]}
 tracing.workspace = true
 worker.workspace = true
+bincode = "1.3.3"

 [dev-dependencies]
 paste.workspace = true
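
For context on the new dependency: bincode 1.x serializes any `serde::Serialize` type to a compact byte vector and decodes it back with `bincode::deserialize`, so the serde derives already on the DO message types keep working unchanged. A minimal round-trip sketch (the payload struct here is illustrative, not a type from this crate):

    use serde::{Deserialize, Serialize};

    // Illustrative payload; the real DO messages derive these traits the same way.
    #[derive(Debug, PartialEq, Serialize, Deserialize)]
    struct AggShareDelta {
        bucket: String,
        count: u64,
    }

    fn main() -> Result<(), bincode::Error> {
        let delta = AggShareDelta { bucket: "b1".into(), count: 7 };
        // Serialize to a compact, non-self-describing byte vector...
        let bytes: Vec<u8> = bincode::serialize(&delta)?;
        // ...and decode it back by naming the same concrete type.
        let decoded: AggShareDelta = bincode::deserialize(&bytes)?;
        assert_eq!(delta, decoded);
        Ok(())
    }

Unlike JSON, the encoding carries no type information, which is why this commit changes the sender in `DurableConnector` and every receiver in the same patch: both sides must agree on the exact Rust type.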
4 changes: 2 additions & 2 deletions daphne_worker/src/durable/aggregate_store.rs
@@ -12,7 +12,7 @@ use daphne::DapAggregateShare;
 use tracing::Instrument;
 use worker::*;

-use super::{DapDurableObject, GarbageCollectable};
+use super::{req_parse, DapDurableObject, GarbageCollectable};

 pub(crate) const DURABLE_AGGREGATE_STORE_GET: &str = "/internal/do/aggregate_store/get";
 pub(crate) const DURABLE_AGGREGATE_STORE_MERGE: &str = "/internal/do/aggregate_store/merge";
@@ -86,7 +86,7 @@ impl AggregateStore {
             // Input: `agg_share_delta: DapAggregateShare`
             // Output: `()`
             (DURABLE_AGGREGATE_STORE_MERGE, Method::Post) => {
-                let agg_share_delta = req.json().await?;
+                let agg_share_delta = req_parse(&mut req).await?;

                // To keep this pair of get and put operations atomic, there should be no await
                // points between them. See the note below `transaction()` on
6 changes: 4 additions & 2 deletions daphne_worker/src/durable/garbage_collector.rs
@@ -3,7 +3,9 @@

 use crate::{
     durable,
-    durable::{create_span_from_request, DurableConnector, DurableOrdered, DurableReference},
+    durable::{
+        create_span_from_request, req_parse, DurableConnector, DurableOrdered, DurableReference,
+    },
     initialize_tracing, int_err,
 };
 use tracing::{error, trace, Instrument};
@@ -38,7 +40,7 @@ impl GarbageCollector {
         match (req.path().as_ref(), req.method()) {
             // Schedule a durable object (DO) instance for deletion.
             (DURABLE_GARBAGE_COLLECTOR_PUT, Method::Post) => {
-                let durable_ref: DurableReference = req.json().await?;
+                let durable_ref: DurableReference = req_parse(&mut req).await?;
                 match durable_ref.binding.as_ref() {
                     durable::BINDING_DAP_REPORTS_PENDING
                     | durable::BINDING_DAP_REPORTS_PROCESSED
4 changes: 2 additions & 2 deletions daphne_worker/src/durable/helper_state_store.rs
@@ -10,7 +10,7 @@ use daphne::{messages::TaskId, DapVersion, MetaAggregationJobId};
 use tracing::{trace, Instrument};
 use worker::*;

-use super::{Alarmed, DapDurableObject};
+use super::{req_parse, Alarmed, DapDurableObject};

 pub(crate) fn durable_helper_state_name(
     version: &DapVersion,
@@ -91,7 +91,7 @@ impl HelperStateStore {
             // Input: `helper_state_hex: String` (hex-encoded state)
             // Output: `bool`
             (DURABLE_HELPER_STATE_PUT_IF_NOT_EXISTS, Method::Post) => {
-                let helper_state_hex: String = req.json().await?;
+                let helper_state_hex: String = req_parse(&mut req).await?;
                 let success =
                     state_set_if_not_exists(&self.state, "helper_state", &helper_state_hex)
                         .await?
10 changes: 6 additions & 4 deletions daphne_worker/src/durable/leader_agg_job_queue.rs
@@ -5,7 +5,9 @@ use std::ops::ControlFlow;

 use crate::{
     config::DaphneWorkerConfig,
-    durable::{create_span_from_request, DurableOrdered, BINDING_DAP_LEADER_AGG_JOB_QUEUE},
+    durable::{
+        create_span_from_request, req_parse, DurableOrdered, BINDING_DAP_LEADER_AGG_JOB_QUEUE,
+    },
     initialize_tracing, int_err,
 };
 use tracing::{debug, Instrument};
@@ -81,7 +83,7 @@ impl LeaderAggregationJobQueue {
             // Input: `agg_job: DurableOrdered<String>` (the `String` is the name of the
             // `ReportsPending` instance)
             (DURABLE_LEADER_AGG_JOB_QUEUE_PUT, Method::Post) => {
-                let agg_job: DurableOrdered<String> = req.json().await?;
+                let agg_job: DurableOrdered<String> = req_parse(&mut req).await?;
                 agg_job.put(&self.state).await?;
                 debug!(
                     "LeaderAggregationJobQueue: {} has been scheduled",
@@ -96,7 +98,7 @@ impl LeaderAggregationJobQueue {
             // Output: `Vec<String>` (the names of the `ReportsPending` instances from which to
             // drain reports)
             (DURABLE_LEADER_AGG_JOB_QUEUE_GET, Method::Post) => {
-                let max_agg_jobs: usize = req.json().await?;
+                let max_agg_jobs: usize = req_parse(&mut req).await?;
                 let res: Vec<String> =
                     DurableOrdered::get_front(&self.state, "agg_job", max_agg_jobs)
                         .await?
@@ -113,7 +115,7 @@ impl LeaderAggregationJobQueue {
             // Input: `agg_job: DurableOrdered<String>` (the `String` is the name of the
             // `ReportsPending` instance that has become empty)
             (DURABLE_LEADER_AGG_JOB_QUEUE_FINISH, Method::Post) => {
-                let agg_job: DurableOrdered<String> = req.json().await?;
+                let agg_job: DurableOrdered<String> = req_parse(&mut req).await?;
                 agg_job.delete(&self.state).await?;
                 Response::from_json(&())
             }
6 changes: 3 additions & 3 deletions daphne_worker/src/durable/leader_batch_queue.rs
@@ -16,7 +16,7 @@ use serde::{Deserialize, Serialize};
 use tracing::{debug, Instrument};
 use worker::*;

-use super::{DapDurableObject, GarbageCollectable};
+use super::{req_parse, DapDurableObject, GarbageCollectable};

 pub(crate) const DURABLE_LEADER_BATCH_QUEUE_ASSIGN: &str = "/internal/do/leader_batch_queue/assign";
 pub(crate) const DURABLE_LEADER_BATCH_QUEUE_CURRENT: &str =
@@ -147,7 +147,7 @@ impl LeaderBatchQueue {
             // Input: `(batch_size, num_unassigned): (usize, usize)`
             // Output: `Vec<BatchCount>`
             (DURABLE_LEADER_BATCH_QUEUE_ASSIGN, Method::Post) => {
-                let (batch_size, mut num_unassigned): (usize, usize) = req.json().await?;
+                let (batch_size, mut num_unassigned): (usize, usize) = req_parse(&mut req).await?;
                 if batch_size == 0 {
                     return Err(int_err("LeaderBatchQueue: called with batch_size of 0"));
                 }
@@ -190,7 +190,7 @@ impl LeaderBatchQueue {
             //
             // Input: `batch_id_hex: String`
             (DURABLE_LEADER_BATCH_QUEUE_REMOVE, Method::Post) => {
-                let batch_id_hex: String = req.json().await?;
+                let batch_id_hex: String = req_parse(&mut req).await?;
                 let lookup_key = lookup_key(&batch_id_hex);
                 if let Some(lookup_val) = state_get::<String>(&self.state, &lookup_key).await? {
                     self.state.storage().delete(&lookup_val).await?;
9 changes: 5 additions & 4 deletions daphne_worker/src/durable/leader_col_job_queue.rs
@@ -23,7 +23,7 @@ use serde::{Deserialize, Serialize};
 use tracing::Instrument;
 use worker::*;

-use super::{DapDurableObject, GarbageCollectable};
+use super::{req_parse, DapDurableObject, GarbageCollectable};

 const PENDING_PREFIX: &str = "pending";
 const PROCESSED_PREFIX: &str = "processed";
@@ -110,7 +110,7 @@ impl LeaderCollectionJobQueue {
             // Input: `collect_req: CollectReq`
             // Output: `Id` (collect job ID)
             (DURABLE_LEADER_COL_JOB_QUEUE_PUT, Method::Post) => {
-                let collect_queue_req: CollectQueueRequest = req.json().await?;
+                let collect_queue_req: CollectQueueRequest = req_parse(&mut req).await?;
                 let collection_job_id: CollectionJobId =
                     if let Some(cid) = &collect_queue_req.collect_job_id {
                         cid.clone()
@@ -181,7 +181,7 @@
                     TaskId,
                     CollectionJobId,
                     Collection,
-                ) = req.json().await?;
+                ) = req_parse(&mut req).await?;
                 let processed_key = processed_key(&task_id, &collection_job_id);
                 let processed: Option<Collection> = state_get(&self.state, &processed_key).await?;
                 if processed.is_some() {
@@ -215,7 +215,8 @@ impl LeaderCollectionJobQueue {
             // Input: `collection_job_id: Id`
             // Output: `DapCollectionJob`
             (DURABLE_LEADER_COL_JOB_QUEUE_GET_RESULT, Method::Post) => {
-                let (task_id, collection_job_id): (TaskId, CollectionJobId) = req.json().await?;
+                let (task_id, collection_job_id): (TaskId, CollectionJobId) =
+                    req_parse(&mut req).await?;
                 let pending_key = pending_key(&task_id, &collection_job_id);
                 let pending = state_get::<String>(&self.state, &pending_key)
                     .await?
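
As the hunks above show, several endpoints exchange bare tuples such as `(TaskId, CollectionJobId)` rather than named structs. bincode 1.x encodes tuples of serde types directly, so no wrapper type is needed; a small sketch with stand-in newtypes (the real `TaskId` and `CollectionJobId` live in the `daphne` crate):

    use serde::{Deserialize, Serialize};

    // Stand-ins for the crate's TaskId / CollectionJobId newtypes.
    #[derive(Debug, PartialEq, Serialize, Deserialize)]
    struct TaskId([u8; 4]);
    #[derive(Debug, PartialEq, Serialize, Deserialize)]
    struct CollectionJobId([u8; 4]);

    fn main() -> Result<(), bincode::Error> {
        let pair = (TaskId([1, 2, 3, 4]), CollectionJobId([5, 6, 7, 8]));
        let bytes = bincode::serialize(&pair)?;
        // The receiver must annotate the same tuple type, as the `req_parse`
        // call sites do: bincode encodes no type information.
        let decoded: (TaskId, CollectionJobId) = bincode::deserialize(&bytes)?;
        assert_eq!(pair, decoded);
        Ok(())
    }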
33 changes: 22 additions & 11 deletions daphne_worker/src/durable/mod.rs
@@ -16,10 +16,10 @@ use crate::{
 };
 use daphne::{messages::TaskId, DapBatchBucket, DapVersion};
 use rand::prelude::*;
-use serde::{Deserialize, Serialize};
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
 use std::{cmp::min, time::Duration};
 use tracing::{info_span, warn};
-use worker::*;
+use worker::{js_sys::Uint8Array, *};

 pub(crate) const DURABLE_DELETE_ALL: &str = "/internal/do/delete_all";

@@ -204,15 +204,20 @@ impl<'srv> DurableConnector<'srv> {
         let mut attempt = 1;
         loop {
             let req = match (&method, &data) {
-                (Method::Post, Some(data)) => Request::new_with_init(
-                    &format!("https://fake-host{durable_path}"),
-                    RequestInit::new()
-                        .with_method(Method::Post)
-                        .with_body(Some(wasm_bindgen::JsValue::from_str(
-                            &serde_json::to_string(&data)?,
-                        )))
-                        .with_headers(tracing_headers.clone()),
-                )?,
+                (Method::Post, Some(data)) => {
+                    let data = bincode::serialize(&data).map_err(|e| {
+                        Error::RustError(format!("failed to serialize data: {e:?}"))
+                    })?;
+                    let buffer = Uint8Array::new_with_length(data.len() as _);
+                    buffer.copy_from(&data);
+                    Request::new_with_init(
+                        &format!("https://fake-host{durable_path}"),
+                        RequestInit::new()
+                            .with_method(Method::Post)
+                            .with_body(Some(buffer.into()))
+                            .with_headers(tracing_headers.clone()),
+                    )?
+                }
                 (Method::Get, None) => Request::new_with_init(
                     &format!("https://fake-host{durable_path}"),
                     RequestInit::new()
@@ -652,6 +657,12 @@ fn span_to_headers() -> Headers {
     })
 }

+async fn req_parse<T: DeserializeOwned>(req: &mut Request) -> Result<T> {
+    let bytes = req.bytes().await?;
+    bincode::deserialize(&bytes)
+        .map_err(|e| Error::RustError(format!("failed to deserialize bincode: {e:?}")))
+}
+
 fn create_span_from_request(req: &Request) -> tracing::Span {
     let path = req.path();
     let span = info_span!("DO span", p = %shorten_paths(path.split('/')).display());
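
The two sides of the change meet in this file: `DurableConnector` now bincode-encodes the request body into a `Uint8Array`, and the new `req_parse` helper decodes it on the DO side. The same symmetry can be sketched without the `worker` runtime; `FakeError` and both helper functions below are illustrative stand-ins, not crate APIs:

    use serde::{de::DeserializeOwned, Serialize};

    // Illustrative stand-in for worker::Error; not part of the crate.
    #[derive(Debug)]
    enum FakeError {
        RustError(String),
    }

    // Sender side, mirroring DurableConnector: typed value -> bincode bytes.
    fn encode_body<T: Serialize>(data: &T) -> Result<Vec<u8>, FakeError> {
        bincode::serialize(data)
            .map_err(|e| FakeError::RustError(format!("failed to serialize data: {e:?}")))
    }

    // Receiver side, mirroring req_parse: bincode bytes -> typed value.
    fn decode_body<T: DeserializeOwned>(bytes: &[u8]) -> Result<T, FakeError> {
        bincode::deserialize(bytes)
            .map_err(|e| FakeError::RustError(format!("failed to deserialize bincode: {e:?}")))
    }

    fn main() -> Result<(), FakeError> {
        // A usize payload, as DURABLE_REPORTS_PENDING_GET expects.
        let body = encode_body(&128usize)?;
        let reports_requested: usize = decode_body(&body)?;
        assert_eq!(reports_requested, 128);
        Ok(())
    }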
6 changes: 3 additions & 3 deletions daphne_worker/src/durable/reports_pending.rs
@@ -8,7 +8,7 @@ use crate::{
         leader_agg_job_queue::{
             DURABLE_LEADER_AGG_JOB_QUEUE_FINISH, DURABLE_LEADER_AGG_JOB_QUEUE_PUT,
         },
-        state_get, state_set_if_not_exists, DurableConnector, DurableOrdered,
+        req_parse, state_get, state_set_if_not_exists, DurableConnector, DurableOrdered,
         BINDING_DAP_LEADER_AGG_JOB_QUEUE, BINDING_DAP_REPORTS_PENDING, MAX_KEYS,
     },
     initialize_tracing, int_err,
@@ -128,7 +128,7 @@ impl ReportsPending {
             // Input: `reports_requested: usize`
             // Output: `Vec<PendingReport>`
             (DURABLE_REPORTS_PENDING_GET, Method::Post) => {
-                let reports_requested: usize = req.json().await?;
+                let reports_requested: usize = req_parse(&mut req).await?;
                 // Note we impose an upper limit on the user's specified limit.
                 let opt = ListOptions::new()
                     .prefix("pending/")
@@ -197,7 +197,7 @@ impl ReportsPending {
             // Input: `pending_report: PendingReport`
             // Output: `ReportsPendingResult`
             (DURABLE_REPORTS_PENDING_PUT, Method::Post) => {
-                let pending_report: PendingReport = req.json().await?;
+                let pending_report: PendingReport = req_parse(&mut req).await?;
                 let report_id_hex = pending_report
                     .report_id_hex()
                     .ok_or_else(|| int_err("failed to parse report ID from report"))?;
6 changes: 3 additions & 3 deletions daphne_worker/src/durable/reports_processed.rs
@@ -23,7 +23,7 @@ use std::{borrow::Cow, collections::HashSet, ops::ControlFlow, time::Duration};
 use tracing::Instrument;
 use worker::*;

-use super::{Alarmed, DapDurableObject, GarbageCollectable};
+use super::{req_parse, Alarmed, DapDurableObject, GarbageCollectable};

 pub(crate) const DURABLE_REPORTS_PROCESSED_INITIALIZE: &str =
     "/internal/do/reports_processed/initialize";
@@ -123,7 +123,7 @@ impl ReportsProcessed {
             // Input: `ReportsProcessedReq`
             // Output: `ReportsProcessedResp`
             (DURABLE_REPORTS_PROCESSED_INITIALIZE, Method::Post) => {
-                let reports_processed_request: ReportsProcessedReq = req.json().await?;
+                let reports_processed_request: ReportsProcessedReq = req_parse(&mut req).await?;
                 let result = try_join_all(
                     reports_processed_request
                         .consumed_reports
@@ -184,7 +184,7 @@ impl ReportsProcessed {
             // Input: `Vec<ReportId>`
             // Output: `Vec<ReportId>`
             (DURABLE_REPORTS_PROCESSED_MARK_AGGREGATED, Method::Post) => {
-                let report_ids: Vec<ReportId> = req.json().await?;
+                let report_ids: Vec<ReportId> = req_parse(&mut req).await?;
                 let replayed_reports = try_join_all(
                     report_ids
                         .into_iter()
