Skip to content

Commit

Permalink
send queue: control enabled on a per-room basis in addition to globally
Browse files Browse the repository at this point in the history
  • Loading branch information
bnjbvr committed Jun 10, 2024
1 parent 9c1d62a commit e5487da
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 119 deletions.
49 changes: 23 additions & 26 deletions bindings/matrix-sdk-ffi/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,12 @@ pub trait ProgressWatcher: Send + Sync {
fn transmission_progress(&self, progress: TransmissionProgress);
}

/// A listener to the global (client-wide) status of the send queue.
/// A listener to the global (client-wide) error reporter of the send queue.
#[uniffi::export(callback_interface)]
pub trait SendQueueStatusListener: Sync + Send {
/// Called every time the send queue has received a new status.
///
/// This can be set automatically (in case of sending failure), or manually
/// via an API call.
fn on_update(&self, new_value: bool);
pub trait SendQueueRoomErrorListener: Sync + Send {
/// Called every time the send queue has ran into an error for a given room,
/// which will disable the send queue for that particular room.
fn on_error(&self, room_id: String, error: ClientError);
}

#[derive(Clone, Copy, uniffi::Record)]
Expand Down Expand Up @@ -315,18 +313,15 @@ impl Client {
Ok(())
}

/// Enables or disables the send queue, according to the given parameter.
/// Enables or disables all the room send queues at once.
///
/// The send queue automatically disables itself whenever sending an
/// event with it failed (e.g., sending an event via the high-level Timeline
/// object), so it's required to manually re-enable it as soon as
/// connectivity is back on the device.
pub fn enable_send_queue(&self, enable: bool) {
if enable {
self.inner.send_queue().enable();
} else {
self.inner.send_queue().disable();
}
/// When connectivity is lost on a device, it is recommended to disable the
/// room sending queues.
///
/// This can be controlled for individual rooms, using
/// [`Room::enable_send_queue`].
pub fn enable_all_send_queues(&self, enable: bool) {
self.inner.send_queue().set_enabled(enable);
}

/// Subscribe to the global enablement status of the send queue, at the
Expand All @@ -336,17 +331,19 @@ impl Client {
/// the enablement status.
pub fn subscribe_to_send_queue_status(
&self,
listener: Box<dyn SendQueueStatusListener>,
listener: Box<dyn SendQueueRoomErrorListener>,
) -> Arc<TaskHandle> {
let mut subscriber = self.inner.send_queue().subscribe_status();
let mut subscriber = self.inner.send_queue().subscribe_errors();

Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
// Call with the initial value.
listener.on_update(subscriber.next_now());

// Call every time the value changes.
while let Some(next_val) = subscriber.next().await {
listener.on_update(next_val);
loop {
match subscriber.recv().await {
Ok(report) => listener
.on_error(report.room_id.to_string(), ClientError::new(report.error)),
Err(err) => {
error!("error when listening to the send queue error reporter: {err}");
}
}
}
})))
}
Expand Down
2 changes: 1 addition & 1 deletion bindings/matrix-sdk-ffi/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub enum ClientError {
}

impl ClientError {
fn new<E: Display>(error: E) -> Self {
pub(crate) fn new<E: Display>(error: E) -> Self {
Self::Generic { msg: error.to_string() }
}
}
Expand Down
11 changes: 11 additions & 0 deletions bindings/matrix-sdk-ffi/src/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,17 @@ impl Room {
.await?;
Ok(())
}

/// Returns whether the send queue for that particular room is enabled or
/// not.
pub fn is_send_queue_enabled(&self) -> bool {
self.inner.send_queue().is_enabled()
}

/// Enable or disable the send queue for that particular room.
pub fn enable_send_queue(&self, enable: bool) {
self.inner.send_queue().set_enabled(enable);
}
}

/// Generates a `matrix.to` permalink to the given room alias.
Expand Down
9 changes: 6 additions & 3 deletions crates/matrix-sdk-ui/tests/integration/timeline/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async fn test_retry_failed() {

mock_encryption_state(&server, false).await;

client.send_queue().enable();
client.send_queue().set_enabled(true);

let room = client.get_room(room_id).unwrap();
let timeline = Arc::new(room.timeline().await.unwrap());
Expand Down Expand Up @@ -173,9 +173,12 @@ async fn test_retry_failed() {
.mount(&server)
.await;

assert!(!client.send_queue().is_enabled());
// This doesn't disable the send queue at the global level…
assert!(client.send_queue().is_enabled());
// …but does so at the local level.
assert!(!room.send_queue().is_enabled());

client.send_queue().enable();
room.send_queue().set_enabled(true);

// Let the send queue handle the event.
tokio::time::sleep(Duration::from_millis(300)).await;
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk-ui/tests/integration/timeline/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ async fn test_retry_order() {
.await;

// Retry the second message first
client.send_queue().enable();
client.send_queue().set_enabled(true);

// Wait 200ms for the first msg, 100ms for the second, 300ms for overhead
sleep(Duration::from_millis(600)).await;
Expand Down
Loading

0 comments on commit e5487da

Please sign in to comment.