From 263c86b508c616b8434cf9eed6403951a9a79091 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Mon, 1 Jul 2024 15:38:31 +0200 Subject: [PATCH] send queue: use `being_sent` as a lock for touching storage There were two disconnected sources of truth for the state of an event to be sent: - it can or cannot be in the in-memory `being_sent` map - it can or cannot be in the database Unfortunately, this led to subtle race conditions when it comes to editing/aborting. The following sequence of operations was possible: - try to send an event: a local echo is added to storage, but it's not marked as being sent yet - the task wakes up, finds the local echo in the storage,... - try to edit/abort the event: the event is not marked as being sent yet, so we think we can edit/abort it - ... having found the local echo, it is marked as being sent. This would result in the event misleadingly not being aborted/edited, while it should have been. Now, there's already a lock on the `being_sent` map, so we can hold onto it while we're touching storage, making sure that there aren't two callers trying to manipulate storage *and* `being_sent` at the same time. This is pretty tricky to test properly, since this requires super precise timing control over the state store, so there's no test for this. I can confirm this avoids some weirdness I observed with `multiverse` though. --- crates/matrix-sdk/src/send_queue.rs | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs index 43c43cdd303..e4c24344ac5 100644 --- a/crates/matrix-sdk/src/send_queue.rs +++ b/crates/matrix-sdk/src/send_queue.rs @@ -591,6 +591,8 @@ struct QueueStorage { room_id: OwnedRoomId, /// All the queued events that are being sent at the moment. + /// + /// It also serves as an internal lock on the storage backend. 
being_sent: Arc>>, } @@ -624,6 +626,9 @@ impl QueueStorage { /// It is required to call [`Self::mark_as_sent`] after it's been /// effectively sent. async fn peek_next_to_send(&self) -> Result, RoomSendQueueStorageError> { + // Keep the lock until we're done touching the storage. + let mut being_sent = self.being_sent.write().await; + let queued_events = self .client .get() @@ -633,7 +638,7 @@ impl QueueStorage { .await?; if let Some(event) = queued_events.iter().find(|queued| !queued.is_wedged) { - self.being_sent.write().await.insert(event.transaction_id.clone()); + being_sent.insert(event.transaction_id.clone()); Ok(Some(event.clone())) } else { @@ -655,7 +660,9 @@ impl QueueStorage { &self, transaction_id: &TransactionId, ) -> Result<(), RoomSendQueueStorageError> { - self.mark_as_not_being_sent(transaction_id).await; + // Keep the lock until we're done touching the storage. + let mut being_sent = self.being_sent.write().await; + being_sent.remove(transaction_id); Ok(self .client @@ -672,7 +679,9 @@ impl QueueStorage { &self, transaction_id: &TransactionId, ) -> Result<(), RoomSendQueueStorageError> { - self.mark_as_not_being_sent(transaction_id).await; + // Keep the lock until we're done touching the storage. + let mut being_sent = self.being_sent.write().await; + being_sent.remove(transaction_id); let removed = self .client @@ -699,9 +708,10 @@ impl QueueStorage { &self, transaction_id: &TransactionId, ) -> Result { - // Note: since there's a single caller (the room sending task, which processes - // events to send linearly), there's no risk for race conditions here. - if self.being_sent.read().await.contains(transaction_id) { + // Keep the lock until we're done touching the storage. 
+ let being_sent = self.being_sent.read().await; + + if being_sent.contains(transaction_id) { return Ok(false); } @@ -728,9 +738,10 @@ impl QueueStorage { transaction_id: &TransactionId, serializable: SerializableEventContent, ) -> Result { - // Note: since there's a single caller (the room sending task, which processes - // events to send linearly), there's no risk for race conditions here. - if self.being_sent.read().await.contains(transaction_id) { + // Keep the lock until we're done touching the storage. + let being_sent = self.being_sent.read().await; + + if being_sent.contains(transaction_id) { return Ok(false); }