From 6a957512e9c2afafaf457b805124d0f2debc96ad Mon Sep 17 00:00:00 2001 From: Rafael RL Date: Thu, 8 Feb 2024 13:47:29 +0100 Subject: [PATCH 1/3] Add continuous subscription to subscribe method --- kuksa_databroker/databroker/src/broker.rs | 242 ++++++++++++++++-- .../databroker/src/grpc/kuksa_val_v1/val.rs | 8 +- .../databroker/src/viss/v2/server.rs | 11 +- 3 files changed, 238 insertions(+), 23 deletions(-) diff --git a/kuksa_databroker/databroker/src/broker.rs b/kuksa_databroker/databroker/src/broker.rs index cb3071c3..ebeff474 100644 --- a/kuksa_databroker/databroker/src/broker.rs +++ b/kuksa_databroker/databroker/src/broker.rs @@ -25,7 +25,8 @@ use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::Arc; -use std::time::SystemTime; +use std::sync::Mutex; +use std::time::{Duration, SystemTime}; use crate::query::{CompiledQuery, ExecutionInput}; use crate::types::ExecutionInputImplData; @@ -90,7 +91,7 @@ pub enum Field { MetadataUnit, } -#[derive(Default)] +#[derive(Default, Debug)] pub struct Database { next_id: AtomicI32, path_to_id: HashMap, @@ -101,6 +102,7 @@ pub struct Database { pub struct Subscriptions { query_subscriptions: Vec, change_subscriptions: Vec, + continuous_subscriptions: Arc>>, } #[derive(Debug, Clone)] @@ -158,6 +160,14 @@ pub struct ChangeSubscription { permissions: Permissions, } +#[derive(Debug, Clone)] +pub struct ContinuousSubscription { + entries: HashMap>, + sender: mpsc::Sender, + permissions: Permissions, + database: Arc>, +} + #[derive(Debug)] pub struct NotificationError {} @@ -605,6 +615,23 @@ impl Subscriptions { self.change_subscriptions.push(subscription) } + pub fn add_continuous_subscription(&mut self, subscription: ContinuousSubscription) { + let local_subscription = subscription.clone(); + self.continuous_subscriptions + .lock() + .unwrap() + .push(subscription); + + tokio::spawn(async move { + // Asynchronous code to be executed in the new task + while !local_subscription.sender.is_closed() { + let _ = local_subscription.notify(None).await; + // Simulate some asynchronous work + tokio::time::sleep(Duration::from_secs(1)).await; + } + }); + } + pub async fn notify( &self, changed: Option<&HashMap>>, @@ -648,6 +675,7 @@ impl Subscriptions { pub fn clear(&mut self) { self.query_subscriptions.clear(); self.change_subscriptions.clear(); + self.continuous_subscriptions.lock().unwrap().clear(); } pub fn cleanup(&mut self) { @@ -667,6 +695,14 @@ impl Subscriptions { true } }); + self.continuous_subscriptions.lock().unwrap().retain(|sub| { + if sub.sender.is_closed() { + info!("Subscriber gone: removing continuous subscription"); + false + } else { + true + } + }); } } @@ -921,6 +957,117 @@ impl QuerySubscription { } } +impl ContinuousSubscription { + async fn notify( + &self, + changed: Option<&HashMap>>, + ) -> Result<(), NotificationError> { + let db = self.database.read().await; + let db_read = db.authorized_read_access(&self.permissions); + match changed { + Some(changed) => { + let mut matches = false; + for (id, changed_fields) in changed { + if let Some(fields) = self.entries.get(id) { + if !fields.is_disjoint(changed_fields) { + matches = true; + break; + } + } + } + if matches { + // notify + let notifications = { + let mut notifications = EntryUpdates::default(); + + for (id, changed_fields) in changed { + if let Some(fields) = self.entries.get(id) { + if !fields.is_disjoint(changed_fields) { + match db_read.get_entry_by_id(*id) { + Ok(entry) => { + let mut update = EntryUpdate::default(); + let mut notify_fields = HashSet::new(); + // TODO: Perhaps make path optional + update.path = Some(entry.metadata.path.clone()); + if changed_fields.contains(&Field::Datapoint) + && fields.contains(&Field::Datapoint) + { + update.datapoint = Some(entry.datapoint.clone()); + notify_fields.insert(Field::Datapoint); + } + if changed_fields.contains(&Field::ActuatorTarget) + && fields.contains(&Field::ActuatorTarget) + { + update.actuator_target = + Some(entry.actuator_target.clone()); + notify_fields.insert(Field::ActuatorTarget); + } + notifications.updates.push(ChangeNotification { + update, + fields: notify_fields, + }); + } + Err(_) => { + debug!("notify: could not find entry with id {}", id) + } + } + } + } + } + notifications + }; + if notifications.updates.is_empty() { + Ok(()) + } else { + match self.sender.send(notifications).await { + Ok(()) => Ok(()), + Err(_) => Err(NotificationError {}), + } + } + } else { + Ok(()) + } + } + None => { + let notifications = { + let mut notifications = EntryUpdates::default(); + + for (id, fields) in &self.entries { + match db_read.get_entry_by_id(*id) { + Ok(entry) => { + let mut update = EntryUpdate::default(); + let mut notify_fields = HashSet::new(); + // TODO: Perhaps make path optional + update.path = Some(entry.metadata.path.clone()); + if fields.contains(&Field::Datapoint) { + update.datapoint = Some(entry.datapoint.clone()); + notify_fields.insert(Field::Datapoint); + } + if fields.contains(&Field::ActuatorTarget) { + update.actuator_target = Some(entry.actuator_target.clone()); + notify_fields.insert(Field::ActuatorTarget); + } + notifications.updates.push(ChangeNotification { + update, + fields: notify_fields, + }); + } + Err(_) => { + debug!("notify: could not find entry with id {}", id) + } + } + } + notifications + }; + match self.sender.send(notifications).await { + Ok(()) => Ok(()), + Err(_) => Err(NotificationError {}), + } + } + } + } +} + pub struct DatabaseReadAccess<'a, 'b> { db: &'a Database, permissions: &'b Permissions, @@ -1445,32 +1592,78 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { pub async fn subscribe( &self, - valid_entries: HashMap>, + valid_entries: HashMap, types::ChangeType)>, ) -> Result, SubscriptionError> { if valid_entries.is_empty() { return Err(SubscriptionError::InvalidInput); } + let mut entries_on_changed: HashMap> = HashMap::new(); + let mut entries_continuous: HashMap> = HashMap::new(); + + for (id, (fields, change_type)) in valid_entries { + match change_type { + types::ChangeType::OnChange => { + entries_on_changed + .entry(id) + .and_modify(|existing_fields| existing_fields.extend(fields.clone())) + .or_insert(fields.clone()); + } + types::ChangeType::Continuous => { + entries_continuous + .entry(id) + .and_modify(|existing_fields| existing_fields.extend(fields.clone())) + .or_insert(fields.clone()); + } + types::ChangeType::Static => {} + } + } + let (sender, receiver) = mpsc::channel(10); - let subscription = ChangeSubscription { - entries: valid_entries, - sender, - permissions: self.permissions.clone(), - }; + if !entries_on_changed.is_empty() { + let subscription = ChangeSubscription { + entries: entries_on_changed, + sender: sender.clone(), + permissions: self.permissions.clone(), + }; - { - // Send everything subscribed to in an initial notification - let db = self.broker.database.read().await; - if subscription.notify(None, &db).await.is_err() { - warn!("Failed to create initial notification"); + { + // Send everything subscribed to in an initial notification + let db = self.broker.database.read().await; + if subscription.notify(None, &db).await.is_err() { + warn!("Failed to create initial notification"); + } } + + self.broker + .subscriptions + .write() + .await + .add_change_subscription(subscription); } - self.broker - .subscriptions - .write() - .await - .add_change_subscription(subscription); + if !entries_continuous.is_empty() { + let subscription_continuous = ContinuousSubscription { + entries: entries_continuous, + sender, + permissions: self.permissions.clone(), + database: Arc::clone(&self.broker.database), + }; + + { + // Send everything subscribed to in an initial notification + //let db = self.broker.database.read().await; + if subscription_continuous.notify(None).await.is_err() { + warn!("Failed to create initial notification"); + } + } + + self.broker + .subscriptions + .write() + .await + .add_continuous_subscription(subscription_continuous); + } let stream = ReceiverStream::new(receiver); Ok(stream) @@ -2952,8 +3145,19 @@ mod tests { .await .expect("Register datapoint should succeed"); + let my_hashmap: HashMap, types::ChangeType)> = [( + id1, + ( + HashSet::from([Field::Datapoint]), + types::ChangeType::OnChange, + ), + )] + .iter() + .cloned() + .collect(); + let mut stream = broker - .subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))])) + .subscribe(my_hashmap) .await .expect("subscription should succeed"); diff --git a/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs b/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs index 72a8fa87..249b4d1f 100644 --- a/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs +++ b/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs @@ -28,6 +28,7 @@ use crate::broker::ReadError; use crate::broker::SubscriptionError; use crate::glob; use crate::permissions::Permissions; +use crate::types; #[tonic::async_trait] impl proto::val_server::Val for broker::DataBroker { @@ -409,7 +410,7 @@ impl proto::val_server::Val for broker::DataBroker { } } - let mut entries: HashMap> = HashMap::new(); + let mut entries: HashMap, types::ChangeType)> = HashMap::new(); if !valid_requests.is_empty() { for (path, (regex, fields)) in valid_requests { @@ -423,9 +424,10 @@ impl proto::val_server::Val for broker::DataBroker { entries .entry(entry.metadata().id) .and_modify(|existing_fields| { - existing_fields.extend(fields.clone()); + existing_fields.0.extend(fields.clone()); + existing_fields.1 = entry.metadata().change_type.clone(); }) - .or_insert(fields.clone()); + .or_insert((fields.clone(), entry.metadata().change_type.clone())); match entry.datapoint() { Ok(_) => {} diff --git a/kuksa_databroker/databroker/src/viss/v2/server.rs b/kuksa_databroker/databroker/src/viss/v2/server.rs index 01f96fc3..634b2746 100644 --- a/kuksa_databroker/databroker/src/viss/v2/server.rs +++ b/kuksa_databroker/databroker/src/viss/v2/server.rs @@ -33,6 +33,7 @@ use crate::{ }; use super::{conversions, types::*}; +pub use crate::types::ChangeType; #[tonic::async_trait] pub(crate) trait Viss: Send + Sync + 'static { @@ -253,7 +254,15 @@ impl Viss for Server { let Some(entries) = broker .get_id_by_path(request.path.as_ref()) .await - .map(|id| HashMap::from([(id, HashSet::from([broker::Field::Datapoint]))])) + .map(|id| { + HashMap::from([( + id, + ( + HashSet::from([broker::Field::Datapoint]), + ChangeType::Static, + ), + )]) + }) else { return Err(SubscribeErrorResponse { request_id, From 10592bb43c76ac04ed1f1f31b7e6e4b826071aa9 Mon Sep 17 00:00:00 2001 From: Rafael RL Date: Tue, 13 Feb 2024 12:08:00 +0100 Subject: [PATCH 2/3] Keep original subscribe signature --- kuksa_databroker/databroker/src/broker.rs | 56 ++++++++++--------- .../databroker/src/grpc/kuksa_val_v1/val.rs | 8 +-- .../databroker/src/viss/v2/server.rs | 11 +--- 3 files changed, 34 insertions(+), 41 deletions(-) diff --git a/kuksa_databroker/databroker/src/broker.rs b/kuksa_databroker/databroker/src/broker.rs index ebeff474..4d69ef61 100644 --- a/kuksa_databroker/databroker/src/broker.rs +++ b/kuksa_databroker/databroker/src/broker.rs @@ -1592,7 +1592,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { pub async fn subscribe( &self, - valid_entries: HashMap, types::ChangeType)>, + valid_entries: HashMap>, ) -> Result, SubscriptionError> { if valid_entries.is_empty() { return Err(SubscriptionError::InvalidInput); @@ -1601,21 +1601,36 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { let mut entries_on_changed: HashMap> = HashMap::new(); let mut entries_continuous: HashMap> = HashMap::new(); - for (id, (fields, change_type)) in valid_entries { - match change_type { - types::ChangeType::OnChange => { - entries_on_changed - .entry(id) - .and_modify(|existing_fields| existing_fields.extend(fields.clone())) - .or_insert(fields.clone()); + let db_read = self.broker.database.read().await; + let db_read_access = db_read.authorized_read_access(self.permissions); + + for (id, fields) in valid_entries { + match db_read_access.get_entry_by_id(id) { + Ok(entry) => { + let change_type = entry.metadata.change_type.clone(); + match change_type { + types::ChangeType::OnChange => { + entries_on_changed + .entry(id) + .and_modify(|existing_fields| { + existing_fields.extend(fields.clone()) + }) + .or_insert(fields.clone()); + } + types::ChangeType::Continuous => { + entries_continuous + .entry(id) + .and_modify(|existing_fields| { + existing_fields.extend(fields.clone()) + }) + .or_insert(fields.clone()); + } + types::ChangeType::Static => {} + } } - types::ChangeType::Continuous => { - entries_continuous - .entry(id) - .and_modify(|existing_fields| existing_fields.extend(fields.clone())) - .or_insert(fields.clone()); + Err(_) => { + debug!("notify: could not find entry with id {}", id) } - types::ChangeType::Static => {} } } @@ -3145,19 +3160,8 @@ mod tests { .await .expect("Register datapoint should succeed"); - let my_hashmap: HashMap, types::ChangeType)> = [( - id1, - ( - HashSet::from([Field::Datapoint]), - types::ChangeType::OnChange, - ), - )] - .iter() - .cloned() - .collect(); - let mut stream = broker - .subscribe(my_hashmap) + .subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))])) .await .expect("subscription should succeed"); diff --git a/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs b/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs index 249b4d1f..72a8fa87 100644 --- a/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs +++ b/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs @@ -28,7 +28,6 @@ use crate::broker::ReadError; use crate::broker::SubscriptionError; use crate::glob; use crate::permissions::Permissions; -use crate::types; #[tonic::async_trait] impl proto::val_server::Val for broker::DataBroker { @@ -410,7 +409,7 @@ impl proto::val_server::Val for broker::DataBroker { } } - let mut entries: HashMap, types::ChangeType)> = HashMap::new(); + let mut entries: HashMap> = HashMap::new(); if !valid_requests.is_empty() { for (path, (regex, fields)) in valid_requests { @@ -424,10 +423,9 @@ impl proto::val_server::Val for broker::DataBroker { entries .entry(entry.metadata().id) .and_modify(|existing_fields| { - existing_fields.0.extend(fields.clone()); - existing_fields.1 = entry.metadata().change_type.clone(); + existing_fields.extend(fields.clone()); }) - .or_insert((fields.clone(), entry.metadata().change_type.clone())); + .or_insert(fields.clone()); match entry.datapoint() { Ok(_) => {} diff --git a/kuksa_databroker/databroker/src/viss/v2/server.rs b/kuksa_databroker/databroker/src/viss/v2/server.rs index 634b2746..01f96fc3 100644 --- a/kuksa_databroker/databroker/src/viss/v2/server.rs +++ b/kuksa_databroker/databroker/src/viss/v2/server.rs @@ -33,7 +33,6 @@ use crate::{ }; use super::{conversions, types::*}; -pub use crate::types::ChangeType; #[tonic::async_trait] pub(crate) trait Viss: Send + Sync + 'static { @@ -254,15 +253,7 @@ impl Viss for Server { let Some(entries) = broker .get_id_by_path(request.path.as_ref()) .await - .map(|id| { - HashMap::from([( - id, - ( - HashSet::from([broker::Field::Datapoint]), - ChangeType::Static, - ), - )]) - }) + .map(|id| HashMap::from([(id, HashSet::from([broker::Field::Datapoint]))])) else { return Err(SubscribeErrorResponse { request_id, From 960e0574468802d5c4db1dfa19731e9a1326e8b1 Mon Sep 17 00:00:00 2001 From: ruz4fe Date: Wed, 6 Mar 2024 15:48:36 +0100 Subject: [PATCH 3/3] Continuous subscription at a specific frequency --- kuksa_databroker/databroker/src/broker.rs | 22 +++++++++++++++---- .../databroker/src/grpc/kuksa_val_v1/val.rs | 2 +- .../databroker/src/viss/v2/server.rs | 2 +- kuksa_databroker/lib/kuksa/src/lib.rs | 15 ++++++++++--- proto/kuksa/val/v1/val.proto | 3 ++- 5 files changed, 34 insertions(+), 10 deletions(-) diff --git a/kuksa_databroker/databroker/src/broker.rs b/kuksa_databroker/databroker/src/broker.rs index 4d69ef61..7505010c 100644 --- a/kuksa_databroker/databroker/src/broker.rs +++ b/kuksa_databroker/databroker/src/broker.rs @@ -615,7 +615,11 @@ impl Subscriptions { self.change_subscriptions.push(subscription) } - pub fn add_continuous_subscription(&mut self, subscription: ContinuousSubscription) { + pub fn add_continuous_subscription( + &mut self, + subscription: ContinuousSubscription, + frequency: u64, + ) { let local_subscription = subscription.clone(); self.continuous_subscriptions .lock() @@ -627,7 +631,7 @@ impl Subscriptions { while !local_subscription.sender.is_closed() { let _ = local_subscription.notify(None).await; // Simulate some asynchronous work - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_millis(1 / frequency * 1000)).await; } }); } @@ -1593,6 +1597,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { pub async fn subscribe( &self, valid_entries: HashMap>, + frequency: Option, ) -> Result, SubscriptionError> { if valid_entries.is_empty() { return Err(SubscriptionError::InvalidInput); @@ -1636,6 +1641,9 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { let (sender, receiver) = mpsc::channel(10); if !entries_on_changed.is_empty() { + if frequency.is_some() && entries_continuous.is_empty() { + return Err(SubscriptionError::InvalidInput); + } let subscription = ChangeSubscription { entries: entries_on_changed, sender: sender.clone(), @@ -1658,6 +1666,9 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { } if !entries_continuous.is_empty() { + if frequency.is_none() { + return Err(SubscriptionError::InvalidInput); + } let subscription_continuous = ContinuousSubscription { entries: entries_continuous, sender, @@ -1677,7 +1688,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { .subscriptions .write() .await - .add_continuous_subscription(subscription_continuous); + .add_continuous_subscription(subscription_continuous, frequency.unwrap()); } let stream = ReceiverStream::new(receiver); @@ -3161,7 +3172,10 @@ mod tests { .expect("Register datapoint should succeed"); let mut stream = broker - .subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))])) + .subscribe( + HashMap::from([(id1, HashSet::from([Field::Datapoint]))]), + None, + ) .await .expect("subscription should succeed"); diff --git a/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs b/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs index 72a8fa87..680eb0b1 100644 --- a/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs +++ b/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs @@ -445,7 +445,7 @@ impl proto::val_server::Val for broker::DataBroker { } } - match broker.subscribe(entries).await { + match broker.subscribe(entries, request.frequency_hertz).await { Ok(stream) => { let stream = convert_to_proto_stream(stream); Ok(tonic::Response::new(Box::pin(stream))) diff --git a/kuksa_databroker/databroker/src/viss/v2/server.rs b/kuksa_databroker/databroker/src/viss/v2/server.rs index 01f96fc3..eda4e910 100644 --- a/kuksa_databroker/databroker/src/viss/v2/server.rs +++ b/kuksa_databroker/databroker/src/viss/v2/server.rs @@ -262,7 +262,7 @@ impl Viss for Server { }); }; - match broker.subscribe(entries).await { + match broker.subscribe(entries, None).await { Ok(stream) => { let subscription_id = SubscriptionId::new(); diff --git a/kuksa_databroker/lib/kuksa/src/lib.rs b/kuksa_databroker/lib/kuksa/src/lib.rs index a2beec62..f02db184 100644 --- a/kuksa_databroker/lib/kuksa/src/lib.rs +++ b/kuksa_databroker/lib/kuksa/src/lib.rs @@ -288,7 +288,10 @@ impl KuksaClient { }) } - let req = proto::v1::SubscribeRequest { entries }; + let req = proto::v1::SubscribeRequest { + entries, + frequency_hertz: None, + }; match client.subscribe(req).await { Ok(response) => Ok(response.into_inner()), @@ -321,7 +324,10 @@ impl KuksaClient { }) } - let req = proto::v1::SubscribeRequest { entries }; + let req = proto::v1::SubscribeRequest { + entries, + frequency_hertz: None, + }; match client.subscribe(req).await { Ok(response) => Ok(response.into_inner()), @@ -346,7 +352,10 @@ impl KuksaClient { }) } - let req = proto::v1::SubscribeRequest { entries }; + let req = proto::v1::SubscribeRequest { + entries, + frequency_hertz: None, + }; match client.subscribe(req).await { Ok(response) => Ok(response.into_inner()), diff --git a/proto/kuksa/val/v1/val.proto b/proto/kuksa/val/v1/val.proto index 3059d809..18452652 100644 --- a/proto/kuksa/val/v1/val.proto +++ b/proto/kuksa/val/v1/val.proto @@ -98,6 +98,7 @@ message SubscribeEntry { // Subscribe to changes in datapoints. message SubscribeRequest { repeated SubscribeEntry entries = 1; + optional uint64 frequency_hertz = 2; } // A subscription response @@ -112,4 +113,4 @@ message GetServerInfoRequest { message GetServerInfoResponse { string name = 1; string version = 2; -} \ No newline at end of file +}