Skip to content

Commit

Permalink
Remove admin task config API
Browse files Browse the repository at this point in the history
  • Loading branch information
mendess committed Sep 21, 2023
1 parent 42fd947 commit c93e811
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 95 deletions.
12 changes: 0 additions & 12 deletions daphne_worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@ pub(crate) struct DaphneWorkerConfig {
/// Default DAP version to use if not specified by the API URL
pub(crate) default_version: DapVersion,

/// Admin bearer token. If configured, it is used to authorize requests from the administrator.
pub(crate) admin_token: Option<BearerToken>,

/// Helper: Time to wait before deleting an instance of HelperStateStore. This field is not
/// configured by the Leader.
pub(crate) helper_state_store_garbage_collect_after_secs: Option<Duration>,
Expand Down Expand Up @@ -262,14 +259,6 @@ impl DaphneWorkerConfig {
None
};

let admin_token = match env.secret("DAP_ADMIN_BEARER_TOKEN") {
Ok(raw) => Some(BearerToken::from(raw.to_string())),
Err(err) => {
trace!("DAP_ADMIN_BEARER_TOKEN not configured: {err:?}");
None
}
};

let helper_state_store_garbage_collect_after_secs = if !is_leader {
Some(Duration::from_secs(
env.var("DAP_HELPER_STATE_STORE_GARBAGE_COLLECT_AFTER_SECS")?
Expand Down Expand Up @@ -333,7 +322,6 @@ impl DaphneWorkerConfig {
is_leader,
taskprov,
default_version,
admin_token,
helper_state_store_garbage_collect_after_secs,
processed_alarm_safety_interval,
metrics_push_config,
Expand Down
47 changes: 12 additions & 35 deletions daphne_worker/src/router/aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,20 @@
use daphne::{auth::BearerToken, roles::DapAggregator};
use tracing::{info_span, Instrument};
use worker::Response;
use daphne::roles::DapAggregator;
use tracing::Instrument;

use crate::info_span_from_dap_request;

use super::{dap_response_to_worker, test_routes, DapRouter};
use super::{dap_response_to_worker, DapRouter};

pub(super) fn add_aggregator_routes(router: DapRouter<'_>) -> DapRouter<'_> {
router
.get_async("/:version/hpke_config", |req, ctx| async move {
let daph = ctx.data.handler(&ctx.env);
let req = daph.worker_request_to_dap(req, &ctx).await?;
router.get_async("/:version/hpke_config", |req, ctx| async move {
let daph = ctx.data.handler(&ctx.env);
let req = daph.worker_request_to_dap(req, &ctx).await?;

let span = info_span_from_dap_request!("hpke_config", req);
let span = info_span_from_dap_request!("hpke_config", req);

match daph.handle_hpke_config_req(&req).instrument(span).await {
Ok(req) => dap_response_to_worker(req),
Err(e) => daph.state.dap_abort_to_worker_response(e),
}
})
.post_async("/task", |mut req, ctx| async move {
let daph = ctx.data.handler(&ctx.env);
let admin_token = req
.headers()
.get("X-Daphne-Worker-Admin-Bearer-Token")?
.map(BearerToken::from);

if daph.config().admin_token.is_none() {
return Response::error("admin not configured", 400);
}

if admin_token.is_none() || admin_token != daph.config().admin_token {
return Response::error("missing or invalid bearer token for admin", 401);
}

let cmd: test_routes::InternalTestAddTask = req.json().await?;
daph.internal_add_task(daph.config().default_version, cmd)
.instrument(info_span!("task"))
.await?;
Response::empty()
})
match daph.handle_hpke_config_req(&req).instrument(span).await {
Ok(req) => dap_response_to_worker(req),
Err(e) => daph.state.dap_abort_to_worker_response(e),
}
})
}
47 changes: 0 additions & 47 deletions daphne_worker_test/tests/e2e/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1503,50 +1503,3 @@ async fn leader_collect_taskprov_ok(version: DapVersion) {
}

async_test_version! { leader_collect_taskprov_ok, Draft02 }

async fn helper_admin_add_task(version: DapVersion) {
let t = TestRunner::default_with_version(version).await;

let add_task_cmd = serde_json::json!({
"collector_hpke_config": "kwAgAAEAAQAgAPjfKNRNrnodTEuoCKA5qAOTaWOmVlmNVyAXOL6__20",
"leader": format!("http://cool.leader/{}/", version.as_ref()),
"helper": format!("https:/awesome.helper.web:8788/{}/", version.as_ref()),
"leader_authentication_token": "leader bearer token",
"min_batch_size": 10,
"query_type": 1,
"role": "helper",
"task_expiration": 1670880698,
"task_id": "GNsYenwC_BMh9QddDHjVfvuhKKyvJZlt24FP3hubplw",
"time_precision": 3600,
"vdaf": {
"bits":"10",
"type":"Prio3Sum"
},
"vdaf_verify_key": "y4e6alnJMQ0MZTvdJRJx5Q"
});

let mut url = t.helper_url.clone();
url.set_path("task");
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
reqwest::header::HeaderName::from_lowercase(b"x-daphne-worker-admin-bearer-token").unwrap(),
"administrator bearer token".parse().unwrap(),
);
let resp = t
.http_client()
.post(url.clone())
.json(&add_task_cmd)
.headers(headers)
.send()
.await
.expect("request failed");
if resp.status() != 200 {
panic!(
"request to {url} failed: {}: {}",
resp.status(),
resp.text().await.unwrap()
);
}
}

async_test_versions! { helper_admin_add_task }
1 change: 0 additions & 1 deletion daphne_worker_test/wrangler.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ preview_id = "24c4dc92d5cf4680e508fe18eb8f0281"
# production. In particular, they will not be passed as environment variables
# as they are here. See
# https://developers.cloudflare.com/workers/wrangler/commands/#secret.
DAP_ADMIN_BEARER_TOKEN = "administrator bearer token" # SECRET
DAP_AGGREGATOR_ROLE = "helper"
DAP_BASE_URL = "http://127.0.0.1:8788/"
DAP_ISSUE73_DISABLE_AGG_JOB_QUEUE_GARBAGE_COLLECTION = "true"
Expand Down

0 comments on commit c93e811

Please sign in to comment.