Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IoT Config locked route updates and shutdown listeners #567

Merged
merged 3 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions iot_config/src/gateway_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct GatewayService {
region_map: RegionMapReader,
signing_key: Arc<Keypair>,
delegate_cache: watch::Receiver<org::DelegateCache>,
shutdown: triggered::Listener,
}

impl GatewayService {
Expand All @@ -44,6 +45,7 @@ impl GatewayService {
region_map: RegionMapReader,
auth_cache: AuthCache,
delegate_cache: watch::Receiver<org::DelegateCache>,
shutdown: triggered::Listener,
) -> Result<Self> {
let gateway_cache = Arc::new(Cache::new());
let cache_clone = gateway_cache.clone();
Expand All @@ -56,6 +58,7 @@ impl GatewayService {
region_map,
signing_key: Arc::new(settings.signing_keypair()?),
delegate_cache,
shutdown,
})
}

Expand Down Expand Up @@ -275,18 +278,21 @@ impl iot_config::Gateway for GatewayService {
let signing_key = self.signing_key.clone();
let batch_size = request.batch_size;
let region_map = self.region_map.clone();
let shutdown_listener = self.shutdown.clone();

let (tx, rx) = tokio::sync::mpsc::channel(20);

tokio::spawn(async move {
stream_all_gateways_info(
&pool,
tx.clone(),
&signing_key,
region_map.clone(),
batch_size,
)
.await
tokio::select! {
_ = shutdown_listener => (),
_ = stream_all_gateways_info(
&pool,
tx.clone(),
&signing_key,
region_map.clone(),
batch_size,
) => (),
}
});

Ok(Response::new(GrpcStreamResult::new(rx)))
Expand All @@ -313,24 +319,24 @@ async fn stream_all_gateways_info(
})
.collect();

let mut response = GatewayInfoStreamResV1 {
let mut gateway = GatewayInfoStreamResV1 {
gateways: gateway_infos,
timestamp,
signer: signer.clone(),
signature: vec![],
};

response = match signing_key.sign(&response.encode_to_vec()) {
gateway = match signing_key.sign(&gateway.encode_to_vec()) {
Ok(signature) => GatewayInfoStreamResV1 {
signature,
..response
..gateway
},
Err(_) => {
continue;
}
};

tx.send(Ok(response)).await?;
tx.send(Ok(gateway)).await?;
}
Ok(())
}
7 changes: 5 additions & 2 deletions iot_config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ pub fn update_channel<T: Clone>() -> broadcast::Sender<T> {
update_tx
}

pub async fn broadcast_update<T>(
pub async fn broadcast_update<T: std::fmt::Debug>(
message: T,
sender: broadcast::Sender<T>,
) -> Result<(), broadcast::error::SendError<T>> {
while !enqueue_update(sender.len()) {
tokio::time::sleep(tokio::time::Duration::from_millis(25)).await
}
sender.send(message).map(|_| ())
sender.send(message).map(|_| ()).map_err(|err| {
tracing::error!(error = ?err, "failed to broadcast routing update");
err
})
}

fn enqueue_update(queue_size: usize) -> bool {
Expand Down
2 changes: 2 additions & 0 deletions iot_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl Daemon {
region_map.clone(),
auth_cache.clone(),
delegate_key_cache,
shutdown_listener.clone(),
)?;
let route_svc = RouteService::new(
settings,
Expand All @@ -108,6 +109,7 @@ impl Daemon {
pool.clone(),
route_svc.clone_update_channel(),
delegate_key_updater,
shutdown_listener.clone(),
)?;
let admin_svc = AdminService::new(
settings,
Expand Down
107 changes: 42 additions & 65 deletions iot_config/src/org_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
admin::{AuthCache, KeyType},
helium_netids, lora_field, org,
broadcast_update, helium_netids, lora_field, org,
route::list_routes,
telemetry, verify_public_key, GrpcResult, Settings,
};
Expand All @@ -26,6 +26,7 @@ pub struct OrgService {
route_update_tx: broadcast::Sender<RouteStreamResV1>,
signing_key: Keypair,
delegate_updater: watch::Sender<org::DelegateCache>,
shutdown: triggered::Listener,
}

#[derive(Clone, Debug, PartialEq)]
Expand All @@ -41,13 +42,15 @@ impl OrgService {
pool: Pool<Postgres>,
route_update_tx: broadcast::Sender<RouteStreamResV1>,
delegate_updater: watch::Sender<org::DelegateCache>,
shutdown: triggered::Listener,
) -> Result<Self> {
Ok(Self {
auth_cache,
pool,
route_update_tx,
signing_key: settings.signing_keypair()?,
delegate_updater,
shutdown,
})
}

Expand Down Expand Up @@ -111,6 +114,38 @@ impl OrgService {
.sign(response)
.map_err(|_| Status::internal("response signing error"))
}

async fn stream_org_routes_enable_disable(&self, oui: u64) -> Result<(), Status> {
let routes = list_routes(oui, &self.pool).await.map_err(|err| {
tracing::error!(org = oui, reason = ?err, "failed to list org routes for streaming update");
Status::internal(format!("error retrieving routes for updated org: {}", oui))
})?;
let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = self.signing_key.public_key().into();
for route in routes {
let route_id = route.id.clone();
let mut update = RouteStreamResV1 {
action: ActionV1::Add.into(),
data: Some(route_stream_res_v1::Data::Route(route.into())),
timestamp,
signer: signer.clone(),
signature: vec![],
};
update.signature = self.sign_response(&update.encode_to_vec())?;
if broadcast_update(update, self.route_update_tx.clone())
.await
.is_err()
{
tracing::info!(
route_id,
"all subscribers disconnected; org routes update incomplete"
);
break;
};
tracing::debug!(route_id, "route updated");
}
Ok(())
}
}

#[tonic::async_trait]
Expand Down Expand Up @@ -431,38 +466,9 @@ impl iot_config::Org for OrgService {
Status::internal(format!("org disable failed for: {}", request.oui))
})?;

let org_routes = list_routes(request.oui, &self.pool).await.map_err(|err| {
tracing::error!(
org = request.oui,
reason = ?err,
"failed to list org routes for streaming disable update"
);
Status::internal(format!(
"error retrieving routes for disabled org: {}",
request.oui
))
})?;

let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = self.signing_key.public_key().into();
for route in org_routes {
let route_id = route.id.clone();
let mut update = RouteStreamResV1 {
action: ActionV1::Add.into(),
data: Some(route_stream_res_v1::Data::Route(route.into())),
timestamp,
signer: signer.clone(),
signature: vec![],
};
update.signature = self.sign_response(&update.encode_to_vec())?;
if self.route_update_tx.send(update).is_err() {
tracing::info!(
route_id = route_id,
"all subscribers disconnected; route disable incomplete"
);
break;
};
tracing::debug!(route_id = route_id, "route disabled");
tokio::select! {
_ = self.shutdown.clone() => return Err(Status::unavailable("service shutting down")),
result = self.stream_org_routes_enable_disable(request.oui) => result?
}
}

Expand Down Expand Up @@ -499,38 +505,9 @@ impl iot_config::Org for OrgService {
Status::internal(format!("org enable failed for: {}", request.oui))
})?;

let org_routes = list_routes(request.oui, &self.pool).await.map_err(|err| {
tracing::error!(
org = request.oui,
reason = ?err,
"failed to list routes for streaming enable update"
);
Status::internal(format!(
"error retrieving routes for enabled org: {}",
request.oui
))
})?;

let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = self.signing_key.public_key().into();
for route in org_routes {
let route_id = route.id.clone();
let mut update = RouteStreamResV1 {
action: ActionV1::Add.into(),
data: Some(route_stream_res_v1::Data::Route(route.into())),
timestamp,
signer: signer.clone(),
signature: vec![],
};
update.signature = self.sign_response(&update.encode_to_vec())?;
if self.route_update_tx.send(update).is_err() {
tracing::info!(
route_id = route_id,
"all subscribers disconnected; route enable incomplete"
);
break;
};
tracing::debug!(route_id = route_id, "route enabled");
tokio::select! {
_ = self.shutdown.clone() => return Err(Status::unavailable("service shutting down")),
result = self.stream_org_routes_enable_disable(request.oui) => result?
}
}

Expand Down
59 changes: 30 additions & 29 deletions iot_config/src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,28 +137,28 @@ pub async fn create_route(

transaction.commit().await?;

if new_route.active && !new_route.locked {
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut update = proto::RouteStreamResV1 {
action: proto::ActionV1::Add.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
new_route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};
_ = signing_key
.sign(&update.encode_to_vec())
.map_err(|err| tracing::error!("error signing route stream response: {err:?}"))
.and_then(|signature| {
update.signature = signature;
update_tx.send(update).map_err(|err| {
tracing::warn!("error broadcasting route stream response: {err:?}")
})
});
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut update = proto::RouteStreamResV1 {
action: proto::ActionV1::Add.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
new_route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};
_ = futures::future::ready(signing_key.sign(&update.encode_to_vec()))
.map_err(|err| {
tracing::error!(error = ?err, "error signing route create");
anyhow!("error signing route create")
})
.and_then(|signature| {
update.signature = signature;
broadcast_update(update, update_tx)
.map_err(|_| anyhow!("failed broadcasting route create"))
})
.await;

Ok(new_route)
}
Expand Down Expand Up @@ -213,15 +213,17 @@ pub async fn update_route(
signature: vec![],
};

_ = signing_key
.sign(&update_res.encode_to_vec())
.map_err(|err| tracing::error!("error signing route stream response: {err:?}"))
_ = futures::future::ready(signing_key.sign(&update_res.encode_to_vec()))
.map_err(|err| {
tracing::error!(error = ?err, "error signing route update");
anyhow!("error signing route update")
})
.and_then(|signature| {
update_res.signature = signature;
update_tx
.send(update_res)
.map_err(|err| tracing::warn!("error broadcasting route stream response: {err:?}"))
});
broadcast_update(update_res, update_tx)
.map_err(|_| anyhow!("failed broadcasting route update"))
})
.await;

Ok(updated_route)
}
Expand Down Expand Up @@ -523,7 +525,6 @@ pub fn active_route_stream<'a>(
select r.id, r.oui, r.net_id, r.max_copies, r.server_host, r.server_port, r.server_protocol_opts, r.active, r.ignore_empty_skf, o.locked
from routes r
join organizations o on r.oui = o.oui
where o.locked = false and r.active = true
group by r.id, o.locked
"#,
)
Expand Down
Loading