send queue: use being_sent as a lock for touching storage
There were two disconnected sources of truth for the state of an event
to be sent:

- it can or cannot be in the in-memory `being_sent` map
- it can or cannot be in the database

Unfortunately, this led to subtle race conditions when it comes to
editing/aborting. The following sequence of operations was possible:

- try to send an event: a local echo is added to storage, but it's not
marked as being sent yet
- the task wakes up, finds the local echo in the storage,...
- try to edit/abort the event: the event is not marked as being sent
yet, so we think we can edit/abort it
- ... having found the local echo, it is marked as being sent.

This would result in the event misleadingly not being aborted/edited,
while it should have been.
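
The interleaving above can be replayed deterministically in a small, single-threaded sketch. Everything here is hypothetical and simplified (`RacyQueue`, `find_next`, `mark_being_sent`, and string transaction ids are illustration-only names, not the SDK's API); it shows one plausible reading of the race, where the abort reports success although the sending task already holds the event:

```rust
use std::collections::BTreeSet;

/// Hypothetical, simplified stand-in for the send queue state.
#[derive(Default)]
struct RacyQueue {
    /// Stands in for the database.
    storage: Vec<String>,
    /// Stands in for the in-memory `being_sent` set.
    being_sent: BTreeSet<String>,
}

impl RacyQueue {
    /// Step 1 of the sending task: find the next local echo in storage.
    fn find_next(&self) -> Option<String> {
        self.storage.first().cloned()
    }

    /// Step 2 of the sending task: mark the event found in step 1.
    fn mark_being_sent(&mut self, txn_id: &str) {
        self.being_sent.insert(txn_id.to_owned());
    }

    /// Abort request: refused only if the event is already marked as being sent.
    fn cancel(&mut self, txn_id: &str) -> bool {
        if self.being_sent.contains(txn_id) {
            return false;
        }
        let before = self.storage.len();
        self.storage.retain(|id| id != txn_id);
        self.storage.len() != before
    }
}

/// Replays the interleaving and returns
/// (what the abort reported, the event the sending task still holds).
fn replay_race() -> (bool, Option<String>) {
    let mut queue = RacyQueue::default();
    queue.storage.push("txn1".to_owned());

    let picked = queue.find_next(); // the task finds the local echo...
    let cancelled = queue.cancel("txn1"); // ...the abort slips in...
    if let Some(id) = &picked {
        queue.mark_being_sent(id); // ...and only then is the event marked.
    }
    (cancelled, picked)
}

fn main() {
    let (cancelled, picked) = replay_race();
    println!("abort reported success: {cancelled}; sender still holds: {picked:?}");
}
```

In this sketch the two steps of the sending task are separate calls, which is what leaves room for the abort to run in between.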

Now, there's already a lock on the `being_sent` map, so we can hold onto
it while we're touching storage, making sure that there aren't two
callers trying to manipulate storage *and* `being_sent` at the same
time.
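
As a rough illustration of that idea, here is a synchronous sketch using `std::sync::RwLock` instead of the async lock used in the actual code. The `LockedQueue` type, its `Mutex<Vec<String>>` storage, and the string transaction ids are assumptions made for the example, not the SDK's types; the point is only that the `being_sent` guard stays alive for the whole storage access:

```rust
use std::collections::BTreeSet;
use std::sync::{Mutex, RwLock};

/// Hypothetical, simplified queue: `being_sent` doubles as the storage lock.
struct LockedQueue {
    /// Stands in for the database.
    storage: Mutex<Vec<String>>,
    /// Events being sent; its lock also guards every storage access.
    being_sent: RwLock<BTreeSet<String>>,
}

impl LockedQueue {
    fn new(ids: &[&str]) -> Self {
        Self {
            storage: Mutex::new(ids.iter().map(|id| id.to_string()).collect()),
            being_sent: RwLock::new(BTreeSet::new()),
        }
    }

    /// Finds the next event and marks it as being sent under one write guard.
    fn peek_next_to_send(&self) -> Option<String> {
        // Keep the guard until we're done touching the storage.
        let mut being_sent = self.being_sent.write().unwrap();
        let next = self.storage.lock().unwrap().first().cloned();
        if let Some(id) = &next {
            being_sent.insert(id.clone());
        }
        next
    }

    /// Removes the event from storage unless it is already being sent.
    fn cancel(&self, txn_id: &str) -> bool {
        // The read guard blocks `peek_next_to_send` until this function returns.
        let being_sent = self.being_sent.read().unwrap();
        if being_sent.contains(txn_id) {
            return false;
        }
        let mut storage = self.storage.lock().unwrap();
        let before = storage.len();
        storage.retain(|id| id != txn_id);
        storage.len() != before
    }
}

fn main() {
    let queue = LockedQueue::new(&["txn1", "txn2"]);
    // Not picked up yet, so the abort goes through.
    assert!(queue.cancel("txn2"));
    // Once picked up, the event is marked before the guard is released.
    assert_eq!(queue.peek_next_to_send().as_deref(), Some("txn1"));
    assert!(!queue.cancel("txn1"));
}
```

Because the check on `being_sent` and the storage mutation happen under the same guard, an abort either runs entirely before the event is picked up or sees it as already being sent.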

This is pretty tricky to test properly, since this requires super
precise timing control over the state store, so there's no test for
this. I can confirm this avoids some weirdness I observed with
`multiverse` though.
bnjbvr committed Jul 1, 2024
1 parent 2f125e9 commit 263c86b
Showing 1 changed file with 20 additions and 9 deletions.
29 changes: 20 additions & 9 deletions crates/matrix-sdk/src/send_queue.rs
@@ -591,6 +591,8 @@ struct QueueStorage {
     room_id: OwnedRoomId,

     /// All the queued events that are being sent at the moment.
+    ///
+    /// It also serves as an internal lock on the storage backend.
     being_sent: Arc<RwLock<BTreeSet<OwnedTransactionId>>>,
 }

@@ -624,6 +626,9 @@ impl QueueStorage {
     /// It is required to call [`Self::mark_as_sent`] after it's been
     /// effectively sent.
     async fn peek_next_to_send(&self) -> Result<Option<QueuedEvent>, RoomSendQueueStorageError> {
+        // Keep the lock until we're done touching the storage.
+        let mut being_sent = self.being_sent.write().await;
+
         let queued_events = self
             .client
             .get()
@@ -633,7 +638,7 @@ impl QueueStorage {
             .await?;

         if let Some(event) = queued_events.iter().find(|queued| !queued.is_wedged) {
-            self.being_sent.write().await.insert(event.transaction_id.clone());
+            being_sent.insert(event.transaction_id.clone());

             Ok(Some(event.clone()))
         } else {
@@ -655,7 +660,9 @@ impl QueueStorage {
         &self,
         transaction_id: &TransactionId,
     ) -> Result<(), RoomSendQueueStorageError> {
-        self.mark_as_not_being_sent(transaction_id).await;
+        // Keep the lock until we're done touching the storage.
+        let mut being_sent = self.being_sent.write().await;
+        being_sent.remove(transaction_id);

         Ok(self
             .client
@@ -672,7 +679,9 @@ impl QueueStorage {
         &self,
         transaction_id: &TransactionId,
     ) -> Result<(), RoomSendQueueStorageError> {
-        self.mark_as_not_being_sent(transaction_id).await;
+        // Keep the lock until we're done touching the storage.
+        let mut being_sent = self.being_sent.write().await;
+        being_sent.remove(transaction_id);

         let removed = self
             .client
@@ -699,9 +708,10 @@ impl QueueStorage {
         &self,
         transaction_id: &TransactionId,
     ) -> Result<bool, RoomSendQueueStorageError> {
-        // Note: since there's a single caller (the room sending task, which processes
-        // events to send linearly), there's no risk for race conditions here.
-        if self.being_sent.read().await.contains(transaction_id) {
+        // Keep the lock until we're done touching the storage.
+        let being_sent = self.being_sent.read().await;
+
+        if being_sent.contains(transaction_id) {
             return Ok(false);
         }

@@ -728,9 +738,10 @@ impl QueueStorage {
         transaction_id: &TransactionId,
         serializable: SerializableEventContent,
     ) -> Result<bool, RoomSendQueueStorageError> {
-        // Note: since there's a single caller (the room sending task, which processes
-        // events to send linearly), there's no risk for race conditions here.
-        if self.being_sent.read().await.contains(transaction_id) {
+        // Keep the lock until we're done touching the storage.
+        let being_sent = self.being_sent.read().await;
+
+        if being_sent.contains(transaction_id) {
             return Ok(false);
         }
