Skip to content

Commit

Permalink
Improve readability of leader code
Browse files Browse the repository at this point in the history
  • Loading branch information
mendess committed Aug 3, 2023
1 parent e0fa341 commit cc32a0d
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions daphne/src/roles/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,33 @@ use crate::{
DapResource, DapResponse, DapTaskConfig, DapVersion, MetaAggregationJobId,
};

struct LeaderPostOptions<'p> {
struct LeaderHttpRequestOptions<'p> {
path: &'p str,
req_media_type: DapMediaType,
resp_media_type: DapMediaType,
resource: DapResource,
req_data: Vec<u8>,
is_put: bool,
method: LeaderHttpRequestMethod,
}

async fn leader_post<S>(
enum LeaderHttpRequestMethod {
Post,
Put,
}

async fn leader_send_http_request<S>(
role: &impl DapLeader<S>,
task_id: &TaskId,
task_config: &DapTaskConfig,
opts: LeaderPostOptions<'_>,
opts: LeaderHttpRequestOptions<'_>,
) -> Result<DapResponse, DapError> {
let LeaderPostOptions {
let LeaderHttpRequestOptions {
path,
req_media_type,
resp_media_type,
resource,
req_data,
is_put,
method: is_put,
} = opts;

let url = task_config
Expand All @@ -62,10 +67,9 @@ async fn leader_post<S>(
taskprov: None,
};

let resp = if is_put {
role.send_http_put(req).await?
} else {
role.send_http_post(req).await?
let resp = match is_put {
LeaderHttpRequestMethod::Put => role.send_http_put(req).await?,
LeaderHttpRequestMethod::Post => role.send_http_post(req).await?,
};

check_response_content_type(&resp, resp_media_type)?;
Expand Down Expand Up @@ -333,7 +337,11 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {
return Err(fatal_error!(err = "unexpected state transition (uncommitted)").into())
}
};
let is_put = task_config.version != DapVersion::Draft02;
let method = if task_config.version != DapVersion::Draft02 {
LeaderHttpRequestMethod::Put
} else {
LeaderHttpRequestMethod::Post
};
let url_path = if task_config.version == DapVersion::Draft02 {
"aggregate".to_string()
} else {
Expand All @@ -345,17 +353,17 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {
};

// Send AggregationJobInitReq and receive AggregationJobResp.
let resp = leader_post(
let resp = leader_send_http_request(
self,
task_id,
task_config,
LeaderPostOptions {
LeaderHttpRequestOptions {
path: &url_path,
req_media_type: DapMediaType::AggregationJobInitReq,
resp_media_type: DapMediaType::AggregationJobResp,
resource: agg_job_id.for_request_path(),
req_data: agg_job_init_req.get_encoded_with_param(&task_config.version),
is_put,
method,
},
)
.await?;
Expand All @@ -382,17 +390,17 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {
};

// Send AggregationJobContinueReq and receive AggregationJobResp.
let resp = leader_post(
let resp = leader_send_http_request(
self,
task_id,
task_config,
LeaderPostOptions {
LeaderHttpRequestOptions {
path: &url_path,
req_media_type: DapMediaType::AggregationJobContinueReq,
resp_media_type: DapMediaType::agg_job_cont_resp_for_version(task_config.version),
resource: agg_job_id.for_request_path(),
req_data: agg_job_cont_req.get_encoded_with_param(&task_config.version),
is_put: false,
method: LeaderHttpRequestMethod::Post,
},
)
.await?;
Expand Down Expand Up @@ -469,17 +477,17 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {
};

// Send AggregateShareReq and receive AggregateShareResp.
let resp = leader_post(
let resp = leader_send_http_request(
self,
task_id,
task_config,
LeaderPostOptions {
LeaderHttpRequestOptions {
path: &url_path,
req_media_type: DapMediaType::AggregateShareReq,
resp_media_type: DapMediaType::AggregateShare,
resource: DapResource::Undefined,
req_data: agg_share_req.get_encoded_with_param(&task_config.version),
is_put: false,
method: LeaderHttpRequestMethod::Post,
},
)
.await?;
Expand Down

0 comments on commit cc32a0d

Please sign in to comment.