diff --git a/.github/workflows/ci-build.yml b/.github/workflows/ci-build.yml index 7e2335822..976143c76 100644 --- a/.github/workflows/ci-build.yml +++ b/.github/workflows/ci-build.yml @@ -124,9 +124,7 @@ jobs: uses: dtolnay/rust-toolchain@master with: toolchain: ${{ matrix.rust-toolchain }} - components: clippy, rustfmt - - name: Format - run: cargo fmt --message-format human -- --check + components: clippy - name: Clippy run: cargo clippy --profile ci --workspace --all-targets - name: Clippy (all features) @@ -138,6 +136,18 @@ jobs: with: command: check bans licenses sources -A unmatched-organization + rustfmt: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install Rust nightly toolchain + uses: dtolnay/rust-toolchain@master + with: + toolchain: nightly + components: rustfmt + - name: Format + run: cargo fmt --message-format human -- --check + janus_docker: runs-on: ubuntu-latest env: diff --git a/.rustfmt.toml b/.rustfmt.toml new file mode 100644 index 000000000..95a03dd6a --- /dev/null +++ b/.rustfmt.toml @@ -0,0 +1,5 @@ +format_strings = true +wrap_comments = true +comment_width = 100 +imports_granularity = "Crate" +group_imports = "One" diff --git a/.vscode/settings.json.example b/.vscode/settings.json.example new file mode 100644 index 000000000..ebde9f78f --- /dev/null +++ b/.vscode/settings.json.example @@ -0,0 +1,10 @@ +{ + "rust-analyzer.cargo.features": [ + "prometheus", + "otlp", + "testcontainer" + ], + "rust-analyzer.rustfmt.extraArgs": [ + "+nightly" + ] +} diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index fc972148d..92a8ebfeb 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -8,8 +8,10 @@ use crate::{ AggregationJobWriter, AggregationJobWriterMetrics, InitialWrite, ReportAggregationUpdate as _, WritableReportAggregation, }, - error::{handle_ping_pong_error, ReportRejection, ReportRejectionReason}, - error::{BatchMismatch, OptOutReason}, + error::{ + handle_ping_pong_error, BatchMismatch, OptOutReason, ReportRejection, + ReportRejectionReason, + }, query_type::{CollectableQueryType, UploadableQueryType}, report_writer::{ReportWriteBatcher, WritableReport}, }, @@ -161,8 +163,8 @@ struct AggregatorMetrics { upload_decode_failure_counter: Counter, /// Counter tracking the number of successfully-aggregated reports. report_aggregation_success_counter: Counter, - /// Counters tracking the number of failures to step client reports through the aggregation - /// process. + /// Counters tracking the number of failures to step client reports through + /// the aggregation process. aggregate_step_failure_counter: Counter, } @@ -182,8 +184,8 @@ pub struct Config { /// transaction. pub max_upload_batch_size: usize, - /// Defines the maximum delay before writing a batch of uploaded reports, even if it has not yet - /// reached `max_batch_upload_size`. This is the maximum delay added to the + /// Defines the maximum delay before writing a batch of uploaded reports, even if it has not + /// yet reached `max_batch_upload_size`. This is the maximum delay added to the /// `tasks/{task-id}/reports` endpoint due to write-batching. pub max_upload_batch_write_delay: StdDuration, @@ -197,16 +199,16 @@ pub struct Config { /// of getting task metrics. pub task_counter_shard_count: u64, - /// Defines how often to refresh the global HPKE configs cache. This affects how often an aggregator - /// becomes aware of key state changes. + /// Defines how often to refresh the global HPKE configs cache. 
This affects how often an + /// aggregator becomes aware of key state changes. pub global_hpke_configs_refresh_interval: StdDuration, - /// Defines how long tasks should be cached for. This affects how often an aggregator becomes aware - /// of task parameter changes. + /// Defines how long tasks should be cached for. This affects how often an aggregator becomes + /// aware of task parameter changes. pub task_cache_ttl: StdDuration, - /// Defines how many tasks can be cached at once. This affects how much memory the aggregator may - /// consume for caching tasks. + /// Defines how many tasks can be cached at once. This affects how much memory the aggregator + /// may consume for caching tasks. pub task_cache_capacity: u64, /// The key used to sign HPKE configurations. @@ -248,9 +250,9 @@ impl Aggregator { cfg.max_upload_batch_size, cfg.max_upload_batch_write_delay, ), - // If we're in taskprov mode, we can never cache None entries for tasks, since aggregators - // could insert tasks at any time and expect them to be available across all aggregator - // replicas. + // If we're in taskprov mode, we can never cache None entries for tasks, since + // aggregators could insert tasks at any time and expect them to be available across + // all aggregator replicas. !cfg.taskprov_config.enabled, cfg.task_cache_capacity, StdDuration::from_secs(1), @@ -339,10 +341,10 @@ impl Aggregator { match task_aggregator.handle_hpke_config() { Some(hpke_config_list) => hpke_config_list, // Assuming something hasn't gone horribly wrong with the database, this - // should only happen in the case where the system has been moved from taskprov - // mode to non-taskprov mode. Thus there's still taskprov tasks in the database. - // This isn't a supported use case, so the operator needs to delete these tasks - // or move the system back into taskprov mode. + // should only happen in the case where the system has been moved from + // taskprov mode to non-taskprov mode. Thus there's still taskprov tasks in + // the database. This isn't a supported use case, so the operator needs to + // delete these tasks or move the system back into taskprov mode. None => { return Err(Error::Internal("task has no HPKE configs".to_string())) } @@ -580,10 +582,10 @@ impl Aggregator { .await } - /// Handle a GET request for a collection job. `collection_job_id` is the unique identifier for the - /// collection job parsed out of the request URI. Returns an encoded [`Collection`] if the collect - /// job has been run to completion, `None` if the collection job has not yet run, or an error - /// otherwise. + /// Handle a GET request for a collection job. `collection_job_id` is the unique identifier for + /// the collection job parsed out of the request URI. Returns an encoded [`Collection`] if the + /// collect job has been run to completion, `None` if the collection job has not yet run, or an + /// error otherwise. async fn handle_get_collection_job( &self, task_id: &TaskId, @@ -657,8 +659,9 @@ impl Aggregator { return Err(Error::UnrecognizedTask(*task_id)); } - // Authorize the request and retrieve the collector's HPKE config. If this is a taskprov task, we - // have to use the peer aggregator's collector config rather than the main task. + // Authorize the request and retrieve the collector's HPKE config. If this is a taskprov + // task, we have to use the peer aggregator's collector config rather than the main + // task. 
let collector_hpke_config = if self.cfg.taskprov_config.enabled && taskprov_task_config.is_some() { let (peer_aggregator, _, _) = self @@ -768,9 +771,9 @@ impl Aggregator { .await .or_else(|error| -> Result<(), Error> { match error { - // If the task is already in the datastore, then some other request or aggregator - // replica beat us to inserting it. They _should_ have inserted all the same parameters - // as we would have, so we can proceed as normal. + // If the task is already in the datastore, then some other request or + // aggregator replica beat us to inserting it. They _should_ have inserted all + // the same parameters as we would have, so we can proceed as normal. DatastoreError::MutationTargetAlreadyExists => { warn!( ?task_id, @@ -972,7 +975,8 @@ impl TaskAggregator { fn handle_hpke_config(&self) -> Option { // TODO(#239): consider deciding a better way to determine "primary" (e.g. most-recent) HPKE // config/key -- right now it's the one with the maximal config ID, but that will run into - // trouble if we ever need to wrap-around, which we may since config IDs are effectively a u8. + // trouble if we ever need to wrap-around, which we may since config IDs are effectively a + // u8. Some(HpkeConfigList::new(Vec::from([self .task .hpke_keys() @@ -1119,10 +1123,9 @@ impl TaskAggregator { #[cfg(feature = "fpvec_bounded_l2")] mod vdaf_ops_strategies { - use std::sync::Arc; - use janus_core::vdaf::vdaf_dp_strategies; use prio::dp::distributions::ZCdpDiscreteGaussian; + use std::sync::Arc; #[derive(Debug)] pub enum Prio3FixedPointBoundedL2VecSum { @@ -1781,7 +1784,8 @@ impl VdafOps { // Decrypt shares & prepare initialization states. (§4.4.4.1) let mut report_share_data = Vec::new(); for (ord, prepare_init) in req.prepare_inits().iter().enumerate() { - // If decryption fails, then the aggregator MUST fail with error `hpke-decrypt-error`. (§4.4.2.2) + // If decryption fails, then the aggregator MUST fail with error + // `hpke-decrypt-error`. (§4.4.2.2) let global_hpke_keypair = global_hpke_keypairs.keypair( prepare_init .report_share() @@ -2003,9 +2007,10 @@ impl VdafOps { Ok(shares) }); - // Next, the aggregator runs the preparation-state initialization algorithm for the VDAF - // associated with the task and computes the first state transition. [...] If either - // step fails, then the aggregator MUST fail with error `vdaf-prep-error`. (§4.4.2.2) + // Next, the aggregator runs the preparation-state initialization algorithm for + // the VDAF associated with the task and computes the first state transition. + // [...] If either step fails, then the aggregator MUST fail with error + // `vdaf-prep-error`. (§4.4.2.2) let init_rslt = shares.and_then(|(public_share, input_share)| { trace_span!("VDAF preparation (helper initialization)").in_scope(|| { vdaf.helper_initialized( @@ -2033,8 +2038,8 @@ impl VdafOps { let (report_aggregation_state, prepare_step_result, output_share) = match init_rslt { Ok((PingPongState::Continued(prepare_state), outgoing_message)) => { - // Helper is not finished. Await the next message from the Leader to advance to - // the next step. + // Helper is not finished. Await the next message from the Leader to + // advance to the next step. ( ReportAggregationState::WaitingHelper { prepare_state }, PrepareStepResult::Continue { @@ -2199,7 +2204,8 @@ impl VdafOps { } // Write report shares, and ensure this isn't a repeated report aggregation. 
- // TODO(#225): on repeated aggregation, verify input share matches previously-received input share + // TODO(#225): on repeated aggregation, verify input share matches + // previously-received input share try_join_all(report_share_data.iter_mut().map(|rsd| { let task = Arc::clone(&task); let aggregation_job = Arc::clone(&aggregation_job); @@ -2366,8 +2372,8 @@ impl VdafOps { .collect(), )); } else if aggregation_job.step().increment() != req.step() { - // If this is not a replay, the leader should be advancing our state to the next - // step and no further. + // If this is not a replay, the leader should be advancing our state to the + // next step and no further. return Err(datastore::Error::User( Error::StepMismatch { task_id: *task.id(), @@ -2513,7 +2519,8 @@ impl VdafOps { Arc::clone(&aggregation_param), ); Box::pin(async move { - // Check if this collection job already exists, ensuring that all parameters match. + // Check if this collection job already exists, ensuring that all parameters + // match. if let Some(collection_job) = tx .get_collection_job::(&vdaf, task.id(), &collection_job_id) .await? @@ -2680,9 +2687,9 @@ impl VdafOps { } => { // §4.4.4.3: HPKE encrypt aggregate share to the collector. We store the leader // aggregate share *unencrypted* in the datastore so that we can encrypt cached - // results to the collector HPKE config valid when the current collection job request - // was made, and not whatever was valid at the time the aggregate share was first - // computed. + // results to the collector HPKE config valid when the current collection job + // request was made, and not whatever was valid at the time the + // aggregate share was first computed. // However we store the helper's *encrypted* share. // TODO(#240): consider fetching freshly encrypted helper aggregate share if it has @@ -2694,9 +2701,9 @@ impl VdafOps { "Serving cached collection job response" ); let encrypted_leader_aggregate_share = hpke::seal( - // Unwrap safety: collector_hpke_config is only None for taskprov tasks. Taskprov - // is not currently supported for Janus operating as the Leader, so this unwrap - // is not reachable. + // Unwrap safety: collector_hpke_config is only None for taskprov tasks. + // Taskprov is not currently supported for Janus operating + // as the Leader, so this unwrap is not reachable. task.collector_hpke_config().unwrap(), &HpkeApplicationInfo::new( &Label::AggregateShare, diff --git a/aggregator/src/aggregator/aggregation_job_continue.rs b/aggregator/src/aggregator/aggregation_job_continue.rs index 1f566ef8f..de66f4db8 100644 --- a/aggregator/src/aggregator/aggregation_job_continue.rs +++ b/aggregator/src/aggregator/aggregation_job_continue.rs @@ -73,22 +73,23 @@ impl VdafOps { let mut report_aggregations_iter = report_aggregations.into_iter(); let mut report_aggregations_to_write = Vec::new(); for prep_step in req.prepare_steps() { - // Match preparation step received from leader to stored report aggregation, and extract - // the stored preparation step. + // Match preparation step received from leader to stored report aggregation, and + // extract the stored preparation step. let report_aggregation = loop { let report_agg = report_aggregations_iter.next().ok_or_else(|| { datastore::Error::User( Error::InvalidMessage( Some(task_id), "leader sent unexpected, duplicate, or out-of-order prepare \ - steps", + steps", ) .into(), ) })?; if report_agg.report_id() != prep_step.report_id() { - // This report was omitted by the leader because of a prior failure. 
Note that - // the report was dropped (if it's not already in an error state) and continue. + // This report was omitted by the leader because of a prior failure. + // Note that the report was dropped (if it's + // not already in an error state) and continue. if matches!( report_agg.state(), ReportAggregationState::WaitingHelper { .. } @@ -113,7 +114,7 @@ impl VdafOps { return Err(datastore::Error::User( Error::Internal( "helper encountered unexpected \ - ReportAggregationState::WaitingLeader" + ReportAggregationState::WaitingLeader" .to_string(), ) .into(), diff --git a/aggregator/src/aggregator/aggregation_job_creator.rs b/aggregator/src/aggregator/aggregation_job_creator.rs index 11138a45a..61c226031 100644 --- a/aggregator/src/aggregator/aggregation_job_creator.rs +++ b/aggregator/src/aggregator/aggregation_job_creator.rs @@ -118,7 +118,8 @@ impl AggregationJobCreator { } pub async fn run(self: Arc, stopper: Stopper) { - // TODO(#1393): add support for handling only a subset of tasks in a single job (i.e. sharding). + // TODO(#1393): add support for handling only a subset of tasks in a single job (i.e. + // sharding). // Create metric instruments. let task_update_time_histogram = self @@ -711,9 +712,9 @@ impl AggregationJobCreator { .await?) } - /// Look for combinations of client reports and collection job aggregation parameters that do not - /// yet have a report aggregation, and batch them into new aggregation jobs. This should only - /// be used with VDAFs that have non-unit type aggregation parameters. + /// Look for combinations of client reports and collection job aggregation parameters that do + /// not yet have a report aggregation, and batch them into new aggregation jobs. This should + /// only be used with VDAFs that have non-unit type aggregation parameters. // This is only used in tests thus far. #[cfg(feature = "test-util")] async fn create_aggregation_jobs_for_time_interval_task_with_param( @@ -910,9 +911,8 @@ impl AggregationJobCreator { #[cfg(test)] mod tests { - use crate::aggregator::test_util::BATCH_AGGREGATION_SHARD_COUNT; - use super::AggregationJobCreator; + use crate::aggregator::test_util::BATCH_AGGREGATION_SHARD_COUNT; use futures::future::try_join_all; use janus_aggregator_core::{ datastore::{ @@ -3189,7 +3189,8 @@ mod tests { } // Create more than MAX_AGGREGATION_JOB_SIZE reports in another batch. This should result in - // two aggregation jobs per overlapping collection job. (and there are two such collection jobs) + // two aggregation jobs per overlapping collection job. 
(and there are two such collection + // jobs) let report_time = report_time.sub(task.time_precision()).unwrap(); let batch_2_reports: Vec> = iter::repeat_with(|| LeaderStoredReport::new_dummy(*task.id(), report_time)) @@ -3484,13 +3485,16 @@ mod tests { // AggregationJob<_, _, A>::aggregation_parameter returns // &A::AggregationParam, but we nonetheless need this cast or the // compiler won't let us call clone - let agg_param = (agg_job.aggregation_parameter() as &A::AggregationParam).clone(); + let agg_param = + (agg_job.aggregation_parameter() as &A::AggregationParam).clone(); let want_ra_state = want_ra_states .get(&(*ra.report_id(), agg_param)) .unwrap_or_else(|| { panic!( - "found report aggregation for unknown report {} aggregation param {:?}", - ra.report_id(), agg_job.aggregation_parameter(), + "found report aggregation for unknown report {} \ + aggregation param {:?}", + ra.report_id(), + agg_job.aggregation_parameter(), ) }); assert_eq!(want_ra_state, ra.state()); diff --git a/aggregator/src/aggregator/aggregation_job_driver.rs b/aggregator/src/aggregator/aggregation_job_driver.rs index 5999347d6..96c5ed591 100644 --- a/aggregator/src/aggregator/aggregation_job_driver.rs +++ b/aggregator/src/aggregator/aggregation_job_driver.rs @@ -222,7 +222,8 @@ where }) .await?; - // Figure out the next step based on the non-error report aggregation states, and dispatch accordingly. + // Figure out the next step based on the non-error report aggregation states, and dispatch + // accordingly. let (mut saw_start, mut saw_waiting, mut saw_finished) = (false, false, false); for report_aggregation in &report_aggregations { match report_aggregation.state() { @@ -268,7 +269,7 @@ where _ => Err(Error::Internal(format!( "unexpected combination of report aggregation states (saw_start = {saw_start}, \ - saw_waiting = {saw_waiting}, saw_finished = {saw_finished})", + saw_waiting = {saw_waiting}, saw_finished = {saw_finished})", ))), } } @@ -326,8 +327,8 @@ where ); let _entered = span.enter(); - // Compute report shares to send to helper, and decrypt our input shares & initialize - // preparation state. + // Compute report shares to send to helper, and decrypt our input shares & + // initialize preparation state. let mut report_aggregations_to_write = Vec::new(); let mut prepare_inits = Vec::new(); let mut stepped_aggregations = Vec::new(); @@ -351,8 +352,8 @@ where helper_encrypted_input_share, ), - // Panic safety: this can't happen because we filter to only StartLeader-state - // report aggregations before this loop. + // Panic safety: this can't happen because we filter to only + // StartLeader-state report aggregations before this loop. _ => panic!( "Unexpected report aggregation state: {:?}", report_aggregation.state() @@ -494,8 +495,8 @@ where content_type: AggregationJobInitializeReq::::MEDIA_TYPE, body: Bytes::from(request.get_encoded().map_err(Error::MessageEncode)?), }), - // The only way a task wouldn't have an aggregator auth token in it is in the taskprov - // case, and Janus never acts as the leader with taskprov enabled. + // The only way a task wouldn't have an aggregator auth token in it is in the + // taskprov case, and Janus never acts as the leader with taskprov enabled. task.aggregator_auth_token().ok_or_else(|| { Error::InvalidConfiguration("no aggregator auth token in task") })?, @@ -741,7 +742,8 @@ where PrepareStepResult::Reject(err) => { // If the helper failed, we move to FAILED immediately. 
- // TODO(#236): is it correct to just record the transition error that the helper reports? + // TODO(#236): is it correct to just record the transition error that the helper + // reports? info!( report_id = %stepped_aggregation.report_aggregation.report_id(), helper_error = ?err, diff --git a/aggregator/src/aggregator/aggregation_job_writer.rs b/aggregator/src/aggregator/aggregation_job_writer.rs index 059919a0a..d20827dac 100644 --- a/aggregator/src/aggregator/aggregation_job_writer.rs +++ b/aggregator/src/aggregator/aggregation_job_writer.rs @@ -209,7 +209,8 @@ where // A> UPDATE batch_aggregations WHERE ord = 1 ... -- Row with ord 1 is locked for update. // B> BEGIN; // B> UPDATE batch_aggregations WHERE ord = 2 ... -- Row with ord 2 is locked for update. - // A> UPDATE batch_aggregations WHERE ord = 2 ... -- A is now blocked waiting for B to finish. + // A> UPDATE batch_aggregations WHERE ord = 2 ... -- A is now blocked waiting for B to + // finish. // B> UPDATE batch_aggregations WHERE ord = 1 ... -- Kaboom! // // To avoid this, we sort by `batch_identifier` and `ord`. diff --git a/aggregator/src/aggregator/collection_job_driver.rs b/aggregator/src/aggregator/collection_job_driver.rs index 37419a9fb..bdf8e213c 100644 --- a/aggregator/src/aggregator/collection_job_driver.rs +++ b/aggregator/src/aggregator/collection_job_driver.rs @@ -78,15 +78,15 @@ where /// Step the provided collection job, for which a lease should have been acquired (though this /// should be idempotent). If the collection job runs to completion, the leader share, helper /// share, report count and report ID checksum will be written to the `collection_jobs` table, - /// and a subsequent request to the collection job URI will yield the aggregate shares. The collect - /// job's lease is released, though it won't matter since the job will no longer be eligible to - /// be run. + /// and a subsequent request to the collection job URI will yield the aggregate shares. The + /// collect job's lease is released, though it won't matter since the job will no longer be + /// eligible to be run. /// /// If some error occurs (including a failure getting the helper's aggregate share), neither /// aggregate share is written to the datastore. A subsequent request to the collection job URI - /// will not yield a result. The collection job lease will eventually expire, allowing a later run - /// of the collection job driver to try again. Both aggregate shares will be recomputed at that - /// time. + /// will not yield a result. The collection job lease will eventually expire, allowing a later + /// run of the collection job driver to try again. Both aggregate shares will be recomputed + /// at that time. #[tracing::instrument(skip(self, datastore), err)] pub async fn step_collection_job( &self, @@ -1472,7 +1472,8 @@ mod tests { .await .unwrap(); - // Verify: check that the collection job was abandoned, and that it can no longer be acquired. + // Verify: check that the collection job was abandoned, and that it can no longer be + // acquired. let (abandoned_collection_job, leases) = ds .run_unnamed_tx(|tx| { let collection_job = collection_job.clone(); @@ -1759,7 +1760,8 @@ mod tests { mocked_aggregate_share.assert_async().await; - // Verify: check that the collection job was abandoned, and that it can no longer be acquired. + // Verify: check that the collection job was abandoned, and that it can no longer be + // acquired. 
ds.run_unnamed_tx(|tx| { let collection_job = collection_job.clone(); Box::pin(async move { @@ -1811,9 +1813,9 @@ mod tests { .as_secs(); assert_eq!( - want_delay_s, - got_delay_s, - "RetryDelay({min_delay_s}, {max_delay_s}, {exponential_factor}).compute_retry_delay({step_attempts})" + want_delay_s, got_delay_s, + "RetryDelay({min_delay_s}, {max_delay_s}, \ + {exponential_factor}).compute_retry_delay({step_attempts})" ); } diff --git a/aggregator/src/aggregator/collection_job_tests.rs b/aggregator/src/aggregator/collection_job_tests.rs index 681a0c25a..206df6d4e 100644 --- a/aggregator/src/aggregator/collection_job_tests.rs +++ b/aggregator/src/aggregator/collection_job_tests.rs @@ -277,7 +277,8 @@ async fn collection_job_success_fixed_size() { let test_conn = test_case.post_collection_job(&collection_job_id).await; assert_eq!(test_conn.status(), Some(Status::Accepted)); - // Update the collection job with the aggregate shares. collection job should now be complete. + // Update the collection job with the aggregate shares. collection job should now be + // complete. let batch_id = test_case .datastore .run_unnamed_tx(|tx| { diff --git a/aggregator/src/aggregator/error.rs b/aggregator/src/aggregator/error.rs index 4f1ec94c0..7f95c7aec 100644 --- a/aggregator/src/aggregator/error.rs +++ b/aggregator/src/aggregator/error.rs @@ -97,7 +97,8 @@ pub enum Error { #[error("{0}")] BatchMismatch(Box), /// Corresponds to `batchQueriedTooManyTimes` in DAP. A collect or aggregate share request was - /// rejected because the queries against a single batch exceed the task's `max_batch_query_count`. + /// rejected because the queries against a single batch exceed the task's + /// `max_batch_query_count`. #[error("task {0}: batch queried too many times ({1})")] BatchQueriedTooManyTimes(TaskId, u64), /// A collect or aggregate share request was rejected because the batch overlaps with a diff --git a/aggregator/src/aggregator/garbage_collector.rs b/aggregator/src/aggregator/garbage_collector.rs index 0c697ab80..e3a4d55d3 100644 --- a/aggregator/src/aggregator/garbage_collector.rs +++ b/aggregator/src/aggregator/garbage_collector.rs @@ -75,7 +75,8 @@ impl GarbageCollector { #[tracing::instrument(name = "GarbageCollector::run", skip(self))] pub async fn run(&self) -> Result<()> { - // TODO(#224): add support for handling only a subset of tasks in a single job (i.e. sharding). + // TODO(#224): add support for handling only a subset of tasks in a single job (i.e. + // sharding). // Retrieve tasks. 
let task_ids: Vec<_> = self diff --git a/aggregator/src/aggregator/http_handlers.rs b/aggregator/src/aggregator/http_handlers.rs index 905fb4dae..d244ab667 100644 --- a/aggregator/src/aggregator/http_handlers.rs +++ b/aggregator/src/aggregator/http_handlers.rs @@ -26,8 +26,7 @@ use opentelemetry::{ use prio::codec::Encode; use ring::digest::{digest, SHA256}; use serde::Deserialize; -use std::{borrow::Cow, time::Duration as StdDuration}; -use std::{io::Cursor, sync::Arc}; +use std::{borrow::Cow, io::Cursor, sync::Arc, time::Duration as StdDuration}; use tracing::warn; use trillium::{Conn, Handler, KnownHeaderName, Status}; use trillium_api::{api, State}; @@ -227,7 +226,8 @@ impl StatusCounter { meter .u64_counter("janus_aggregator_responses") .with_description( - "Count of requests handled by the aggregator, by method, route, and response status.", + "Count of requests handled by the aggregator, by method, route, and response \ + status.", ) .with_unit(Unit::new("{request}")) .init(), diff --git a/aggregator/src/aggregator/http_handlers/tests/helper_e2e.rs b/aggregator/src/aggregator/http_handlers/tests/helper_e2e.rs index eb1409eaf..12656a8a1 100644 --- a/aggregator/src/aggregator/http_handlers/tests/helper_e2e.rs +++ b/aggregator/src/aggregator/http_handlers/tests/helper_e2e.rs @@ -1,3 +1,10 @@ +use crate::aggregator::{ + aggregate_init_tests::{put_aggregation_job, PrepareInitGenerator}, + http_handlers::{ + test_util::{setup_http_handler_test, take_response_body}, + tests::aggregate_share::post_aggregate_share_request, + }, +}; use assert_matches::assert_matches; use janus_aggregator_core::task::{test_util::TaskBuilder, QueryType}; use janus_core::{report_id::ReportIdChecksumExt, vdaf::VdafInstance}; @@ -12,14 +19,6 @@ use prio::{ use rand::random; use trillium_testing::assert_status; -use crate::aggregator::{ - aggregate_init_tests::{put_aggregation_job, PrepareInitGenerator}, - http_handlers::{ - test_util::{setup_http_handler_test, take_response_body}, - tests::aggregate_share::post_aggregate_share_request, - }, -}; - /// Send multiple aggregation job requests and aggregate share requests for a negative test that /// reports cannot be aggregated with the same aggregation parameter into multiple batches. #[tokio::test] diff --git a/aggregator/src/aggregator/report_writer.rs b/aggregator/src/aggregator/report_writer.rs index 8727cba00..0db0e3b2c 100644 --- a/aggregator/src/aggregator/report_writer.rs +++ b/aggregator/src/aggregator/report_writer.rs @@ -214,7 +214,8 @@ impl ReportWriteBatcher { if let Some(result_tx) = result_tx { if result_tx.send(result.map_err(Arc::new)).is_err() { debug!( - "ReportWriter couldn't send result to requester (request cancelled?)" + "ReportWriter couldn't send result to requester (request \ + cancelled?)" ); } } @@ -319,8 +320,8 @@ where } } -/// A collection of [`TaskUploadCounter`]s, grouped by [`TaskId`]. It can be cloned to share it across -/// futures. +/// A collection of [`TaskUploadCounter`]s, grouped by [`TaskId`]. It can be cloned to share it +/// across futures. 
#[derive(Debug, Default, Clone)] pub struct TaskUploadCounters(Arc>>); diff --git a/aggregator/src/binaries/aggregation_job_creator.rs b/aggregator/src/binaries/aggregation_job_creator.rs index 3f3a593a7..da7fe39b4 100644 --- a/aggregator/src/binaries/aggregation_job_creator.rs +++ b/aggregator/src/binaries/aggregation_job_creator.rs @@ -7,8 +7,7 @@ use anyhow::Result; use clap::Parser; use janus_core::time::RealClock; use serde::{Deserialize, Serialize}; -use std::sync::Arc; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; pub async fn main_callback(ctx: BinaryContext) -> Result<()> { // Start creating aggregation jobs. diff --git a/aggregator/src/binaries/aggregator.rs b/aggregator/src/binaries/aggregator.rs index 69128aa2e..cfa555f6e 100644 --- a/aggregator/src/binaries/aggregator.rs +++ b/aggregator/src/binaries/aggregator.rs @@ -23,9 +23,12 @@ use sec1::EcPrivateKey; use serde::{de, Deserialize, Deserializer, Serialize}; use std::{ future::{ready, Future}, + iter::Iterator, + net::SocketAddr, pin::Pin, + sync::Arc, + time::Duration, }; -use std::{iter::Iterator, net::SocketAddr, sync::Arc, time::Duration}; use tokio::{join, sync::watch}; use tracing::info; use trillium::Handler; @@ -339,10 +342,10 @@ pub struct Config { #[serde(default = "default_task_counter_shard_count")] pub task_counter_shard_count: u64, - /// Defines how often to refresh the global HPKE configs cache in milliseconds. This affects how - /// often an aggregator becomes aware of key state changes. If unspecified, default is defined - /// by [`GlobalHpkeKeypairCache::DEFAULT_REFRESH_INTERVAL`]. You shouldn't normally have to - /// specify this. + /// Defines how often to refresh the global HPKE configs cache in milliseconds. This affects + /// how often an aggregator becomes aware of key state changes. If unspecified, default is + /// defined by [`GlobalHpkeKeypairCache::DEFAULT_REFRESH_INTERVAL`]. You shouldn't normally + /// have to specify this. #[serde(default)] pub global_hpke_configs_refresh_interval: Option, @@ -368,8 +371,8 @@ pub struct GarbageCollectorConfig { /// How frequently garbage collection is run, in seconds. pub gc_frequency_s: u64, - /// The limit to the number of client report artifacts deleted for a single task by a single run - /// of the garbage collector. + /// The limit to the number of client report artifacts deleted for a single task by a single + /// run of the garbage collector. 
pub report_limit: u64, /// The limit to the number of aggregation jobs, and related aggregation artifacts, deleted for diff --git a/aggregator/src/binaries/garbage_collector.rs b/aggregator/src/binaries/garbage_collector.rs index 58f209434..c0cc78510 100644 --- a/aggregator/src/binaries/garbage_collector.rs +++ b/aggregator/src/binaries/garbage_collector.rs @@ -1,23 +1,20 @@ -use std::{sync::Arc, time::Duration}; - +use super::aggregator::GarbageCollectorConfig; +use crate::{ + aggregator::garbage_collector::GarbageCollector, + binary_utils::{BinaryContext, BinaryOptions, CommonBinaryOptions}, + config::{BinaryConfig, CommonConfig}, +}; use anyhow::Result; use clap::Parser; use janus_aggregator_core::datastore::Datastore; use janus_core::time::RealClock; use opentelemetry::metrics::Meter; use serde::{Deserialize, Serialize}; +use std::{sync::Arc, time::Duration}; use tokio::time::interval; use tracing::error; use trillium_tokio::Stopper; -use crate::{ - aggregator::garbage_collector::GarbageCollector, - binary_utils::{BinaryContext, BinaryOptions, CommonBinaryOptions}, - config::{BinaryConfig, CommonConfig}, -}; - -use super::aggregator::GarbageCollectorConfig; - pub async fn main_callback(ctx: BinaryContext) -> Result<()> { let BinaryContext { config, @@ -116,11 +113,7 @@ impl BinaryConfig for Config { #[cfg(test)] mod tests { - use std::net::{Ipv4Addr, SocketAddr}; - - use clap::CommandFactory; - use janus_core::test_util::roundtrip_encoding; - + use super::{Config, Options}; use crate::{ binaries::aggregator::GarbageCollectorConfig, config::{ @@ -129,8 +122,9 @@ mod tests { CommonConfig, }, }; - - use super::{Config, Options}; + use clap::CommandFactory; + use janus_core::test_util::roundtrip_encoding; + use std::net::{Ipv4Addr, SocketAddr}; #[test] fn verify_app() { diff --git a/aggregator/src/binaries/janus_cli.rs b/aggregator/src/binaries/janus_cli.rs index 7ae878e69..8ac0c656a 100644 --- a/aggregator/src/binaries/janus_cli.rs +++ b/aggregator/src/binaries/janus_cli.rs @@ -133,8 +133,8 @@ enum Command { #[arg(long, env = "VERIFY_KEY_INIT", hide_env_values = true)] verify_key_init: VerifyKeyInit, - /// The location of the collector HPKE config file, which contains an encoded DAP HpkeConfig - /// (i.e. public key & metadata) used to encrypt to the collector. + /// The location of the collector HPKE config file, which contains an encoded DAP + /// HpkeConfig (i.e. public key & metadata) used to encrypt to the collector. #[arg(long)] collector_hpke_config_file: PathBuf, @@ -654,9 +654,9 @@ struct KubernetesSecretOptions { } impl KubernetesSecretOptions { - /// Fetch the datastore keys from the options. If --secrets-k8s-namespace is set, keys are fetched - /// from a secret therein. Otherwise, returns the keys provided to --datastore-keys. If neither was - /// set, returns an error. + /// Fetch the datastore keys from the options. If --secrets-k8s-namespace is set, keys are + /// fetched from a secret therein. Otherwise, returns the keys provided to --datastore-keys. + /// If neither was set, returns an error. async fn datastore_keys( &self, options: &CommonBinaryOptions, diff --git a/aggregator/src/binary_utils.rs b/aggregator/src/binary_utils.rs index 39a3284e3..338f33ec9 100644 --- a/aggregator/src/binary_utils.rs +++ b/aggregator/src/binary_utils.rs @@ -279,8 +279,8 @@ where { return Err(anyhow!( "Tokio runtime metrics were enabled in the configuration file, but support \ - was not enabled at compile time. Rebuild with \ - `RUSTFLAGS=\"--cfg tokio_unstable\"`." 
+ was not enabled at compile time. Rebuild with `RUSTFLAGS=\"--cfg \ + tokio_unstable\"`." )); } } @@ -662,8 +662,8 @@ mod tests { // a host directory in the container would be insufficient, because the files' owner UIDs // will not match the postgres user's UID. Instead, we create a temporary Docker volume, run // a setup container with both the volume and a host directory mounted in, copy the - // certificate and key into the volume, and fix up their ownership (and permissions, in - // case those were lost on a non-POSIX host). Then, we run a second container with the volume + // certificate and key into the volume, and fix up their ownership (and permissions, in case + // those were lost on a non-POSIX host). Then, we run a second container with the volume // mounted in, and use the fixed files in the volume in database configuration. let volume = Volume::new(); let setup_image = RunnableImage::from(( diff --git a/aggregator/src/binary_utils/job_driver.rs b/aggregator/src/binary_utils/job_driver.rs index cb861c6ca..a39422fa0 100644 --- a/aggregator/src/binary_utils/job_driver.rs +++ b/aggregator/src/binary_utils/job_driver.rs @@ -142,7 +142,8 @@ where break; } - // Wait until we are able to start at least one worker. (permit will be immediately released) + // Wait until we are able to start at least one worker. (permit will be immediately + // released) // // Unwrap safety: Semaphore::acquire is documented as only returning an error if the // semaphore is closed, and we never close this semaphore. @@ -192,8 +193,8 @@ where &[KeyValue::new("status", "error")], ); - // Go ahead and provide a delay in this error case to ensure we don't tightly loop - // running transactions that will fail without any delay. + // Go ahead and provide a delay in this error case to ensure we don't tightly + // loop running transactions that will fail without any delay. next_run_instant += self.job_discovery_interval; error!(?error, "Couldn't acquire jobs"); continue; @@ -383,8 +384,9 @@ mod tests { let incomplete_jobs = incomplete_jobs .get(test_state.job_acquire_counter) - // Clone here so that incomplete_jobs will be Vec<_> and not &Vec<_>, which - // would be impossible to return from Option::unwrap_or_default. + // Clone here so that incomplete_jobs will be Vec<_> and not + // &Vec<_>, which would be impossible to return from + // Option::unwrap_or_default. .cloned() .unwrap_or_default(); diff --git a/aggregator/src/cache.rs b/aggregator/src/cache.rs index b457d277d..d0c53c205 100644 --- a/aggregator/src/cache.rs +++ b/aggregator/src/cache.rs @@ -282,8 +282,7 @@ impl TaskAggregatorCache { #[cfg(test)] mod tests { - use std::{sync::Arc, time::Duration}; - + use crate::{aggregator::report_writer::ReportWriteBatcher, cache::TaskAggregatorCache}; use janus_aggregator_core::{ datastore::test_util::ephemeral_datastore, task::{test_util::TaskBuilder, QueryType}, @@ -294,10 +293,9 @@ mod tests { vdaf::VdafInstance, }; use janus_messages::Time; + use std::{sync::Arc, time::Duration}; use tokio::time::sleep; - use crate::{aggregator::report_writer::ReportWriteBatcher, cache::TaskAggregatorCache}; - #[tokio::test] async fn task_aggregator_cache() { install_test_trace_subscriber(); @@ -356,8 +354,8 @@ mod tests { task.task_expiration() ); - // Unfortunately, because moka doesn't provide any facility for a fake clock, we have to resort - // to sleeps to test TTL functionality. + // Unfortunately, because moka doesn't provide any facility for a fake clock, we have to + // resort to sleeps to test TTL functionality. 
sleep(Duration::from_secs(1)).await; let task_aggregator = task_aggregators.get(task.id()).await.unwrap().unwrap(); @@ -402,8 +400,8 @@ mod tests { // We shouldn't see the new task yet. assert!(task_aggregators.get(task.id()).await.unwrap().is_none()); - // Unfortunately, because moka doesn't provide any facility for a fake clock, we have to resort - // to sleeps to test TTL functionality. + // Unfortunately, because moka doesn't provide any facility for a fake clock, we have to + // resort to sleeps to test TTL functionality. sleep(Duration::from_secs(1)).await; // Now we should see it. diff --git a/aggregator/src/config.rs b/aggregator/src/config.rs index eaa687028..d8a6795e9 100644 --- a/aggregator/src/config.rs +++ b/aggregator/src/config.rs @@ -44,9 +44,9 @@ pub struct CommonConfig { #[serde(default = "default_health_check_listen_address")] pub health_check_listen_address: SocketAddr, - /// The maximum number of times a transaction can be retried. The intent is to guard against bugs - /// that induce infinite retries. It should be set to a reasonably high limit to prevent legitimate - /// work from being cancelled. + /// The maximum number of times a transaction can be retried. The intent is to guard against + /// bugs that induce infinite retries. It should be set to a reasonably high limit to + /// prevent legitimate work from being cancelled. #[serde(default = "default_max_transaction_retries")] pub max_transaction_retries: u64, } @@ -176,13 +176,13 @@ pub struct JobDriverConfig { /// The maximum number of jobs being stepped at once. This parameter determines the amount of /// per-process concurrency. pub max_concurrent_job_workers: usize, - /// The length of time, in seconds, workers will acquire a lease for the jobs they are stepping. - /// Along with worker_lease_clock_skew_allowance, determines the effective timeout of stepping a - /// single job. + /// The length of time, in seconds, workers will acquire a lease for the jobs they are + /// stepping. Along with worker_lease_clock_skew_allowance, determines the effective + /// timeout of stepping a single job. pub worker_lease_duration_secs: u64, /// The length of time, in seconds, workers decrease their timeouts from the lease length in - /// order to guard against the possibility of clock skew. Along with worker_lease_duration_secs, - /// determines the effective timeout of stepping a single job. + /// order to guard against the possibility of clock skew. Along with + /// worker_lease_duration_secs, determines the effective timeout of stepping a single job. pub worker_lease_clock_skew_allowance_secs: u64, /// The number of attempts to drive a work item before it is placed in a permanent failure /// state. 
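Side note on the `JobDriverConfig` fields documented in the hunk above: `worker_lease_duration_secs` and `worker_lease_clock_skew_allowance_secs` jointly determine the effective timeout for stepping a single job. A minimal sketch of that relationship follows; the helper function and the example values are illustrative assumptions, not part of the Janus codebase.

```rust
/// Illustrative sketch only: combines the two JobDriverConfig fields documented
/// above into the effective per-step timeout. The function name and the values
/// used below are hypothetical, not taken from Janus.
fn effective_step_timeout_secs(
    worker_lease_duration_secs: u64,
    worker_lease_clock_skew_allowance_secs: u64,
) -> u64 {
    // Workers stop work this long before their lease would expire, guarding
    // against clock skew between the worker and the datastore.
    worker_lease_duration_secs.saturating_sub(worker_lease_clock_skew_allowance_secs)
}

fn main() {
    // e.g. a 600 s lease with a 60 s skew allowance leaves ~540 s to step a job.
    assert_eq!(effective_step_timeout_secs(600, 60), 540);
}
```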
diff --git a/aggregator/src/metrics.rs b/aggregator/src/metrics.rs index 68bfa6863..7233140c6 100644 --- a/aggregator/src/metrics.rs +++ b/aggregator/src/metrics.rs @@ -8,7 +8,18 @@ use opentelemetry::{ use serde::{Deserialize, Serialize}; use std::net::AddrParseError; use tokio::runtime::Runtime; - +#[cfg(any(feature = "otlp", feature = "prometheus"))] +use { + crate::git_revision, + janus_aggregator_core::datastore::TRANSACTION_RETRIES_METER_NAME, + opentelemetry::metrics::MetricsError, + opentelemetry_sdk::{ + metrics::{ + new_view, Aggregation, Instrument, InstrumentKind, SdkMeterProvider, Stream, View, + }, + Resource, + }, +}; #[cfg(feature = "prometheus")] use { anyhow::Context, @@ -21,7 +32,6 @@ use { tokio::{sync::oneshot, task::JoinHandle}, trillium::{Info, Init}, }; - #[cfg(feature = "otlp")] use { opentelemetry_otlp::WithExportConfig, @@ -34,19 +44,6 @@ use { }, }; -#[cfg(any(feature = "otlp", feature = "prometheus"))] -use { - crate::git_revision, - janus_aggregator_core::datastore::TRANSACTION_RETRIES_METER_NAME, - opentelemetry::metrics::MetricsError, - opentelemetry_sdk::{ - metrics::{ - new_view, Aggregation, Instrument, InstrumentKind, SdkMeterProvider, Stream, View, - }, - Resource, - }, -}; - #[cfg(all(tokio_unstable, feature = "prometheus"))] pub(crate) mod tokio_runtime; diff --git a/aggregator/src/metrics/tokio_runtime.rs b/aggregator/src/metrics/tokio_runtime.rs index ef631d568..d341b38a3 100644 --- a/aggregator/src/metrics/tokio_runtime.rs +++ b/aggregator/src/metrics/tokio_runtime.rs @@ -1,5 +1,4 @@ -use std::time::{Duration, SystemTime}; - +use crate::metrics::{HistogramScale as ConfigHistogramScale, TokioMetricsConfiguration}; use derivative::Derivative; use opentelemetry::{ metrics::{MetricsError, Unit}, @@ -14,10 +13,9 @@ use opentelemetry_sdk::{ }, AttributeSet, }; +use std::time::{Duration, SystemTime}; use tokio::runtime::{self, HistogramScale, RuntimeMetrics}; -use crate::metrics::{HistogramScale as ConfigHistogramScale, TokioMetricsConfiguration}; - impl From for HistogramScale { fn from(value: ConfigHistogramScale) -> Self { match value { diff --git a/aggregator/src/trace.rs b/aggregator/src/trace.rs index c6435db64..608051d91 100644 --- a/aggregator/src/trace.rs +++ b/aggregator/src/trace.rs @@ -1,5 +1,7 @@ //! Configures a tracing subscriber for Janus. +#[cfg(feature = "otlp")] +use opentelemetry_otlp::WithExportConfig; use serde::{Deserialize, Serialize}; use std::{ io::{stdout, IsTerminal}, @@ -12,9 +14,6 @@ use tracing_subscriber::{ filter::FromEnvError, layer::SubscriberExt, reload, EnvFilter, Layer, Registry, }; -#[cfg(feature = "otlp")] -use opentelemetry_otlp::WithExportConfig; - /// Errors from initializing trace subscriber. 
#[derive(Debug, thiserror::Error)] pub enum Error { diff --git a/aggregator_api/src/routes.rs b/aggregator_api/src/routes.rs index 917bda8e3..0ea23ede3 100644 --- a/aggregator_api/src/routes.rs +++ b/aggregator_api/src/routes.rs @@ -17,10 +17,9 @@ use janus_aggregator_core::{ use janus_core::{ auth_tokens::AuthenticationTokenHash, hpke::generate_hpke_config_and_private_key, time::Clock, }; -use janus_messages::HpkeConfigId; use janus_messages::{ - query_type::Code as SupportedQueryType, Duration, HpkeAeadId, HpkeKdfId, HpkeKemId, Role, - TaskId, + query_type::Code as SupportedQueryType, Duration, HpkeAeadId, HpkeConfigId, HpkeKdfId, + HpkeKemId, Role, TaskId, }; use querystring::querify; use rand::random; @@ -184,23 +183,26 @@ pub(super) async fn post_task( let task = Arc::clone(&task); Box::pin(async move { if let Some(existing_task) = tx.get_aggregator_task(task.id()).await? { - // Check whether the existing task in the DB corresponds to the incoming task, ignoring - // those fields that are randomly generated. - if existing_task.peer_aggregator_endpoint() == task.peer_aggregator_endpoint() - && existing_task.query_type() == task.query_type() - && existing_task.vdaf() == task.vdaf() - && existing_task.opaque_vdaf_verify_key() == task.opaque_vdaf_verify_key() - && existing_task.role() == task.role() - && existing_task.max_batch_query_count() == task.max_batch_query_count() - && existing_task.task_expiration() == task.task_expiration() - && existing_task.min_batch_size() == task.min_batch_size() - && existing_task.time_precision() == task.time_precision() - && existing_task.collector_hpke_config() == task.collector_hpke_config() { - return Ok(()) + // Check whether the existing task in the DB corresponds to the incoming task, + // ignoring those fields that are randomly generated. 
+ if existing_task.peer_aggregator_endpoint() == task.peer_aggregator_endpoint() + && existing_task.query_type() == task.query_type() + && existing_task.vdaf() == task.vdaf() + && existing_task.opaque_vdaf_verify_key() == task.opaque_vdaf_verify_key() + && existing_task.role() == task.role() + && existing_task.max_batch_query_count() == task.max_batch_query_count() + && existing_task.task_expiration() == task.task_expiration() + && existing_task.min_batch_size() == task.min_batch_size() + && existing_task.time_precision() == task.time_precision() + && existing_task.collector_hpke_config() == task.collector_hpke_config() + { + return Ok(()); } let err = Error::Conflict( - "task with same VDAF verify key and task ID already exists with different parameters".to_string(), + "task with same VDAF verify key and task ID already exists with different \ + parameters" + .to_string(), ); return Err(datastore::Error::User(err.into())); } diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index 8302f6f56..6c1e7642d 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -444,7 +444,8 @@ pub struct Transaction<'a, C: Clock> { task_infos: Arc>>, retry: AtomicBool, - op_group: Mutex>>, // locking discipline: outer lock before inner lock + op_group: Mutex>>, /* locking discipline: outer lock before inner + * lock */ } enum OperationGroup { @@ -2447,14 +2448,15 @@ WHERE report_aggregations.task_id = $1 .to_string(), ) })?; - let helper_encrypted_input_share_bytes = - row.get::<_, Option>>("helper_encrypted_input_share") - .ok_or_else(|| { - Error::DbState( - "report aggregation in state START but helper_encrypted_input_share is NULL" + let helper_encrypted_input_share_bytes = row + .get::<_, Option>>("helper_encrypted_input_share") + .ok_or_else(|| { + Error::DbState( + "report aggregation in state START but helper_encrypted_input_share \ + is NULL" .to_string(), ) - })?; + })?; let public_share = A::PublicShare::get_decoded_with_param(vdaf, &public_share_bytes)?; @@ -2482,7 +2484,8 @@ WHERE report_aggregations.task_id = $1 .get::<_, Option>>("leader_prep_transition") .ok_or_else(|| { Error::DbState( - "report aggregation in state WAITING but leader_prep_transition is NULL" + "report aggregation in state WAITING but \ + leader_prep_transition is NULL" .to_string(), ) })?; @@ -2500,7 +2503,8 @@ WHERE report_aggregations.task_id = $1 .get::<_, Option>>("helper_prep_state") .ok_or_else(|| { Error::DbState( - "report aggregation in state WAITING but helper_prep_state is NULL" + "report aggregation in state WAITING but helper_prep_state is \ + NULL" .to_string(), ) })?; @@ -3219,9 +3223,14 @@ WHERE task_id = $1 ) })?)?; let client_timestamp_interval = client_timestamp_interval - .ok_or_else(|| Error::DbState( - "collection job in state FINISHED but client_timestamp_interval is NULL".to_string()) - )?.as_interval(); + .ok_or_else(|| { + Error::DbState( + "collection job in state FINISHED but client_timestamp_interval is \ + NULL" + .to_string(), + ) + })? + .as_interval(); let encrypted_helper_aggregate_share = HpkeCiphertext::get_decoded( &helper_aggregate_share_bytes.ok_or_else(|| { Error::DbState( @@ -4471,7 +4480,8 @@ ON CONFLICT(task_id, batch_identifier, aggregation_param) DO UPDATE // Note that this ignores aggregation parameter, as `outstanding_batches` does not need to // worry about aggregation parameters. 
// - // TODO(#225): reevaluate whether we can ignore aggregation parameter here once we have experience with VDAFs requiring multiple aggregations per batch. + // TODO(#225): reevaluate whether we can ignore aggregation parameter here once we have + // experience with VDAFs requiring multiple aggregations per batch. let stmt = self .prepare_cached( "-- put_outstanding_batch() @@ -5451,9 +5461,9 @@ GROUP BY tasks.id", .transpose() } - /// Add a `TaskUploadCounter` to the counter associated with the given [`TaskId`]. This is sharded, - /// requiring an `ord` parameter to determine which shard to add to. `ord` should be randomly - /// generated by the caller. + /// Add a `TaskUploadCounter` to the counter associated with the given [`TaskId`]. This is + /// sharded, requiring an `ord` parameter to determine which shard to add to. `ord` should + /// be randomly generated by the caller. #[tracing::instrument(skip(self), err(level = Level::DEBUG))] pub async fn increment_task_upload_counter( &self, @@ -5461,11 +5471,10 @@ GROUP BY tasks.id", ord: u64, counter: &TaskUploadCounter, ) -> Result<(), Error> { - let stmt = - "-- increment_task_upload_counter() + let stmt = "-- increment_task_upload_counter() INSERT INTO task_upload_counters (task_id, ord, interval_collected, report_decode_failure, - report_decrypt_failure, report_expired, report_outdated_key, report_success, report_too_early, - task_expired) + report_decrypt_failure, report_expired, report_outdated_key, report_success, + report_too_early, task_expired) VALUES ((SELECT id FROM tasks WHERE task_id = $1), $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (task_id, ord) DO UPDATE SET interval_collected = task_upload_counters.interval_collected + $3, diff --git a/aggregator_core/src/datastore/models.rs b/aggregator_core/src/datastore/models.rs index 7b890a44f..61a7aa2df 100644 --- a/aggregator_core/src/datastore/models.rs +++ b/aggregator_core/src/datastore/models.rs @@ -1725,8 +1725,8 @@ impl> &self.state } - /// Returns a new [`CollectionJob`] corresponding to this collection job updated to have the given - /// state. + /// Returns a new [`CollectionJob`] corresponding to this collection job updated to have the + /// given state. pub fn with_state(self, state: CollectionJobState) -> Self { Self { state, ..self } } @@ -2090,8 +2090,8 @@ impl<'a> FromSql<'a> for SqlInterval { } Range::Nonempty(RangeBound::Exclusive(_), _) | Range::Nonempty(_, RangeBound::Inclusive(_)) => Err(Into::into( - "Interval can only represent timestamp ranges that are closed at the start \ - and open at the end", + "Interval can only represent timestamp ranges that are closed at the start and \ + open at the end", )), Range::Nonempty( RangeBound::Inclusive(Some(start_raw)), @@ -2108,8 +2108,7 @@ impl<'a> FromSql<'a> for SqlInterval { let abs_start_duration = Duration::from_microseconds(abs_start_us); let time = if negative { SQL_EPOCH_TIME.sub(&abs_start_duration).map_err(|_| { - "Interval cannot represent timestamp ranges starting before the Unix \ - epoch" + "Interval cannot represent timestamp ranges starting before the Unix epoch" })? 
     } else {
         SQL_EPOCH_TIME
diff --git a/aggregator_core/src/datastore/test_util.rs b/aggregator_core/src/datastore/test_util.rs
index 1d1b3d9b0..75b36b969 100644
--- a/aggregator_core/src/datastore/test_util.rs
+++ b/aggregator_core/src/datastore/test_util.rs
@@ -1,3 +1,4 @@
+use super::SUPPORTED_SCHEMA_VERSIONS;
 use crate::{
     datastore::{Crypter, Datastore, Transaction},
     test_util::noop_meter,
@@ -27,8 +28,6 @@ use tokio::sync::Mutex;
 use tokio_postgres::{connect, Config, NoTls};
 use tracing::trace;

-use super::SUPPORTED_SCHEMA_VERSIONS;
-
 struct EphemeralDatabase {
     _db_container: ContainerAsync,
     port_number: u16,
@@ -231,8 +230,8 @@ impl EphemeralDatastoreBuilder {
         // Create Postgres DB.
         //
-        // Since this is the first connection we're establishing since the container has been created,
-        // retry this a few times. The database may not be ready yet.
+        // Since this is the first connection we're establishing since the container has been
+        // created, retry this a few times. The database may not be ready yet.
         let backoff = ExponentialBackoffBuilder::new()
             .with_initial_interval(Duration::from_millis(500))
             .with_max_interval(Duration::from_millis(500))
diff --git a/aggregator_core/src/datastore/tests.rs b/aggregator_core/src/datastore/tests.rs
index 642437b4d..d037c82fd 100644
--- a/aggregator_core/src/datastore/tests.rs
+++ b/aggregator_core/src/datastore/tests.rs
@@ -1,3 +1,6 @@
+// This function is only used when there are multiple supported versions.
+#[allow(unused_imports)]
+use crate::datastore::test_util::ephemeral_datastore_schema_version_by_downgrade;
 use crate::{
     datastore::{
         models::{
@@ -64,10 +67,6 @@ use std::{
 use tokio::{time::timeout, try_join};
 use url::Url;

-// This function is only used when there are multiple supported versions.
-#[allow(unused_imports)]
-use crate::datastore::test_util::ephemeral_datastore_schema_version_by_downgrade;
-
 const OLDEST_ALLOWED_REPORT_TIMESTAMP: Time = Time::from_seconds_since_epoch(1000);
 const REPORT_EXPIRY_AGE: Duration = Duration::from_seconds(1000);

@@ -3717,8 +3716,8 @@ async fn time_interval_collection_job_acquire_release_happy_path(
         .run_tx("test-acquire-leases", |tx| {
             let collection_job_leases = collection_job_leases.clone();
             Box::pin(async move {
-                // Try to re-acquire collection jobs. Nothing should happen because the lease is still
-                // valid.
+                // Try to re-acquire collection jobs. Nothing should happen because the lease is
+                // still valid.
                 assert!(tx
                     .acquire_incomplete_collection_jobs(&StdDuration::from_secs(100), 10)
                     .await
@@ -3903,8 +3902,8 @@ async fn fixed_size_collection_job_acquire_release_happy_path(
         .run_unnamed_tx(|tx| {
             let collection_job_leases = collection_job_leases.clone();
             Box::pin(async move {
-                // Try to re-acquire collection jobs. Nothing should happen because the lease is still
-                // valid.
+                // Try to re-acquire collection jobs. Nothing should happen because the lease is
+                // still valid.
                 assert!(tx
                     .acquire_incomplete_collection_jobs(&StdDuration::from_secs(100), 10,)
                     .await
@@ -6487,7 +6486,8 @@ async fn delete_expired_collection_artifacts(ephemeral_datastore: EphemeralDatas
         )
         .await;

-        // Leader, time-interval collection artifacts with old & new reports. [collection job GC'ed, remainder not GC'ed]
+        // Leader, time-interval collection artifacts with old & new reports. [collection
+        // job GC'ed, remainder not GC'ed]
         let (
             _,
             aggregate_share_job_id,
@@ -6556,7 +6556,8 @@ async fn delete_expired_collection_artifacts(ephemeral_datastore: EphemeralDatas
         )
         .await;

-        // Helper, time-interval collection artifacts with old & new reports. [aggregate share job job GC'ed, remainder not GC'ed]
+        // Helper, time-interval collection artifacts with old & new reports. [aggregate
+        // share job job GC'ed, remainder not GC'ed]
         let (_, _, batch_id, outstanding_batch_id, batch_aggregation_id, _) =
             write_collect_artifacts::(
                 tx,
diff --git a/aggregator_core/src/lib.rs b/aggregator_core/src/lib.rs
index ad155697e..169de95cd 100644
--- a/aggregator_core/src/lib.rs
+++ b/aggregator_core/src/lib.rs
@@ -6,15 +6,14 @@
 #![allow(clippy::single_component_path_imports)]

 use derivative::Derivative;
-use tracing::{debug, info_span, Instrument, Span};
-use trillium::{Conn, Handler, Status};
-use trillium_macros::Handler;
-use trillium_router::RouterConnExt;
-
 // We must import `rstest_reuse` at the top of the crate
 // https://docs.rs/rstest_reuse/0.5.0/rstest_reuse/#use-rstest_reuse-at-the-top-of-your-crate
 #[cfg(test)]
 use rstest_reuse;
+use tracing::{debug, info_span, Instrument, Span};
+use trillium::{Conn, Handler, Status};
+use trillium_macros::Handler;
+use trillium_router::RouterConnExt;

 pub mod datastore;
 pub mod query_type;
diff --git a/aggregator_core/src/task.rs b/aggregator_core/src/task.rs
index 9c5a1ba40..1891c42c4 100644
--- a/aggregator_core/src/task.rs
+++ b/aggregator_core/src/task.rs
@@ -38,8 +38,8 @@ pub enum QueryType {
     TimeInterval,

     /// Fixed-size: used to support collection of batches as quickly as possible, without the
-    /// latency of waiting for batch time intervals to pass, and with direct control over the number
-    /// of reports per batch.
+    /// latency of waiting for batch time intervals to pass, and with direct control over the
+    /// number of reports per batch.
     FixedSize {
         /// If present, the maximum number of reports in a batch to allow it to be collected. If
         /// absent, then there is no limit to the number of reports that Janus will include in a
@@ -819,8 +819,8 @@ pub mod test_util {
         helper_aggregator_endpoint: Url,
         /// HPKE configuration and private key used by the collector to decrypt aggregate shares.
         collector_hpke_keypair: HpkeKeypair,
-        /// Token used to authenticate messages exchanged between the aggregators in the aggregation
-        /// sub-protocol.
+        /// Token used to authenticate messages exchanged between the aggregators in the
+        /// aggregation sub-protocol.
         aggregator_auth_token: AuthenticationToken,
         /// Token used to authenticate messages exchanged between the collector and leader in the
         /// collection sub-protocol.
diff --git a/aggregator_core/src/taskprov.rs b/aggregator_core/src/taskprov.rs
index 0e8e991a5..ea34b683b 100644
--- a/aggregator_core/src/taskprov.rs
+++ b/aggregator_core/src/taskprov.rs
@@ -100,8 +100,8 @@ pub struct PeerAggregator {
     #[derivative(Debug(format_with = "std::fmt::Display::fmt"))]
     endpoint: Url,

-    /// The role that the peer aggregator takes in DAP. Must be [`Role::Leader`] or [`Role::Helper`].
-    /// This, along with `endpoint`, uniquely represents the peer aggregator.
+    /// The role that the peer aggregator takes in DAP. Must be [`Role::Leader`] or
+    /// [`Role::Helper`]. This, along with `endpoint`, uniquely represents the peer aggregator.
     role: Role,

     /// The preshared key used to derive the VDAF verify key for each task.
@@ -115,8 +115,8 @@ pub struct PeerAggregator {
     /// copied into the definition for a provisioned task.
     report_expiry_age: Option,

-    /// The maximum allowable clock skew between peers. This value is copied into the definition for
-    /// a provisioned task.
+    /// The maximum allowable clock skew between peers. This value is copied into the definition
+    /// for a provisioned task.
     tolerable_clock_skew: Duration,

     /// Auth tokens used for authenticating Leader to Helper requests.
@@ -275,6 +275,7 @@ impl KeyType for VdafVerifyKeyLength {
 #[cfg(feature = "test-util")]
 #[cfg_attr(docsrs, doc(cfg(feature = "test-util")))]
 pub mod test_util {
+    use super::{PeerAggregator, VerifyKeyInit};
     use janus_core::{
         auth_tokens::AuthenticationToken,
         hpke::test_util::generate_test_hpke_config_and_private_key,
@@ -283,8 +284,6 @@ pub mod test_util {
     use rand::random;
     use url::Url;

-    use super::{PeerAggregator, VerifyKeyInit};
-
     #[derive(Debug, Clone)]
     pub struct PeerAggregatorBuilder(PeerAggregator);
diff --git a/collector/src/credential.rs b/collector/src/credential.rs
index 048227b60..a589959dc 100644
--- a/collector/src/credential.rs
+++ b/collector/src/credential.rs
@@ -22,7 +22,8 @@ pub struct PrivateCollectorCredential {
 }

 impl PrivateCollectorCredential {
-    /// Returns the [`AuthenticationToken`] necessary for connecting to an aggregator for collection.
+    /// Returns the [`AuthenticationToken`] necessary for connecting to an aggregator for
+    /// collection.
     pub fn authentication_token(&self) -> AuthenticationToken {
         AuthenticationToken::Bearer(self.token.clone())
     }
diff --git a/collector/src/lib.rs b/collector/src/lib.rs
index 1a1523c94..2e6889a49 100644
--- a/collector/src/lib.rs
+++ b/collector/src/lib.rs
@@ -402,8 +402,8 @@ pub struct Collector {
 }

 impl Collector {
-    /// Construct a new collector. This requires certain DAP task parameters and an implementation of
-    /// the task's VDAF.
+    /// Construct a new collector. This requires certain DAP task parameters and an implementation
+    /// of the task's VDAF.
     pub fn new(
         task_id: TaskId,
         leader_endpoint: Url,
diff --git a/core/src/hpke.rs b/core/src/hpke.rs
index f4cbaafab..57d660a10 100644
--- a/core/src/hpke.rs
+++ b/core/src/hpke.rs
@@ -508,7 +508,8 @@ mod tests {
         // See https://github.com/cfrg/draft-irtf-cfrg-hpke/blob/5f503c564da00b0687b3de75f1dfbdfc4079ad31/test-vectors.json
         //
         // The file was processed with the following command:
-        // jq 'map({mode, kem_id, kdf_id, aead_id, info, enc, pkRm, skRm, base_nonce, encryptions: [.encryptions[0]]} | select(.mode == 0) | select(.aead_id != 65535))'
+        // jq 'map({mode, kem_id, kdf_id, aead_id, info, enc, pkRm, skRm, base_nonce, encryptions:
+        // [.encryptions[0]]} | select(.mode == 0) | select(.aead_id != 65535))'
         let test_vectors: Vec = serde_json::from_str(include_str!("test-vectors.json")).unwrap();
         let mut algorithms_tested = HashSet::new();
diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs
index 339ea02c6..2c998de55 100644
--- a/integration_tests/src/lib.rs
+++ b/integration_tests/src/lib.rs
@@ -71,8 +71,8 @@ impl AggregatorEndpointFragments {
                 Url::parse(&format!("http://{host}:8080{path}")).unwrap()
             }
             AggregatorEndpointFragments::Localhost { .. } => panic!(
-                "cannot combine an aggregator running on localhost with a client or leader running \
-                 in a virtual network"
+                "cannot combine an aggregator running on localhost with a client or leader \
+                 running in a virtual network"
             ),
             AggregatorEndpointFragments::Remote { .. } => {
                 panic!("Cannot connect to remote aggregator on virtual network")
diff --git a/integration_tests/tests/integration/common.rs b/integration_tests/tests/integration/common.rs
index 015e4b3cf..3b52b240c 100644
--- a/integration_tests/tests/integration/common.rs
+++ b/integration_tests/tests/integration/common.rs
@@ -247,8 +247,9 @@ pub async fn verify_aggregate_generic(
             before_timestamp
                 .to_batch_interval_start(&task_parameters.time_precision)
                 .unwrap(),
-            // Use two time precisions as the interval duration in order to avoid a race condition if
-            // this test happens to run very close to the end of a batch window.
+            // Use two time precisions as the interval duration in order to avoid a race
+            // condition if this test happens to run very close to the end of a
+            // batch window.
             Duration::from_seconds(2 * task_parameters.time_precision.as_seconds()),
         )
         .unwrap();
diff --git a/integration_tests/tests/integration/in_cluster.rs b/integration_tests/tests/integration/in_cluster.rs
index ce629a179..3b52de652 100644
--- a/integration_tests/tests/integration/in_cluster.rs
+++ b/integration_tests/tests/integration/in_cluster.rs
@@ -38,9 +38,10 @@ use uuid::Uuid;
     version = env!("CARGO_PKG_VERSION"),
 )]
 struct Options {
-    /// If set, the integration tests will be run against remote instances of `divviup-api`, a Janus
-    /// leader and a Janus helper. If not set, the integration tests will be run against instances
-    /// of `divviup-api`, a Janus leader and a Janus helper in an adjacent Kubernetes cluster.
+    /// If set, the integration tests will be run against remote instances of `divviup-api`, a
+    /// Janus leader and a Janus helper. If not set, the integration tests will be run against
+    /// instances of `divviup-api`, a Janus leader and a Janus helper in an adjacent Kubernetes
+    /// cluster.
     ///
     /// See doccomments on InClusterJanusPair::new_in_cloud and InClusterJanusPair::new_in_kind for
     /// discussion of how to configure this test setup.
@@ -227,12 +228,12 @@ impl InClusterJanusPair {
     /// - `JANUS_E2E_KUBECTL_CONTEXT_NAME`: The name of a context in the kubeconfig file.
     /// - `JANUS_E2E_LEADER_NAMESPACE`: The Kubernetes namespace where the DAP leader is deployed.
     /// - `JANUS_E2E_LEADER_AGGREGATOR_API_AUTH_TOKEN`: Credential with which requests to the
-    /// leader's aggregator API are authenticated.
+    ///   leader's aggregator API are authenticated.
     /// - `JANUS_E2E_HELPER_NAMESPACE`: The Kubernetes namespace where the DAP helper is deployed.
     /// - `JANUS_E2E_HELPER_AGGREGATOR_API_AUTH_TOKEN`: Credential with which requests to the
-    /// helper's aggregator API are authenticated.
+    ///   helper's aggregator API are authenticated.
     /// - `JANUS_E2E_DIVVIUP_API_NAMESPACE`: The Kubernetes namespace where `divviup-api` is
-    /// deployed.
+    ///   deployed.
     async fn new_in_kind(vdaf: VdafInstance, query_type: QueryType) -> Self {
         let (
             kubeconfig_path,
@@ -730,9 +731,8 @@ mod rate_limits {
         let expected_429_rate = rate_limit_excess / (1.0 + rate_limit_excess);
         assert!(
             ratio > expected_429_rate - 0.05 && ratio <= expected_429_rate + 0.05,
-            "ratio: {ratio} expected 429 rate: {expected_429_rate} \
-             count of HTTP 429: {too_many_requests_count} \
-             count of HTTP 400: {acceptable_status_count}",
+            "ratio: {ratio} expected 429 rate: {expected_429_rate} count of HTTP 429: \
+             {too_many_requests_count} count of HTTP 400: {acceptable_status_count}",
         );

         let last_retry_after = assert_matches!(last_retry_after, Some(l) => l);
diff --git a/interop_binaries/src/commands/janus_interop_client.rs b/interop_binaries/src/commands/janus_interop_client.rs
index a16bea42e..36a2dfcaa 100644
--- a/interop_binaries/src/commands/janus_interop_client.rs
+++ b/interop_binaries/src/commands/janus_interop_client.rs
@@ -243,9 +243,8 @@ impl Options {
 #[cfg(test)]
 mod tests {
-    use clap::CommandFactory;
-
     use super::Options;
+    use clap::CommandFactory;

     #[test]
     fn verify_clap_app() {
diff --git a/interop_binaries/src/commands/janus_interop_collector.rs b/interop_binaries/src/commands/janus_interop_collector.rs
index 3224daf5a..18dee43d5 100644
--- a/interop_binaries/src/commands/janus_interop_collector.rs
+++ b/interop_binaries/src/commands/janus_interop_collector.rs
@@ -13,10 +13,13 @@ use fixed::types::extra::{U15, U31};
 #[cfg(feature = "fpvec_bounded_l2")]
 use fixed::{FixedI16, FixedI32};
 use janus_collector::Collector;
-use janus_core::vdaf::new_prio3_sum_vec_field64_multiproof_hmacsha256_aes128;
 #[cfg(feature = "fpvec_bounded_l2")]
 use janus_core::vdaf::Prio3FixedPointBoundedL2VecSumBitSize;
-use janus_core::{auth_tokens::AuthenticationToken, hpke::HpkeKeypair, vdaf::VdafInstance};
+use janus_core::{
+    auth_tokens::AuthenticationToken,
+    hpke::HpkeKeypair,
+    vdaf::{new_prio3_sum_vec_field64_multiproof_hmacsha256_aes128, VdafInstance},
+};
 use janus_messages::{
     query_type::QueryType, BatchId, Duration, FixedSizeQuery, HpkeConfig, Interval,
     PartialBatchSelector, Query, TaskId, Time,
@@ -850,9 +853,8 @@ impl Options {
 #[cfg(test)]
 mod tests {
-    use clap::CommandFactory;
-
     use super::Options;
+    use clap::CommandFactory;

     #[test]
     fn verify_clap_app() {
diff --git a/interop_binaries/src/lib.rs b/interop_binaries/src/lib.rs
index 5361e3358..0e1256afa 100644
--- a/interop_binaries/src/lib.rs
+++ b/interop_binaries/src/lib.rs
@@ -381,8 +381,9 @@ impl HpkeConfigRegistry {
                 // Unwrap safety: we always use a supported KEM.
                 generate_hpke_config_and_private_key(
                     id,
-                    // These algorithms should be broadly compatible with other DAP implementations, since they
-                    // are required by section 6 of draft-ietf-ppm-dap-02.
+                    // These algorithms should be broadly compatible with other DAP
+                    // implementations, since they are required by section 6 of
+                    // draft-ietf-ppm-dap-02.
                     HpkeKemId::X25519HkdfSha256,
                     HpkeKdfId::HkdfSha256,
                     HpkeAeadId::Aes128Gcm,
diff --git a/interop_binaries/tests/end_to_end.rs b/interop_binaries/tests/end_to_end.rs
index e1ec71e34..34afe00fe 100644
--- a/interop_binaries/tests/end_to_end.rs
+++ b/interop_binaries/tests/end_to_end.rs
@@ -2,6 +2,8 @@
 use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
 use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
+#[cfg(feature = "fpvec_bounded_l2")]
+use fixed_macro::fixed;
 use futures::future::join_all;
 use janus_core::{
     test_util::install_test_trace_subscriber,
@@ -26,9 +28,6 @@ use std::time::Duration as StdDuration;
 use testcontainers::{runners::AsyncRunner, RunnableImage};
 use tokio::time::sleep;

-#[cfg(feature = "fpvec_bounded_l2")]
-use fixed_macro::fixed;
-
 const JSON_MEDIA_TYPE: &str = "application/json";
 const TIME_PRECISION: u64 = 3600;
@@ -423,8 +422,8 @@ async fn run(
         .to_batch_interval_start(&Duration::from_seconds(TIME_PRECISION))
         .unwrap()
         .as_seconds_since_epoch();
-    // Span the aggregation over two time precisions, just in case our measurements spilled over a
-    // batch boundary.
+    // Span the aggregation over two time precisions, just in case our measurements spilled
+    // over a batch boundary.
     let batch_interval_duration = TIME_PRECISION * 2;
     json!({
         "type": query_type_json,
@@ -493,7 +492,8 @@ async fn run(
         .as_str()
         .expect("\"handle\" value is not a string");

-    // Send /internal/test/collection_poll requests to the collector, polling until it is completed.
+    // Send /internal/test/collection_poll requests to the collector, polling until it is
+    // completed.
     let mut collection_poll_backoff = ExponentialBackoffBuilder::new()
         .with_initial_interval(StdDuration::from_millis(500))
         .with_max_interval(StdDuration::from_millis(500))
diff --git a/messages/src/lib.rs b/messages/src/lib.rs
index 814a51b40..437728c4e 100644
--- a/messages/src/lib.rs
+++ b/messages/src/lib.rs
@@ -8,6 +8,7 @@ use anyhow::anyhow;
 use base64::{display::Base64Display, engine::general_purpose::URL_SAFE_NO_PAD, Engine};
 use derivative::Derivative;
 use num_enum::{FromPrimitive, IntoPrimitive, TryFromPrimitive};
+pub use prio::codec;
 use prio::{
     codec::{
         decode_u16_items, decode_u32_items, encode_u16_items, encode_u32_items, CodecError, Decode,
@@ -29,8 +30,6 @@ use std::{
     time::{SystemTime, SystemTimeError},
 };

-pub use prio::codec;
-
 pub mod problem_type;
 pub mod query_type;
 pub mod taskprov;
@@ -527,8 +526,8 @@
     }

     /// Returns a VDAF aggregator ID if this [`Role`] is one of the aggregators, or `None` if the
-    /// role is not an aggregator. This is also used in [draft-wang-ppm-dap-taskprov-04][1] and earlier
-    /// to index into the `aggregator_endpoints` array.
+    /// role is not an aggregator. This is also used in [draft-wang-ppm-dap-taskprov-04][1] and
+    /// earlier to index into the `aggregator_endpoints` array.
     ///
     /// [1]: https://www.ietf.org/archive/id/draft-wang-ppm-dap-taskprov-04.html#section-3-4
     pub fn index(&self) -> Option {
@@ -1155,12 +1154,14 @@ impl HpkeConfig {
         &self.id
     }

-    /// Retrieve the key encapsulation mechanism algorithm identifier associated with this HPKE configuration.
+    /// Retrieve the key encapsulation mechanism algorithm identifier associated with this HPKE
+    /// configuration.
     pub fn kem_id(&self) -> &HpkeKemId {
         &self.kem_id
     }

-    /// Retrieve the key derivation function algorithm identifier associated with this HPKE configuration.
+    /// Retrieve the key derivation function algorithm identifier associated with this HPKE
+    /// configuration.
     pub fn kdf_id(&self) -> &HpkeKdfId {
         &self.kdf_id
     }
diff --git a/messages/src/query_type.rs b/messages/src/query_type.rs
index a36995ef4..9c9f4c673 100644
--- a/messages/src/query_type.rs
+++ b/messages/src/query_type.rs
@@ -1,6 +1,5 @@
-use crate::{Collection, FixedSizeQuery, Query};
-
 use super::{BatchId, Interval};
+use crate::{Collection, FixedSizeQuery, Query};
 use anyhow::anyhow;
 use num_enum::TryFromPrimitive;
 use prio::codec::{CodecError, Decode, Encode};
diff --git a/messages/src/tests/collection.rs b/messages/src/tests/collection.rs
index 5babc4ba7..98172f15f 100644
--- a/messages/src/tests/collection.rs
+++ b/messages/src/tests/collection.rs
@@ -85,7 +85,7 @@ fn roundtrip_collection_req() {
             concat!(
                 // query_body
                 "00", // query_type
-                "0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A", // batch_id
+                "0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A0A", /* batch_id */
             ),
         ),
         concat!(
diff --git a/tools/src/bin/collect.rs b/tools/src/bin/collect.rs
index f8f9899c3..5f9f9ffad 100644
--- a/tools/src/bin/collect.rs
+++ b/tools/src/bin/collect.rs
@@ -327,8 +327,8 @@ struct Options {
         display_order = 0
     )]
     vdaf: VdafType,
-    /// Number of vector elements, when used with --vdaf=sumvec or number of histogram buckets, when
-    /// used with --vdaf=histogram
+    /// Number of vector elements, when used with --vdaf=sumvec or number of histogram buckets,
+    /// when used with --vdaf=histogram
     #[clap(long, help_heading = "VDAF Algorithm and Parameters")]
     length: Option,
     /// Bit length of measurements, for use with --vdaf=sum and --vdaf=sumvec