Move authentication logic out of the daphne crate
mendess committed Oct 4, 2024
1 parent e6fad5e commit d998220
Showing 33 changed files with 876 additions and 1,091 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -46,6 +46,7 @@ constcat = "0.5.0"
criterion = { version = "0.5.1", features = ["async_tokio"] }
deepsize = { version = "0.2.0" }
dhat = "0.3.3"
either = "1.13.0"
futures = "0.3.30"
getrandom = "0.2.15"
headers = "0.4"
@@ -86,9 +87,9 @@ tracing = "0.1.40"
tracing-core = "0.1.32"
tracing-subscriber = "0.3.18"
url = { version = "2.5.2", features = ["serde"] }
wasm-streams = "0.4"
webpki = "0.22.4"
worker = { version = "0.3.3", features = ["http"] }
wasm-streams = "0.4"
x509-parser = "0.15.1"

[workspace.dependencies.sentry]
7 changes: 6 additions & 1 deletion Makefile
@@ -18,7 +18,12 @@ s: storage-proxy
e2e: /tmp/private-key /tmp/certificate
export HPKE_SIGNING_KEY="$$(cat /tmp/private-key)"; \
export E2E_TEST_HPKE_SIGNING_CERTIFICATE="$$(cat /tmp/certificate)"; \
docker compose -f ./crates/daphne-server/docker-compose-e2e.yaml up --build --abort-on-container-exit --exit-code-from test
docker compose -f ./crates/daphne-server/docker-compose-e2e.yaml up \
--no-attach leader_storage \
--no-attach helper_storage \
--build \
--abort-on-container-exit \
--exit-code-from test

build_interop:
docker build . -f ./interop/Dockerfile.interop_helper --tag daphne-interop
3 changes: 1 addition & 2 deletions crates/dapf/src/acceptance/mod.rs
@@ -24,7 +24,6 @@ use crate::{
use anyhow::{anyhow, bail, Context, Result};
use async_trait::async_trait;
use daphne::{
auth::BearerToken,
constants::DapMediaType,
error::aborts::ProblemDetails,
hpke::{HpkeConfig, HpkeKemId, HpkeReceiverConfig},
@@ -40,7 +39,7 @@ use daphne::{
DapQueryConfig, DapTaskConfig, DapTaskParameters, DapVersion, EarlyReportStateConsumed,
EarlyReportStateInitialized, ReplayProtection,
};
use daphne_service_utils::http_headers;
use daphne_service_utils::{bearer_token::BearerToken, http_headers};
use futures::{future::OptionFuture, StreamExt, TryStreamExt};
use prio::codec::{Decode, ParameterizedEncode};
use prometheus::{Encoder, HistogramVec, IntCounterVec, IntGaugeVec, TextEncoder};
3 changes: 2 additions & 1 deletion crates/daphne-server/Cargo.toml
@@ -16,6 +16,7 @@ description = "Workers backend for Daphne"
axum = "0.6.0" # held back to use http 0.2
daphne = { path = "../daphne" }
daphne-service-utils = { path = "../daphne-service-utils", features = ["durable_requests"] }
either.workspace = true
futures.workspace = true
hex.workspace = true
http = "0.2" # held back to use http 0.2
@@ -43,7 +44,7 @@ assert_matches.workspace = true
clap.workspace = true
config.workspace = true
daphne = { path = "../daphne", features = ["test-utils"] }
daphne-service-utils = { path = "../daphne-service-utils", features = ["prometheus"] }
daphne-service-utils = { path = "../daphne-service-utils", features = ["prometheus", "test-utils"] }
dhat.workspace = true
hpke-rs.workspace = true
paste.workspace = true
7 changes: 1 addition & 6 deletions crates/daphne-server/examples/configuration-helper.toml
@@ -22,13 +22,8 @@ allow_taskprov = true
default_num_agg_span_shards = 4

[service.taskprov]
peer_auth.leader.expected_token = "I-am-the-leader" # SECRET
vdaf_verify_key_init = "b029a72fa327931a5cb643dcadcaafa098fcbfac07d990cb9e7c9a8675fafb18" # SECRET
leader_auth = """{
"bearer_token": "I-am-the-leader"
}""" # SECRET
collector_auth = """{
"bearer_token": "I-am-the-collector"
}""" # SECRET
hpke_collector_config = """{
"id": 23,
"kem_id": "p256_hkdf_sha256",
8 changes: 2 additions & 6 deletions crates/daphne-server/examples/configuration-leader.toml
@@ -22,13 +22,9 @@ allow_taskprov = true
default_num_agg_span_shards = 4

[service.taskprov]
peer_auth.collector.expected_token = "I-am-the-collector" # SECRET
self_bearer_token = "I-am-the-leader" # SECRET
vdaf_verify_key_init = "b029a72fa327931a5cb643dcadcaafa098fcbfac07d990cb9e7c9a8675fafb18" # SECRET
leader_auth = """{
"bearer_token": "I-am-the-leader"
}""" # SECRET
collector_auth = """{
"bearer_token": "I-am-the-collector"
}""" # SECRET
hpke_collector_config = """{
"id": 23,
"kem_id": "p256_hkdf_sha256",
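
Both example configs replace the JSON-blob leader_auth / collector_auth fields with a single peer_auth table naming the one peer token each aggregator verifies: the helper stores the token it expects from the leader, and the leader stores the token it expects from the collector (the leader additionally keeps self_bearer_token, presumably the token it presents when acting as a client). As a rough sketch of how this layout could map onto the PeerBearerToken enum matched on later in this diff (the real type lives in daphne_service_utils::config and may differ in derives and field types), an externally tagged serde enum parses it directly; the snippet assumes the serde (with derive) and toml crates:

use serde::Deserialize;

// Sketch only: mirrors the variants matched on in lib.rs below, not the
// actual definition in daphne_service_utils::config.
#[derive(Debug, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "snake_case")]
enum PeerBearerToken {
    Leader { expected_token: String },
    Collector { expected_token: String },
}

#[derive(Debug, Deserialize)]
struct Taskprov {
    peer_auth: PeerBearerToken,
}

fn main() {
    // Helper side: the token it expects the leader to present.
    let helper: Taskprov =
        toml::from_str(r#"peer_auth.leader.expected_token = "I-am-the-leader""#).unwrap();
    assert_eq!(
        helper.peer_auth,
        PeerBearerToken::Leader { expected_token: "I-am-the-leader".into() }
    );

    // Leader side: the token it expects the collector to present.
    let leader: Taskprov =
        toml::from_str(r#"peer_auth.collector.expected_token = "I-am-the-collector""#).unwrap();
    assert_eq!(
        leader.peer_auth,
        PeerBearerToken::Collector { expected_token: "I-am-the-collector".into() }
    );
}
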
94 changes: 88 additions & 6 deletions crates/daphne-server/src/lib.rs
@@ -5,12 +5,19 @@ use std::sync::Arc;

use daphne::{
audit_log::{AuditLog, NoopAuditLog},
auth::BearerToken,
roles::leader::in_memory_leader::InMemoryLeaderState,
DapError,
fatal_error,
messages::{Base64Encode, TaskId},
roles::{leader::in_memory_leader::InMemoryLeaderState, DapAggregator},
DapError, DapSender,
};
use daphne_service_utils::{config::DaphneServiceConfig, metrics::DaphneServiceMetrics};
use daphne_service_utils::{
bearer_token::BearerToken,
config::{DaphneServiceConfig, PeerBearerToken},
metrics::DaphneServiceMetrics,
};
use either::Either::{self, Left, Right};
use futures::lock::Mutex;
use roles::BearerTokens;
use serde::{Deserialize, Serialize};
use storage_proxy_connection::{kv, Do, Kv};
use tokio::sync::RwLock;
@@ -38,7 +45,11 @@ mod storage_proxy_connection;
/// use url::Url;
/// use daphne::{DapGlobalConfig, hpke::HpkeKemId, DapVersion};
/// use daphne_server::{App, router, StorageProxyConfig};
/// use daphne_service_utils::{config::DaphneServiceConfig, DapRole, metrics::DaphnePromServiceMetrics};
/// use daphne_service_utils::{
/// config::DaphneServiceConfig,
/// DapRole,
/// metrics::DaphnePromServiceMetrics
/// };
///
/// let storage_proxy_settings = StorageProxyConfig {
/// url: Url::parse("http://example.com").unwrap(),
@@ -51,7 +62,7 @@ mod storage_proxy_connection;
/// min_batch_interval_start: 259_200,
/// max_batch_interval_end: 259_200,
/// supported_hpke_kems: vec![HpkeKemId::X25519HkdfSha256],
/// allow_taskprov: true,
/// allow_taskprov: false,
/// default_num_agg_span_shards: NonZeroUsize::new(2).unwrap(),
/// };
/// let service_config = DaphneServiceConfig {
@@ -92,6 +103,7 @@ pub struct StorageProxyConfig {
pub auth_token: BearerToken,
}

#[axum::async_trait]
impl router::DaphneService for App {
fn server_metrics(&self) -> &dyn DaphneServiceMetrics {
&*self.metrics
@@ -100,6 +112,72 @@ impl router::DaphneService for App {
fn signing_key(&self) -> Option<&p256::ecdsa::SigningKey> {
self.service_config.signing_key.as_ref()
}

async fn check_bearer_token(
&self,
presented_token: &BearerToken,
sender: DapSender,
task_id: TaskId,
is_taskprov: bool,
) -> Result<(), Either<String, DapError>> {
let reject = |extra_args| {
Err(Left(format!(
"the indicated bearer token is incorrect for the {sender:?} {extra_args}",
)))
};
if let Some(taskprov) = self
.service_config
.taskprov
.as_ref()
// we only use taskprov auth if it's allowed by config and if the request is using taskprov
.filter(|_| self.service_config.global.allow_taskprov && is_taskprov)
{
match (&taskprov.peer_auth, sender) {
(PeerBearerToken::Leader { expected_token }, DapSender::Leader)
| (PeerBearerToken::Collector { expected_token }, DapSender::Collector)
if expected_token == presented_token =>
{
Ok(())
}
(PeerBearerToken::Leader { .. }, DapSender::Collector) => Err(Right(fatal_error!(
err = "expected a leader sender but got a collector sender"
))),
(PeerBearerToken::Collector { .. }, DapSender::Leader) => Err(Right(fatal_error!(
err = "expected a collector sender but got a leader sender"
))),
_ => reject(format_args!("using taskprov")),
}
} else if self
.bearer_tokens()
.matches(sender, task_id, presented_token)
.await
.map_err(|e| {
Right(fatal_error!(
err = ?e,
"internal error occurred while running authentication"
))
})?
{
Ok(())
} else {
reject(format_args!("with task_id {}", task_id.to_base64url()))
}
}

async fn is_using_taskprov(&self, req: &daphne::DapRequest) -> Result<bool, DapError> {
if req.taskprov.is_some() {
Ok(true)
} else if self
.get_task_config_for(&req.task_id.unwrap())
.await?
.is_some_and(|task_config| task_config.method_is_taskprov())
{
tracing::warn!("Client referencing a taskprov task id without taskprov advertisement");
Ok(true)
} else {
Ok(false)
}
}
}
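
For a sense of how a router layer might consume the new check_bearer_token hook, the hypothetical helper below (not part of this commit) splits the Either error the way the impl above produces it: Left carries a human-readable rejection for an incorrect token, Right carries an internal DapError raised while looking the token up. It assumes the DaphneService trait is publicly reachable at daphne_server::router::DaphneService.

use daphne::{messages::TaskId, DapSender};
use daphne_server::router::DaphneService;
use daphne_service_utils::bearer_token::BearerToken;
use either::Either::{Left, Right};

// Hypothetical caller: map the Either-based split onto an (HTTP status, message) pair.
async fn authorize_leader_request<S: DaphneService>(
    app: &S,
    presented_token: &BearerToken,
    task_id: TaskId,
    is_taskprov: bool,
) -> Result<(), (u16, String)> {
    match app
        .check_bearer_token(presented_token, DapSender::Leader, task_id, is_taskprov)
        .await
    {
        Ok(()) => Ok(()),
        // The token was wrong for this sender/task: reject the request.
        Err(Left(reason)) => Err((401, reason)),
        // The check itself failed (e.g. a storage lookup): surface an internal error.
        Err(Right(internal)) => Err((500, format!("{internal:?}"))),
    }
}
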

impl App {
@@ -137,4 +215,8 @@ impl App {
pub(crate) fn kv(&self) -> Kv<'_> {
Kv::new(&self.storage_proxy_config, &self.http, &self.cache)
}

pub(crate) fn bearer_tokens(&self) -> BearerTokens<'_> {
BearerTokens::from(Kv::new(&self.storage_proxy_config, &self.http, &self.cache))
}
}
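
The bearer_tokens() accessor wraps the storage-proxy Kv handle in a small lookup type whose matches method backs check_bearer_token above; its definition (in the roles module) is not shown in this diff. Purely as a self-contained illustration of the semantics, with a HashMap and simplified stand-in types in place of the real Kv-backed storage:

use std::collections::HashMap;

// Stand-ins for the real daphne types; the actual BearerTokens queries the
// storage proxy through Kv rather than an in-memory map.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum Sender {
    Leader,
    Collector,
}
type TaskId = [u8; 32];

struct InMemoryBearerTokens {
    tokens: HashMap<(Sender, TaskId), String>,
}

impl InMemoryBearerTokens {
    // Same semantics as BearerTokens::matches: true only when a token is
    // registered for this (sender, task) pair and equals the presented one.
    fn matches(&self, sender: Sender, task_id: TaskId, presented: &str) -> bool {
        self.tokens
            .get(&(sender, task_id))
            .is_some_and(|expected| expected.as_str() == presented)
    }
}
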
115 changes: 5 additions & 110 deletions crates/daphne-server/src/roles/aggregator.rs
@@ -1,12 +1,11 @@
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

use std::{borrow::Cow, future::ready, num::NonZeroUsize, ops::Range, time::SystemTime};
use std::{future::ready, num::NonZeroUsize, ops::Range, time::SystemTime};

use axum::async_trait;
use daphne::{
audit_log::AuditLog,
auth::{BearerToken, BearerTokenProvider},
error::DapAbort,
fatal_error,
hpke::{HpkeConfig, HpkeDecrypter, HpkeProvider},
@@ -16,11 +15,8 @@ use daphne::{
taskprov, DapAggregateShare, DapAggregateSpan, DapAggregationParam, DapError, DapGlobalConfig,
DapRequest, DapTaskConfig, DapVersion, EarlyReportStateConsumed, EarlyReportStateInitialized,
};
use daphne_service_utils::{
auth::DaphneAuth,
durable_requests::bindings::{
self, AggregateStoreMergeOptions, AggregateStoreMergeReq, AggregateStoreMergeResp,
},
use daphne_service_utils::durable_requests::bindings::{
self, AggregateStoreMergeOptions, AggregateStoreMergeReq, AggregateStoreMergeResp,
};
use futures::{future::try_join_all, StreamExt, TryStreamExt};
use mappable_rc::Marc;
@@ -32,7 +28,7 @@ use crate::{
};

#[async_trait]
impl DapAggregator<DaphneAuth> for crate::App {
impl DapAggregator for crate::App {
#[tracing::instrument(skip(self, task_config, agg_share_span))]
async fn try_put_agg_share_span(
&self,
@@ -151,54 +147,6 @@ impl DapAggregator<DaphneAuth> for crate::App {
where
Self: 'a;

async fn unauthorized_reason(
&self,
task_config: &DapTaskConfig,
req: &DapRequest<DaphneAuth>,
) -> Result<Option<String>, DapError> {
let mut authorized = false;

let Some(ref sender_auth) = req.sender_auth else {
return Ok(Some("Missing authorization.".into()));
};

// If a bearer token is present, verify that it can be used to authorize the request.
if sender_auth.bearer_token.is_some() {
if let Some(unauthorized_reason) =
self.bearer_token_authorized(task_config, req).await?
{
return Ok(Some(unauthorized_reason));
}
authorized = true;
}

// If a TLS client certificate is present verify that it is valid.
if let Some(ref cf_tls_client_auth) = sender_auth.cf_tls_client_auth {
// TODO(cjpatton) Add support for TLS client authentication for non-Taskprov tasks.
let Some(ref _taskprov_config) = self.service_config.taskprov else {
return Ok(Some(
"TLS client authentication is currently only supported with Taskprov.".into(),
));
};

// Check that the certificate is valid. This is indicated by literal "SUCCESS".
if cf_tls_client_auth.verified != "SUCCESS" {
return Ok(Some(format!(
"Invalid TLS certificate ({}).",
cf_tls_client_auth.verified
)));
}

authorized = true;
}

if authorized {
Ok(None)
} else {
Ok(Some("No suitable authorization method was found.".into()))
}
}

async fn get_global_config(&self) -> Result<DapGlobalConfig, DapError> {
let mut global_config = self.service_config.global.clone();

@@ -278,7 +226,7 @@ impl DapAggregator<DaphneAuth> for crate::App {

async fn taskprov_put(
&self,
req: &DapRequest<DaphneAuth>,
req: &DapRequest,
task_config: DapTaskConfig,
) -> Result<(), DapError> {
let task_id = req.task_id().map_err(DapError::Abort)?;
@@ -524,56 +472,3 @@ impl HpkeDecrypter for crate::App {
.ok_or(DapError::Transition(TransitionFailure::HpkeUnknownConfigId))?
}
}

#[async_trait]
impl BearerTokenProvider for crate::App {
type WrappedBearerToken<'a> = Cow<'a, BearerToken>
where Self: 'a;

async fn get_leader_bearer_token_for<'s>(
&'s self,
task_id: &'s TaskId,
task_config: &DapTaskConfig,
) -> std::result::Result<Option<Self::WrappedBearerToken<'s>>, DapError> {
if self.service_config.global.allow_taskprov && task_config.method_is_taskprov() {
if let Some(bearer_token) = self
.service_config
.taskprov
.as_ref()
.and_then(|c| c.leader_auth.bearer_token.as_ref())
{
return Ok(Some(Cow::Borrowed(bearer_token)));
}
}

self.kv()
.get_cloned::<kv::prefix::LeaderBearerToken>(task_id, &KvGetOptions::default())
.await
.map_err(|e| fatal_error!(err = ?e, "failed to get the leader bearer token"))
.map(|r| r.map(Cow::Owned))
}

async fn get_collector_bearer_token_for<'s>(
&'s self,
task_id: &'s TaskId,
task_config: &DapTaskConfig,
) -> std::result::Result<Option<Self::WrappedBearerToken<'s>>, DapError> {
if self.service_config.global.allow_taskprov && task_config.method_is_taskprov() {
if let Some(bearer_token) = self.service_config.taskprov.as_ref().and_then(|c| {
c.collector_auth
.as_ref()
.expect("collector auth method not set")
.bearer_token
.as_ref()
}) {
return Ok(Some(Cow::Borrowed(bearer_token)));
}
}

self.kv()
.get_cloned::<kv::prefix::CollectorBearerToken>(task_id, &KvGetOptions::default())
.await
.map_err(|e| fatal_error!(err = ?e, "failed to get the collector bearer token"))
.map(|r| r.map(Cow::Owned))
}
}
(The remaining changed files are not rendered here.)
