Use nightly rustfmt features, one import group #3077

Closed
wants to merge 6 commits into from
16 changes: 13 additions & 3 deletions .github/workflows/ci-build.yml
@@ -124,9 +124,7 @@ jobs:
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ matrix.rust-toolchain }}
components: clippy, rustfmt
- name: Format
run: cargo fmt --message-format human -- --check
components: clippy
- name: Clippy
run: cargo clippy --profile ci --workspace --all-targets
- name: Clippy (all features)
@@ -138,6 +136,18 @@ jobs:
with:
command: check bans licenses sources -A unmatched-organization

rustfmt:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust nightly toolchain
uses: dtolnay/rust-toolchain@master
with:
toolchain: nightly
components: rustfmt
- name: Format
run: cargo fmt --message-format human -- --check

janus_docker:
runs-on: ubuntu-latest
env:
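The diff above moves the format check out of the main build job into a dedicated rustfmt job pinned to the nightly toolchain, since the options introduced in .rustfmt.toml below are nightly-only. A minimal sketch of reproducing the same check locally, assuming rustup manages the toolchains:

    # Install a nightly toolchain with the rustfmt component (assumes rustup)
    rustup toolchain install nightly --component rustfmt
    # Run the same check as the new CI job
    cargo +nightly fmt --message-format human -- --check
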
5 changes: 5 additions & 0 deletions .rustfmt.toml
@@ -0,0 +1,5 @@
format_strings = true
wrap_comments = true
comment_width = 100
imports_granularity = "Crate"
group_imports = "One"
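
Of these options, imports_granularity = "Crate" merges all `use` items from the same crate into a single nested item, and group_imports = "One" collapses the usual standard/external/local groups into one block; together they account for most of the import churn in the Rust diffs below. An illustrative fragment with hypothetical paths (not taken verbatim from the repository):

    // Before: separate groups, several `use` items for the same module
    use std::sync::Arc;

    use janus_core::time::Clock;

    use crate::error::BatchMismatch;
    use crate::error::{ReportRejection, ReportRejectionReason};

    // After `cargo +nightly fmt` with the options above:
    // a single sorted group, at most one `use` item per crate
    use crate::error::{BatchMismatch, ReportRejection, ReportRejectionReason};
    use janus_core::time::Clock;
    use std::sync::Arc;
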
10 changes: 10 additions & 0 deletions .vscode/settings.json.example
@@ -0,0 +1,10 @@
{
"rust-analyzer.cargo.features": [
"prometheus",
"otlp",
"testcontainer"
],
"rust-analyzer.rustfmt.extraArgs": [
"+nightly"
]
}
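
The settings file is committed with an .example suffix, so it presumably has to be copied into place by each developer; the rust-analyzer.rustfmt.extraArgs entry then routes editor formatting through the same nightly rustfmt that CI uses. A possible local setup (hypothetical, assuming .vscode/settings.json itself stays untracked):

    # Adopt the example settings for this checkout
    cp .vscode/settings.json.example .vscode/settings.json
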
101 changes: 54 additions & 47 deletions aggregator/src/aggregator.rs
@@ -8,8 +8,10 @@ use crate::{
AggregationJobWriter, AggregationJobWriterMetrics, InitialWrite,
ReportAggregationUpdate as _, WritableReportAggregation,
},
error::{handle_ping_pong_error, ReportRejection, ReportRejectionReason},
error::{BatchMismatch, OptOutReason},
error::{
handle_ping_pong_error, BatchMismatch, OptOutReason, ReportRejection,
ReportRejectionReason,
},
query_type::{CollectableQueryType, UploadableQueryType},
report_writer::{ReportWriteBatcher, WritableReport},
},
@@ -161,8 +163,8 @@ struct AggregatorMetrics {
upload_decode_failure_counter: Counter<u64>,
/// Counter tracking the number of successfully-aggregated reports.
report_aggregation_success_counter: Counter<u64>,
/// Counters tracking the number of failures to step client reports through the aggregation
/// process.
/// Counters tracking the number of failures to step client reports through
/// the aggregation process.
aggregate_step_failure_counter: Counter<u64>,
}

@@ -182,8 +184,8 @@ pub struct Config {
/// transaction.
pub max_upload_batch_size: usize,

/// Defines the maximum delay before writing a batch of uploaded reports, even if it has not yet
/// reached `max_batch_upload_size`. This is the maximum delay added to the
/// Defines the maximum delay before writing a batch of uploaded reports, even if it has not
/// yet reached `max_batch_upload_size`. This is the maximum delay added to the
/// `tasks/{task-id}/reports` endpoint due to write-batching.
pub max_upload_batch_write_delay: StdDuration,

@@ -197,16 +199,16 @@ pub struct Config {
/// of getting task metrics.
pub task_counter_shard_count: u64,

/// Defines how often to refresh the global HPKE configs cache. This affects how often an aggregator
/// becomes aware of key state changes.
/// Defines how often to refresh the global HPKE configs cache. This affects how often an
/// aggregator becomes aware of key state changes.
pub global_hpke_configs_refresh_interval: StdDuration,

/// Defines how long tasks should be cached for. This affects how often an aggregator becomes aware
/// of task parameter changes.
/// Defines how long tasks should be cached for. This affects how often an aggregator becomes
/// aware of task parameter changes.
pub task_cache_ttl: StdDuration,

/// Defines how many tasks can be cached at once. This affects how much memory the aggregator may
/// consume for caching tasks.
/// Defines how many tasks can be cached at once. This affects how much memory the aggregator
/// may consume for caching tasks.
pub task_cache_capacity: u64,

/// The key used to sign HPKE configurations.
@@ -248,9 +250,9 @@ impl<C: Clock> Aggregator<C> {
cfg.max_upload_batch_size,
cfg.max_upload_batch_write_delay,
),
// If we're in taskprov mode, we can never cache None entries for tasks, since aggregators
// could insert tasks at any time and expect them to be available across all aggregator
// replicas.
// If we're in taskprov mode, we can never cache None entries for tasks, since
// aggregators could insert tasks at any time and expect them to be available across
// all aggregator replicas.
!cfg.taskprov_config.enabled,
cfg.task_cache_capacity,
StdDuration::from_secs(1),
@@ -339,10 +341,10 @@ impl<C: Clock> Aggregator<C> {
match task_aggregator.handle_hpke_config() {
Some(hpke_config_list) => hpke_config_list,
// Assuming something hasn't gone horribly wrong with the database, this
// should only happen in the case where the system has been moved from taskprov
// mode to non-taskprov mode. Thus there's still taskprov tasks in the database.
// This isn't a supported use case, so the operator needs to delete these tasks
// or move the system back into taskprov mode.
// should only happen in the case where the system has been moved from
// taskprov mode to non-taskprov mode. Thus there's still taskprov tasks in
// the database. This isn't a supported use case, so the operator needs to
// delete these tasks or move the system back into taskprov mode.
None => {
return Err(Error::Internal("task has no HPKE configs".to_string()))
}
@@ -580,10 +582,10 @@ impl<C: Clock> Aggregator<C> {
.await
}

/// Handle a GET request for a collection job. `collection_job_id` is the unique identifier for the
/// collection job parsed out of the request URI. Returns an encoded [`Collection`] if the collect
/// job has been run to completion, `None` if the collection job has not yet run, or an error
/// otherwise.
/// Handle a GET request for a collection job. `collection_job_id` is the unique identifier for
/// the collection job parsed out of the request URI. Returns an encoded [`Collection`] if the
/// collect job has been run to completion, `None` if the collection job has not yet run, or an
/// error otherwise.
async fn handle_get_collection_job(
&self,
task_id: &TaskId,
@@ -657,8 +659,8 @@ impl<C: Clock> Aggregator<C> {
return Err(Error::UnrecognizedTask(*task_id));
}

// Authorize the request and retrieve the collector's HPKE config. If this is a taskprov task, we
// have to use the peer aggregator's collector config rather than the main task.
// Authorize the request and retrieve the collector's HPKE config. If this is a taskprov
// task, we have to use the peer aggregator's collector config rather than the main
// task.
let collector_hpke_config =
if self.cfg.taskprov_config.enabled && taskprov_task_config.is_some() {
let (peer_aggregator, _, _) = self
@@ -768,9 +771,9 @@ impl<C: Clock> Aggregator<C> {
.await
.or_else(|error| -> Result<(), Error> {
match error {
// If the task is already in the datastore, then some other request or aggregator
// replica beat us to inserting it. They _should_ have inserted all the same parameters
// as we would have, so we can proceed as normal.
// If the task is already in the datastore, then some other request or
// aggregator replica beat us to inserting it. They _should_ have inserted all
// the same parameters as we would have, so we can proceed as normal.
DatastoreError::MutationTargetAlreadyExists => {
warn!(
?task_id,
@@ -972,7 +975,8 @@ impl<C: Clock> TaskAggregator<C> {
fn handle_hpke_config(&self) -> Option<HpkeConfigList> {
// TODO(#239): consider deciding a better way to determine "primary" (e.g. most-recent) HPKE
// config/key -- right now it's the one with the maximal config ID, but that will run into
// trouble if we ever need to wrap-around, which we may since config IDs are effectively a u8.
// trouble if we ever need to wrap-around, which we may since config IDs are effectively a
// u8.
Some(HpkeConfigList::new(Vec::from([self
.task
.hpke_keys()
@@ -1119,10 +1123,10 @@ impl<C: Clock> TaskAggregator<C> {

#[cfg(feature = "fpvec_bounded_l2")]
mod vdaf_ops_strategies {
use std::sync::Arc;

use janus_core::vdaf::vdaf_dp_strategies;
use prio::dp::distributions::ZCdpDiscreteGaussian;
use std::sync::Arc;

#[derive(Debug)]
pub enum Prio3FixedPointBoundedL2VecSum {
@@ -1781,7 +1784,8 @@ impl VdafOps {
// Decrypt shares & prepare initialization states. (§4.4.4.1)
let mut report_share_data = Vec::new();
for (ord, prepare_init) in req.prepare_inits().iter().enumerate() {
// If decryption fails, then the aggregator MUST fail with error `hpke-decrypt-error`. (§4.4.2.2)
// If decryption fails, then the aggregator MUST fail with error
// `hpke-decrypt-error`. (§4.4.2.2)
let global_hpke_keypair = global_hpke_keypairs.keypair(
prepare_init
.report_share()
@@ -2003,9 +2007,10 @@ impl VdafOps {
Ok(shares)
});

// Next, the aggregator runs the preparation-state initialization algorithm for the VDAF
// associated with the task and computes the first state transition. [...] If either
// step fails, then the aggregator MUST fail with error `vdaf-prep-error`. (§4.4.2.2)
// Next, the aggregator runs the preparation-state initialization algorithm for
// the VDAF associated with the task and computes the first state transition.
// [...] If either step fails, then the aggregator MUST fail with error
// `vdaf-prep-error`. (§4.4.2.2)
let init_rslt = shares.and_then(|(public_share, input_share)| {
trace_span!("VDAF preparation (helper initialization)").in_scope(|| {
vdaf.helper_initialized(
@@ -2033,8 +2038,8 @@ impl VdafOps {
let (report_aggregation_state, prepare_step_result, output_share) =
match init_rslt {
Ok((PingPongState::Continued(prepare_state), outgoing_message)) => {
// Helper is not finished. Await the next message from the Leader to advance to
// the next step.
// Helper is not finished. Await the next message from the Leader to
// advance to the next step.
(
ReportAggregationState::WaitingHelper { prepare_state },
PrepareStepResult::Continue {
@@ -2199,7 +2204,8 @@ impl VdafOps {
}

// Write report shares, and ensure this isn't a repeated report aggregation.
// TODO(#225): on repeated aggregation, verify input share matches previously-received input share
// TODO(#225): on repeated aggregation, verify input share matches
// previously-received input share
try_join_all(report_share_data.iter_mut().map(|rsd| {
let task = Arc::clone(&task);
let aggregation_job = Arc::clone(&aggregation_job);
@@ -2366,8 +2372,8 @@ impl VdafOps {
.collect(),
));
} else if aggregation_job.step().increment() != req.step() {
// If this is not a replay, the leader should be advancing our state to the next
// step and no further.
// If this is not a replay, the leader should be advancing our state to the
// next step and no further.
return Err(datastore::Error::User(
Error::StepMismatch {
task_id: *task.id(),
@@ -2513,7 +2519,8 @@ impl VdafOps {
Arc::clone(&aggregation_param),
);
Box::pin(async move {
// Check if this collection job already exists, ensuring that all parameters match.
// Check if this collection job already exists, ensuring that all parameters
// match.
if let Some(collection_job) = tx
.get_collection_job::<SEED_SIZE, Q, A>(&vdaf, task.id(), &collection_job_id)
.await?
@@ -2680,9 +2687,9 @@ impl VdafOps {
} => {
// §4.4.4.3: HPKE encrypt aggregate share to the collector. We store the leader
// aggregate share *unencrypted* in the datastore so that we can encrypt cached
// results to the collector HPKE config valid when the current collection job request
// was made, and not whatever was valid at the time the aggregate share was first
// computed.
// results to the collector HPKE config valid when the current collection job
// request was made, and not whatever was valid at the time the
// aggregate share was first computed.
// However we store the helper's *encrypted* share.

// TODO(#240): consider fetching freshly encrypted helper aggregate share if it has
@@ -2694,9 +2701,9 @@ impl VdafOps {
"Serving cached collection job response"
);
let encrypted_leader_aggregate_share = hpke::seal(
// Unwrap safety: collector_hpke_config is only None for taskprov tasks. Taskprov
// is not currently supported for Janus operating as the Leader, so this unwrap
// is not reachable.
// Unwrap safety: collector_hpke_config is only None for taskprov tasks.
// Taskprov is not currently supported for Janus operating
// as the Leader, so this unwrap is not reachable.
task.collector_hpke_config().unwrap(),
&HpkeApplicationInfo::new(
&Label::AggregateShare,
13 changes: 7 additions & 6 deletions aggregator/src/aggregator/aggregation_job_continue.rs
@@ -73,22 +73,23 @@ impl VdafOps {
let mut report_aggregations_iter = report_aggregations.into_iter();
let mut report_aggregations_to_write = Vec::new();
for prep_step in req.prepare_steps() {
// Match preparation step received from leader to stored report aggregation, and extract
// the stored preparation step.
// Match preparation step received from leader to stored report aggregation, and
// extract the stored preparation step.
let report_aggregation = loop {
let report_agg = report_aggregations_iter.next().ok_or_else(|| {
datastore::Error::User(
Error::InvalidMessage(
Some(task_id),
"leader sent unexpected, duplicate, or out-of-order prepare \
steps",
steps",
)
.into(),
)
})?;
if report_agg.report_id() != prep_step.report_id() {
// This report was omitted by the leader because of a prior failure. Note that
// the report was dropped (if it's not already in an error state) and continue.
// This report was omitted by the leader because of a prior failure.
// Note that the report was dropped (if it's
// not already in an error state) and continue.
if matches!(
report_agg.state(),
ReportAggregationState::WaitingHelper { .. }
@@ -113,7 +114,7 @@ impl VdafOps {
return Err(datastore::Error::User(
Error::Internal(
"helper encountered unexpected \
ReportAggregationState::WaitingLeader"
ReportAggregationState::WaitingLeader"
.to_string(),
)
.into(),