diff --git a/kuksa_databroker/databroker/src/broker.rs b/kuksa_databroker/databroker/src/broker.rs index cb3071c35..7505010cc 100644 --- a/kuksa_databroker/databroker/src/broker.rs +++ b/kuksa_databroker/databroker/src/broker.rs @@ -25,7 +25,8 @@ use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::Arc; -use std::time::SystemTime; +use std::sync::Mutex; +use std::time::{Duration, SystemTime}; use crate::query::{CompiledQuery, ExecutionInput}; use crate::types::ExecutionInputImplData; @@ -90,7 +91,7 @@ pub enum Field { MetadataUnit, } -#[derive(Default)] +#[derive(Default, Debug)] pub struct Database { next_id: AtomicI32, path_to_id: HashMap, @@ -101,6 +102,7 @@ pub struct Database { pub struct Subscriptions { query_subscriptions: Vec, change_subscriptions: Vec, + continuous_subscriptions: Arc>>, } #[derive(Debug, Clone)] @@ -158,6 +160,14 @@ pub struct ChangeSubscription { permissions: Permissions, } +#[derive(Debug, Clone)] +pub struct ContinuousSubscription { + entries: HashMap>, + sender: mpsc::Sender, + permissions: Permissions, + database: Arc>, +} + #[derive(Debug)] pub struct NotificationError {} @@ -605,6 +615,27 @@ impl Subscriptions { self.change_subscriptions.push(subscription) } + pub fn add_continuous_subscription( + &mut self, + subscription: ContinuousSubscription, + frequency: u64, + ) { + let local_subscription = subscription.clone(); + self.continuous_subscriptions + .lock() + .unwrap() + .push(subscription); + + tokio::spawn(async move { + // Asynchronous code to be executed in the new task + while !local_subscription.sender.is_closed() { + let _ = local_subscription.notify(None).await; + // Simulate some asynchronous work + tokio::time::sleep(Duration::from_millis(1 / frequency * 1000)).await; + } + }); + } + pub async fn notify( &self, changed: Option<&HashMap>>, @@ -648,6 +679,7 @@ impl Subscriptions { pub fn clear(&mut self) { self.query_subscriptions.clear(); self.change_subscriptions.clear(); + self.continuous_subscriptions.lock().unwrap().clear(); } pub fn cleanup(&mut self) { @@ -667,6 +699,14 @@ impl Subscriptions { true } }); + self.continuous_subscriptions.lock().unwrap().retain(|sub| { + if sub.sender.is_closed() { + info!("Subscriber gone: removing continuous subscription"); + false + } else { + true + } + }); } } @@ -921,6 +961,117 @@ impl QuerySubscription { } } +impl ContinuousSubscription { + async fn notify( + &self, + changed: Option<&HashMap>>, + ) -> Result<(), NotificationError> { + let db = self.database.read().await; + let db_read = db.authorized_read_access(&self.permissions); + match changed { + Some(changed) => { + let mut matches = false; + for (id, changed_fields) in changed { + if let Some(fields) = self.entries.get(id) { + if !fields.is_disjoint(changed_fields) { + matches = true; + break; + } + } + } + if matches { + // notify + let notifications = { + let mut notifications = EntryUpdates::default(); + + for (id, changed_fields) in changed { + if let Some(fields) = self.entries.get(id) { + if !fields.is_disjoint(changed_fields) { + match db_read.get_entry_by_id(*id) { + Ok(entry) => { + let mut update = EntryUpdate::default(); + let mut notify_fields = HashSet::new(); + // TODO: Perhaps make path optional + update.path = Some(entry.metadata.path.clone()); + if changed_fields.contains(&Field::Datapoint) + && fields.contains(&Field::Datapoint) + { + update.datapoint = Some(entry.datapoint.clone()); + notify_fields.insert(Field::Datapoint); + } + if changed_fields.contains(&Field::ActuatorTarget) + && fields.contains(&Field::ActuatorTarget) + { + update.actuator_target = + Some(entry.actuator_target.clone()); + notify_fields.insert(Field::ActuatorTarget); + } + notifications.updates.push(ChangeNotification { + update, + fields: notify_fields, + }); + } + Err(_) => { + debug!("notify: could not find entry with id {}", id) + } + } + } + } + } + notifications + }; + if notifications.updates.is_empty() { + Ok(()) + } else { + match self.sender.send(notifications).await { + Ok(()) => Ok(()), + Err(_) => Err(NotificationError {}), + } + } + } else { + Ok(()) + } + } + None => { + let notifications = { + let mut notifications = EntryUpdates::default(); + + for (id, fields) in &self.entries { + match db_read.get_entry_by_id(*id) { + Ok(entry) => { + let mut update = EntryUpdate::default(); + let mut notify_fields = HashSet::new(); + // TODO: Perhaps make path optional + update.path = Some(entry.metadata.path.clone()); + if fields.contains(&Field::Datapoint) { + update.datapoint = Some(entry.datapoint.clone()); + notify_fields.insert(Field::Datapoint); + } + if fields.contains(&Field::ActuatorTarget) { + update.actuator_target = Some(entry.actuator_target.clone()); + notify_fields.insert(Field::ActuatorTarget); + } + notifications.updates.push(ChangeNotification { + update, + fields: notify_fields, + }); + } + Err(_) => { + debug!("notify: could not find entry with id {}", id) + } + } + } + notifications + }; + match self.sender.send(notifications).await { + Ok(()) => Ok(()), + Err(_) => Err(NotificationError {}), + } + } + } + } +} + pub struct DatabaseReadAccess<'a, 'b> { db: &'a Database, permissions: &'b Permissions, @@ -1446,31 +1597,99 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { pub async fn subscribe( &self, valid_entries: HashMap>, + frequency: Option, ) -> Result, SubscriptionError> { if valid_entries.is_empty() { return Err(SubscriptionError::InvalidInput); } + let mut entries_on_changed: HashMap> = HashMap::new(); + let mut entries_continuous: HashMap> = HashMap::new(); + + let db_read = self.broker.database.read().await; + let db_read_access = db_read.authorized_read_access(self.permissions); + + for (id, fields) in valid_entries { + match db_read_access.get_entry_by_id(id) { + Ok(entry) => { + let change_type = entry.metadata.change_type.clone(); + match change_type { + types::ChangeType::OnChange => { + entries_on_changed + .entry(id) + .and_modify(|existing_fields| { + existing_fields.extend(fields.clone()) + }) + .or_insert(fields.clone()); + } + types::ChangeType::Continuous => { + entries_continuous + .entry(id) + .and_modify(|existing_fields| { + existing_fields.extend(fields.clone()) + }) + .or_insert(fields.clone()); + } + types::ChangeType::Static => {} + } + } + Err(_) => { + debug!("notify: could not find entry with id {}", id) + } + } + } + let (sender, receiver) = mpsc::channel(10); - let subscription = ChangeSubscription { - entries: valid_entries, - sender, - permissions: self.permissions.clone(), - }; + if !entries_on_changed.is_empty() { + if frequency.is_some() && entries_continuous.is_empty() { + return Err(SubscriptionError::InvalidInput); + } + let subscription = ChangeSubscription { + entries: entries_on_changed, + sender: sender.clone(), + permissions: self.permissions.clone(), + }; - { - // Send everything subscribed to in an initial notification - let db = self.broker.database.read().await; - if subscription.notify(None, &db).await.is_err() { - warn!("Failed to create initial notification"); + { + // Send everything subscribed to in an initial notification + let db = self.broker.database.read().await; + if subscription.notify(None, &db).await.is_err() { + warn!("Failed to create initial notification"); + } } + + self.broker + .subscriptions + .write() + .await + .add_change_subscription(subscription); } - self.broker - .subscriptions - .write() - .await - .add_change_subscription(subscription); + if !entries_continuous.is_empty() { + if frequency.is_none() { + return Err(SubscriptionError::InvalidInput); + } + let subscription_continuous = ContinuousSubscription { + entries: entries_continuous, + sender, + permissions: self.permissions.clone(), + database: Arc::clone(&self.broker.database), + }; + + { + // Send everything subscribed to in an initial notification + //let db = self.broker.database.read().await; + if subscription_continuous.notify(None).await.is_err() { + warn!("Failed to create initial notification"); + } + } + + self.broker + .subscriptions + .write() + .await + .add_continuous_subscription(subscription_continuous, frequency.unwrap()); + } let stream = ReceiverStream::new(receiver); Ok(stream) @@ -2953,7 +3172,10 @@ mod tests { .expect("Register datapoint should succeed"); let mut stream = broker - .subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))])) + .subscribe( + HashMap::from([(id1, HashSet::from([Field::Datapoint]))]), + None, + ) .await .expect("subscription should succeed"); diff --git a/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs b/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs index 72a8fa87a..680eb0b14 100644 --- a/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs +++ b/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs @@ -445,7 +445,7 @@ impl proto::val_server::Val for broker::DataBroker { } } - match broker.subscribe(entries).await { + match broker.subscribe(entries, request.frequency_hertz).await { Ok(stream) => { let stream = convert_to_proto_stream(stream); Ok(tonic::Response::new(Box::pin(stream))) diff --git a/kuksa_databroker/databroker/src/viss/v2/server.rs b/kuksa_databroker/databroker/src/viss/v2/server.rs index 01f96fc3a..eda4e910a 100644 --- a/kuksa_databroker/databroker/src/viss/v2/server.rs +++ b/kuksa_databroker/databroker/src/viss/v2/server.rs @@ -262,7 +262,7 @@ impl Viss for Server { }); }; - match broker.subscribe(entries).await { + match broker.subscribe(entries, None).await { Ok(stream) => { let subscription_id = SubscriptionId::new(); diff --git a/kuksa_databroker/lib/kuksa/src/lib.rs b/kuksa_databroker/lib/kuksa/src/lib.rs index a2beec628..f02db1847 100644 --- a/kuksa_databroker/lib/kuksa/src/lib.rs +++ b/kuksa_databroker/lib/kuksa/src/lib.rs @@ -288,7 +288,10 @@ impl KuksaClient { }) } - let req = proto::v1::SubscribeRequest { entries }; + let req = proto::v1::SubscribeRequest { + entries, + frequency_hertz: None, + }; match client.subscribe(req).await { Ok(response) => Ok(response.into_inner()), @@ -321,7 +324,10 @@ impl KuksaClient { }) } - let req = proto::v1::SubscribeRequest { entries }; + let req = proto::v1::SubscribeRequest { + entries, + frequency_hertz: None, + }; match client.subscribe(req).await { Ok(response) => Ok(response.into_inner()), @@ -346,7 +352,10 @@ impl KuksaClient { }) } - let req = proto::v1::SubscribeRequest { entries }; + let req = proto::v1::SubscribeRequest { + entries, + frequency_hertz: None, + }; match client.subscribe(req).await { Ok(response) => Ok(response.into_inner()), diff --git a/proto/kuksa/val/v1/val.proto b/proto/kuksa/val/v1/val.proto index 3059d8098..184526521 100644 --- a/proto/kuksa/val/v1/val.proto +++ b/proto/kuksa/val/v1/val.proto @@ -98,6 +98,7 @@ message SubscribeEntry { // Subscribe to changes in datapoints. message SubscribeRequest { repeated SubscribeEntry entries = 1; + optional uint64 frequency_hertz = 2; } // A subscription response @@ -112,4 +113,4 @@ message GetServerInfoRequest { message GetServerInfoResponse { string name = 1; string version = 2; -} \ No newline at end of file +}