Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add continuous subscription to subscribe method at a specific frequency #734

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 240 additions & 18 deletions kuksa_databroker/databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::time::SystemTime;
use std::sync::Mutex;
use std::time::{Duration, SystemTime};

use crate::query::{CompiledQuery, ExecutionInput};
use crate::types::ExecutionInputImplData;
Expand Down Expand Up @@ -90,7 +91,7 @@ pub enum Field {
MetadataUnit,
}

#[derive(Default)]
#[derive(Default, Debug)]
pub struct Database {
next_id: AtomicI32,
path_to_id: HashMap<String, i32>,
Expand All @@ -101,6 +102,7 @@ pub struct Database {
pub struct Subscriptions {
query_subscriptions: Vec<QuerySubscription>,
change_subscriptions: Vec<ChangeSubscription>,
continuous_subscriptions: Arc<Mutex<Vec<ContinuousSubscription>>>,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -158,6 +160,14 @@ pub struct ChangeSubscription {
permissions: Permissions,
}

#[derive(Debug, Clone)]
pub struct ContinuousSubscription {
entries: HashMap<i32, HashSet<Field>>,
sender: mpsc::Sender<EntryUpdates>,
permissions: Permissions,
database: Arc<RwLock<Database>>,
}

#[derive(Debug)]
pub struct NotificationError {}

Expand Down Expand Up @@ -605,6 +615,27 @@ impl Subscriptions {
self.change_subscriptions.push(subscription)
}

pub fn add_continuous_subscription(
&mut self,
subscription: ContinuousSubscription,
frequency: u64,
) {
let local_subscription = subscription.clone();
self.continuous_subscriptions
.lock()
.unwrap()
.push(subscription);

tokio::spawn(async move {
// Asynchronous code to be executed in the new task
while !local_subscription.sender.is_closed() {
let _ = local_subscription.notify(None).await;
// Simulate some asynchronous work
tokio::time::sleep(Duration::from_millis(1 / frequency * 1000)).await;
Copy link
Contributor

@erikbosch erikbosch Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does Rust manage type conversion? from_millis expects a u64. You use 1 and 1000, will it really be a float value? Or do you rather need something like:

Duration::from_millis((1000.0/ frequency).round() as u64)

(I actually do not know what resolution/performance we can expect from the system, like if we request 1000 Hz, will we really receive signals with 1000 Hz?

I have also no experience from Rust unit-tests, here a unit test would be a good way to prove that duration becomes as expected , like that 600 Hz gives 2 ms duration

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be really interesting e.g. how fast is databroker able to notify subscribers. Does it change if we have 100 subscribers at the same time. What is the limit e.g. what Erik wrote. But I guess not that much related to this PR.

Copy link
Contributor Author

@rafaeling rafaeling Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've used your line code to fix it. I will also try to implement some unit tests to test it.
We could also open some performance report topic to measure its capabilities.

}
});
}

pub async fn notify(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
Expand Down Expand Up @@ -648,6 +679,7 @@ impl Subscriptions {
pub fn clear(&mut self) {
self.query_subscriptions.clear();
self.change_subscriptions.clear();
self.continuous_subscriptions.lock().unwrap().clear();
}

pub fn cleanup(&mut self) {
Expand All @@ -667,6 +699,14 @@ impl Subscriptions {
true
}
});
self.continuous_subscriptions.lock().unwrap().retain(|sub| {
if sub.sender.is_closed() {
info!("Subscriber gone: removing continuous subscription");
false
} else {
true
}
});
}
}

Expand Down Expand Up @@ -921,6 +961,117 @@ impl QuerySubscription {
}
}

impl ContinuousSubscription {
async fn notify(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that this is very similar to the ChangeSubscription notify. Couldn't it be reused somehow? What I mean is just use different subscribe function but maybe one notify? If this is not doable let me know :)

Copy link
Contributor Author

@rafaeling rafaeling Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, let me see if I can use traits -> https://doc.rust-lang.org/book/ch10-02-traits.html

&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
) -> Result<(), NotificationError> {
let db = self.database.read().await;
let db_read = db.authorized_read_access(&self.permissions);
match changed {
Some(changed) => {
let mut matches = false;
for (id, changed_fields) in changed {
if let Some(fields) = self.entries.get(id) {
if !fields.is_disjoint(changed_fields) {
matches = true;
break;
}
}
}
if matches {
// notify
let notifications = {
let mut notifications = EntryUpdates::default();

for (id, changed_fields) in changed {
if let Some(fields) = self.entries.get(id) {
if !fields.is_disjoint(changed_fields) {
match db_read.get_entry_by_id(*id) {
Ok(entry) => {
let mut update = EntryUpdate::default();
let mut notify_fields = HashSet::new();
// TODO: Perhaps make path optional
update.path = Some(entry.metadata.path.clone());
if changed_fields.contains(&Field::Datapoint)
&& fields.contains(&Field::Datapoint)
{
update.datapoint = Some(entry.datapoint.clone());
notify_fields.insert(Field::Datapoint);
}
if changed_fields.contains(&Field::ActuatorTarget)
&& fields.contains(&Field::ActuatorTarget)
{
update.actuator_target =
Some(entry.actuator_target.clone());
notify_fields.insert(Field::ActuatorTarget);
}
notifications.updates.push(ChangeNotification {
update,
fields: notify_fields,
});
}
Err(_) => {
debug!("notify: could not find entry with id {}", id)
}
}
}
}
}
notifications
};
if notifications.updates.is_empty() {
Ok(())
} else {
match self.sender.send(notifications).await {
Ok(()) => Ok(()),
Err(_) => Err(NotificationError {}),
}
}
} else {
Ok(())
}
}
None => {
let notifications = {
let mut notifications = EntryUpdates::default();

for (id, fields) in &self.entries {
match db_read.get_entry_by_id(*id) {
Ok(entry) => {
let mut update = EntryUpdate::default();
let mut notify_fields = HashSet::new();
// TODO: Perhaps make path optional
update.path = Some(entry.metadata.path.clone());
if fields.contains(&Field::Datapoint) {
update.datapoint = Some(entry.datapoint.clone());
notify_fields.insert(Field::Datapoint);
}
if fields.contains(&Field::ActuatorTarget) {
update.actuator_target = Some(entry.actuator_target.clone());
notify_fields.insert(Field::ActuatorTarget);
}
notifications.updates.push(ChangeNotification {
update,
fields: notify_fields,
});
}
Err(_) => {
debug!("notify: could not find entry with id {}", id)
}
}
}
notifications
};
match self.sender.send(notifications).await {
Ok(()) => Ok(()),
Err(_) => Err(NotificationError {}),
}
}
}
}
}

pub struct DatabaseReadAccess<'a, 'b> {
db: &'a Database,
permissions: &'b Permissions,
Expand Down Expand Up @@ -1446,31 +1597,99 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
pub async fn subscribe(
&self,
valid_entries: HashMap<i32, HashSet<Field>>,
frequency: Option<u64>,
) -> Result<impl Stream<Item = EntryUpdates>, SubscriptionError> {
if valid_entries.is_empty() {
return Err(SubscriptionError::InvalidInput);
}

let mut entries_on_changed: HashMap<i32, HashSet<Field>> = HashMap::new();
let mut entries_continuous: HashMap<i32, HashSet<Field>> = HashMap::new();

let db_read = self.broker.database.read().await;
let db_read_access = db_read.authorized_read_access(self.permissions);

for (id, fields) in valid_entries {
match db_read_access.get_entry_by_id(id) {
Ok(entry) => {
let change_type = entry.metadata.change_type.clone();
match change_type {
types::ChangeType::OnChange => {
entries_on_changed
.entry(id)
.and_modify(|existing_fields| {
existing_fields.extend(fields.clone())
})
.or_insert(fields.clone());
}
types::ChangeType::Continuous => {
entries_continuous
.entry(id)
.and_modify(|existing_fields| {
existing_fields.extend(fields.clone())
})
.or_insert(fields.clone());
}
types::ChangeType::Static => {}
}
}
Err(_) => {
debug!("notify: could not find entry with id {}", id)
}
}
}

let (sender, receiver) = mpsc::channel(10);
let subscription = ChangeSubscription {
entries: valid_entries,
sender,
permissions: self.permissions.clone(),
};
if !entries_on_changed.is_empty() {
if frequency.is_some() && entries_continuous.is_empty() {
return Err(SubscriptionError::InvalidInput);
}
let subscription = ChangeSubscription {
entries: entries_on_changed,
sender: sender.clone(),
permissions: self.permissions.clone(),
};

{
// Send everything subscribed to in an initial notification
let db = self.broker.database.read().await;
if subscription.notify(None, &db).await.is_err() {
warn!("Failed to create initial notification");
{
// Send everything subscribed to in an initial notification
let db = self.broker.database.read().await;
if subscription.notify(None, &db).await.is_err() {
warn!("Failed to create initial notification");
}
}

self.broker
.subscriptions
.write()
.await
.add_change_subscription(subscription);
}

self.broker
.subscriptions
.write()
.await
.add_change_subscription(subscription);
if !entries_continuous.is_empty() {
if frequency.is_none() {
return Err(SubscriptionError::InvalidInput);
}
let subscription_continuous = ContinuousSubscription {
entries: entries_continuous,
sender,
permissions: self.permissions.clone(),
database: Arc::clone(&self.broker.database),
};

{
// Send everything subscribed to in an initial notification
//let db = self.broker.database.read().await;
if subscription_continuous.notify(None).await.is_err() {
warn!("Failed to create initial notification");
}
}

self.broker
.subscriptions
.write()
.await
.add_continuous_subscription(subscription_continuous, frequency.unwrap());
}

let stream = ReceiverStream::new(receiver);
Ok(stream)
Expand Down Expand Up @@ -2953,7 +3172,10 @@ mod tests {
.expect("Register datapoint should succeed");

let mut stream = broker
.subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))]))
.subscribe(
HashMap::from([(id1, HashSet::from([Field::Datapoint]))]),
None,
)
.await
.expect("subscription should succeed");

Expand Down
2 changes: 1 addition & 1 deletion kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

match broker.subscribe(entries).await {
match broker.subscribe(entries, request.frequency_hertz).await {
Ok(stream) => {
let stream = convert_to_proto_stream(stream);
Ok(tonic::Response::new(Box::pin(stream)))
Expand Down
2 changes: 1 addition & 1 deletion kuksa_databroker/databroker/src/viss/v2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl Viss for Server {
});
};

match broker.subscribe(entries).await {
match broker.subscribe(entries, None).await {
Ok(stream) => {
let subscription_id = SubscriptionId::new();

Expand Down
15 changes: 12 additions & 3 deletions kuksa_databroker/lib/kuksa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,10 @@ impl KuksaClient {
})
}

let req = proto::v1::SubscribeRequest { entries };
let req = proto::v1::SubscribeRequest {
entries,
frequency_hertz: None,
};

match client.subscribe(req).await {
Ok(response) => Ok(response.into_inner()),
Expand Down Expand Up @@ -321,7 +324,10 @@ impl KuksaClient {
})
}

let req = proto::v1::SubscribeRequest { entries };
let req = proto::v1::SubscribeRequest {
entries,
frequency_hertz: None,
};

match client.subscribe(req).await {
Ok(response) => Ok(response.into_inner()),
Expand All @@ -346,7 +352,10 @@ impl KuksaClient {
})
}

let req = proto::v1::SubscribeRequest { entries };
let req = proto::v1::SubscribeRequest {
entries,
frequency_hertz: None,
};

match client.subscribe(req).await {
Ok(response) => Ok(response.into_inner()),
Expand Down
Loading
Loading