Skip to content

Commit

Permalink
Parameterize DapRequest on the resource it holds.
Browse files Browse the repository at this point in the history
  • Loading branch information
mendess committed Oct 15, 2024
1 parent 57920da commit b305b85
Show file tree
Hide file tree
Showing 12 changed files with 257 additions and 259 deletions.
52 changes: 30 additions & 22 deletions crates/daphne-server/src/roles/leader.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

#![allow(unused_variables)]

use std::{borrow::Cow, time::Instant};

use axum::{async_trait, http::Method};
Expand All @@ -12,7 +10,7 @@ use daphne::{
fatal_error,
messages::{BatchId, BatchSelector, Collection, CollectionJobId, Report, TaskId},
roles::{leader::WorkItem, DapAggregator, DapLeader},
DapAggregationParam, DapCollectionJob, DapError, DapRequest, DapResponse, DapVersion,
DapAggregationParam, DapCollectionJob, DapError, DapRequestMeta, DapResponse, DapVersion,
};
use daphne_service_utils::http_headers;
use http::StatusCode;
Expand Down Expand Up @@ -100,40 +98,51 @@ impl DapLeader for crate::App {
self.test_leader_state.lock().await.enqueue_work(items)
}

async fn send_http_post<M>(&self, req: DapRequest<M>, url: Url) -> Result<DapResponse, DapError>
async fn send_http_post<P>(
&self,
meta: DapRequestMeta,
url: Url,
payload: P,
) -> Result<DapResponse, DapError>
where
M: Send + ParameterizedEncode<DapVersion>,
P: Send + ParameterizedEncode<DapVersion>,
{
self.send_http(req, Method::POST, url).await
self.send_http(meta, Method::POST, url, payload).await
}

async fn send_http_put<M>(&self, req: DapRequest<M>, url: Url) -> Result<DapResponse, DapError>
async fn send_http_put<P>(
&self,
meta: DapRequestMeta,
url: Url,
payload: P,
) -> Result<DapResponse, DapError>
where
M: Send + ParameterizedEncode<DapVersion>,
P: Send + ParameterizedEncode<DapVersion>,
{
self.send_http(req, Method::PUT, url).await
self.send_http(meta, Method::PUT, url, payload).await
}
}

impl crate::App {
async fn send_http<M>(
async fn send_http<P>(
&self,
req: DapRequest<M>,
meta: DapRequestMeta,
method: Method,
url: Url,
payload: P,
) -> Result<DapResponse, DapError>
where
M: Send + ParameterizedEncode<DapVersion>,
P: Send + ParameterizedEncode<DapVersion>,
{
use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue};
let content_type = req
let content_type = meta
.media_type
.and_then(|mt| mt.as_str_for_version(req.version))
.and_then(|mt| mt.as_str_for_version(meta.version))
.ok_or_else(|| {
fatal_error!(
err = "failed to construct content-type",
?req.media_type,
?req.version,
?meta.media_type,
?meta.version,
)
})?;

Expand All @@ -144,7 +153,7 @@ impl crate::App {
.map_err(|e| fatal_error!(err = ?e, "failed to construct content-type header"))?,
);

let bearer_token = if req.taskprov.is_some() {
let bearer_token = if meta.taskprov.is_some() {
if let Some(bearer_token) = self
.service_config
.taskprov
Expand All @@ -157,12 +166,12 @@ impl crate::App {
detail: format!(
"taskprov authentication not setup for authentication with peer at {url}",
),
task_id: req.task_id,
task_id: meta.task_id,
}));
}
} else if let Some(bearer_token) = self
.bearer_tokens()
.get(daphne::DapSender::Leader, req.task_id)
.get(daphne::DapSender::Leader, meta.task_id)
.await
.map_err(|e| fatal_error!(err = ?e, "failed to get leader bearer token"))?
{
Expand All @@ -172,7 +181,7 @@ impl crate::App {
detail: format!(
"no suitable authentication method found for authenticating with peer at {url}",
),
task_id: req.task_id,
task_id: meta.task_id,
}));
};

Expand All @@ -182,7 +191,7 @@ impl crate::App {
.map_err(|e| fatal_error!(err = ?e, "failed to construct authentication header"))?,
);

if let Some(taskprov_advertisement) = req.taskprov.as_deref() {
if let Some(taskprov_advertisement) = meta.taskprov.as_deref() {
headers.insert(
HeaderName::from_static(http_headers::DAP_TASKPROV),
HeaderValue::from_str(taskprov_advertisement).map_err(
Expand All @@ -191,7 +200,6 @@ impl crate::App {
);
}

let DapRequest { meta, payload } = req;
let req_builder = self
.http
.request(method, url.clone())
Expand Down
Loading

0 comments on commit b305b85

Please sign in to comment.