From 4e43dc2c06aa0f1bd7fcfc2fea4c63cf80f6c247 Mon Sep 17 00:00:00 2001 From: Valere Date: Wed, 16 Oct 2024 11:17:59 +0200 Subject: [PATCH] feat(send_queue): Persist failed to send errors Changelog: We now persist the error that caused an event to fail to send. The error `QueueWedgeError` contains info that client can use to try to resolve the problem when the error is not automatically retriable. Some breaking changes in the ffi layer for `timeline::EventSendState`, `SendingFailed` now directly contains the wedge reason enum. Use it in place of the removed variant of EventSendState --- .../src/store/integration_tests.rs | 24 ++- .../matrix-sdk-base/src/store/memory_store.rs | 9 +- crates/matrix-sdk-base/src/store/traits.rs | 24 ++- .../src/state_store/mod.rs | 33 +++- .../007_a_send_queue_wedge_reason.sql | 4 + .../state_store/007_b_send_queue_clean.sql | 2 + crates/matrix-sdk-sqlite/src/state_store.rs | 141 ++++++++++++++++-- .../src/timeline/controller/mod.rs | 17 +-- .../tests/integration/timeline/queue.rs | 22 ++- crates/matrix-sdk/src/send_queue.rs | 22 +-- .../tests/integration/send_queue.rs | 4 +- 11 files changed, 235 insertions(+), 67 deletions(-) create mode 100644 crates/matrix-sdk-sqlite/migrations/state_store/007_a_send_queue_wedge_reason.sql create mode 100644 crates/matrix-sdk-sqlite/migrations/state_store/007_b_send_queue_clean.sql diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index 7fd99b32a28..cbd08818a47 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -6,6 +6,7 @@ use assert_matches::assert_matches; use assert_matches2::assert_let; use async_trait::async_trait; use growable_bloom_filter::GrowableBloomBuilder; +use matrix_sdk_common::deserialized_responses::QueueWedgeError; use matrix_sdk_test::test_json; use ruma::{ api::MatrixVersion, @@ -1223,7 +1224,7 @@ impl StateStoreIntegrationTests for DynStateStore { assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized); assert_eq!(content.body(), "msg0"); - assert!(!pending[0].is_wedged); + assert!(!pending[0].is_wedged()); } // Saving another three things should work. @@ -1249,12 +1250,18 @@ impl StateStoreIntegrationTests for DynStateStore { let deserialized = pending[i].event.deserialize().unwrap(); assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized); assert_eq!(content.body(), format!("msg{i}")); - assert!(!pending[i].is_wedged); + assert!(!pending[i].is_wedged()); } // Marking an event as wedged works. let txn2 = &pending[2].transaction_id; - self.update_send_queue_event_status(room_id, txn2, true).await.unwrap(); + self.update_send_queue_event_status( + room_id, + txn2, + Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }), + ) + .await + .unwrap(); // And it is reflected. let pending = self.load_send_queue_events(room_id).await.unwrap(); @@ -1263,10 +1270,13 @@ impl StateStoreIntegrationTests for DynStateStore { assert_eq!(pending.len(), 4); assert_eq!(pending[0].transaction_id, txn0); assert_eq!(pending[2].transaction_id, *txn2); - assert!(pending[2].is_wedged); + assert!(pending[2].is_wedged()); + let error = pending[2].clone().error.unwrap(); + let generic_error = assert_matches!(error, QueueWedgeError::GenericApiError { msg } => msg); + assert_eq!(generic_error, "Oops"); for i in 0..4 { if i != 2 { - assert!(!pending[i].is_wedged); + assert!(!pending[i].is_wedged()); } } @@ -1288,7 +1298,7 @@ impl StateStoreIntegrationTests for DynStateStore { assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized); assert_eq!(content.body(), "wow that's a cool test"); - assert!(!pending[2].is_wedged); + assert!(!pending[2].is_wedged()); for i in 0..4 { if i != 2 { @@ -1296,7 +1306,7 @@ impl StateStoreIntegrationTests for DynStateStore { assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized); assert_eq!(content.body(), format!("msg{i}")); - assert!(!pending[i].is_wedged); + assert!(!pending[i].is_wedged()); } } } diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index 995c7113870..c623a9ff67c 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -19,6 +19,7 @@ use std::{ use async_trait::async_trait; use growable_bloom_filter::GrowableBloom; +use matrix_sdk_common::deserialized_responses::QueueWedgeError; use ruma::{ canonical_json::{redact, RedactedBecause}, events::{ @@ -815,7 +816,7 @@ impl StateStore for MemoryStore { .unwrap() .entry(room_id.to_owned()) .or_default() - .push(QueuedEvent { event, transaction_id, is_wedged: false }); + .push(QueuedEvent { event, transaction_id, error: None }); Ok(()) } @@ -835,7 +836,7 @@ impl StateStore for MemoryStore { .find(|item| item.transaction_id == transaction_id) { entry.event = content; - entry.is_wedged = false; + entry.error = None; Ok(true) } else { Ok(false) @@ -876,7 +877,7 @@ impl StateStore for MemoryStore { &self, room_id: &RoomId, transaction_id: &TransactionId, - wedged: bool, + error: Option, ) -> Result<(), Self::Error> { if let Some(entry) = self .send_queue_events @@ -887,7 +888,7 @@ impl StateStore for MemoryStore { .iter_mut() .find(|item| item.transaction_id == transaction_id) { - entry.is_wedged = wedged; + entry.error = error; } Ok(()) } diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index d96e3f24cb0..65fee5b7bb1 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -23,7 +23,7 @@ use std::{ use as_variant::as_variant; use async_trait::async_trait; use growable_bloom_filter::GrowableBloom; -use matrix_sdk_common::AsyncTraitDeps; +use matrix_sdk_common::{deserialized_responses::QueueWedgeError, AsyncTraitDeps}; use ruma::{ api::MatrixVersion, events::{ @@ -392,12 +392,13 @@ pub trait StateStore: AsyncTraitDeps { room_id: &RoomId, ) -> Result, Self::Error>; - /// Updates the send queue wedged status for a given send queue event. + /// Updates the send queue error status (wedge) for a given send queue + /// event. async fn update_send_queue_event_status( &self, room_id: &RoomId, transaction_id: &TransactionId, - wedged: bool, + error: Option, ) -> Result<(), Self::Error>; /// Loads all the rooms which have any pending events in their send queue. @@ -662,10 +663,10 @@ impl StateStore for EraseStateStoreError { &self, room_id: &RoomId, transaction_id: &TransactionId, - wedged: bool, + error: Option, ) -> Result<(), Self::Error> { self.0 - .update_send_queue_event_status(room_id, transaction_id, wedged) + .update_send_queue_event_status(room_id, transaction_id, error) .await .map_err(Into::into) } @@ -1156,7 +1157,16 @@ pub struct QueuedEvent { /// If the event couldn't be sent because of an API error, it's marked as /// wedged, and won't ever be peeked for sending. The only option is to /// remove it. - pub is_wedged: bool, + pub error: Option, +} + +impl QueuedEvent { + /// If the event couldn't be sent because of an API error, it's marked as + /// wedged, and won't ever be peeked for sending. The only option is to + /// remove it. + pub fn is_wedged(&self) -> bool { + self.error.is_some() + } } /// The specific user intent that characterizes a [`DependentQueuedEvent`]. @@ -1254,7 +1264,7 @@ impl fmt::Debug for QueuedEvent { // Hide the content from the debug log. f.debug_struct("QueuedEvent") .field("transaction_id", &self.transaction_id) - .field("is_wedged", &self.is_wedged) + .field("is_wedged", &self.is_wedged()) .finish_non_exhaustive() } } diff --git a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs index 9f4c705aec9..91b05c987fd 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs @@ -147,6 +147,7 @@ mod keys { } pub use keys::ALL_STORES; +use matrix_sdk_base::deserialized_responses::QueueWedgeError; /// Encrypt (if needs be) then JSON-serialize a value. fn serialize_value(store_cipher: Option<&StoreCipher>, event: &impl Serialize) -> Result { @@ -432,7 +433,12 @@ struct PersistedQueuedEvent { // All these fields are the same as in [`QueuedEvent`]. event: SerializableEventContent, transaction_id: OwnedTransactionId, - is_wedged: bool, + + // Deprecated (from old format), now replaced with error field. + // Kept here for migration + is_wedged: Option, + + pub error: Option, } // Small hack to have the following macro invocation act as the appropriate @@ -1325,7 +1331,8 @@ impl_state_store!({ room_id: room_id.to_owned(), event: content, transaction_id, - is_wedged: false, + is_wedged: None, + error: None, }); // Save the new vector into db. @@ -1363,7 +1370,8 @@ impl_state_store!({ // Modify the one event. if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) { entry.event = content; - entry.is_wedged = false; + entry.is_wedged = None; + entry.error = None; // Save the new vector into db. obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?; @@ -1432,7 +1440,19 @@ impl_state_store!({ .map(|item| QueuedEvent { event: item.event, transaction_id: item.transaction_id, - is_wedged: item.is_wedged, + error: match item.is_wedged { + Some(legacy_wedged) => { + if legacy_wedged { + // migrate a generic error + Some(QueueWedgeError::GenericApiError { + msg: "local echo failed to send in a previous session".into(), + }) + } else { + None + } + } + None => item.error, + }, }) .collect()) } @@ -1441,7 +1461,7 @@ impl_state_store!({ &self, room_id: &RoomId, transaction_id: &TransactionId, - wedged: bool, + error: Option, ) -> Result<()> { let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id); @@ -1456,7 +1476,8 @@ impl_state_store!({ if let Some(queued_event) = prev.iter_mut().find(|item| item.transaction_id == transaction_id) { - queued_event.is_wedged = wedged; + queued_event.is_wedged = None; + queued_event.error = error; obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?; } } diff --git a/crates/matrix-sdk-sqlite/migrations/state_store/007_a_send_queue_wedge_reason.sql b/crates/matrix-sdk-sqlite/migrations/state_store/007_a_send_queue_wedge_reason.sql new file mode 100644 index 00000000000..31a8d2012ab --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/state_store/007_a_send_queue_wedge_reason.sql @@ -0,0 +1,4 @@ +-- New send queue events, now persists the type of error causing it to be wedged +ALTER TABLE "send_queue_events" + -- Used as a value, thus encrypted/decrypted + ADD COLUMN "wedge_reason" BLOB; diff --git a/crates/matrix-sdk-sqlite/migrations/state_store/007_b_send_queue_clean.sql b/crates/matrix-sdk-sqlite/migrations/state_store/007_b_send_queue_clean.sql new file mode 100644 index 00000000000..293935f4efe --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/state_store/007_b_send_queue_clean.sql @@ -0,0 +1,2 @@ +ALTER TABLE "send_queue_events" + DROP COLUMN "wedged"; diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index 21682db911b..3b91ce4d470 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -9,7 +9,7 @@ use std::{ use async_trait::async_trait; use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime}; use matrix_sdk_base::{ - deserialized_responses::{RawAnySyncOrStrippedState, SyncOrStrippedState}, + deserialized_responses::{QueueWedgeError, RawAnySyncOrStrippedState, SyncOrStrippedState}, store::{ migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedEvent, DependentQueuedEventKind, QueuedEvent, SerializableEventContent, @@ -68,7 +68,7 @@ mod keys { /// This is used to figure whether the sqlite database requires a migration. /// Every new SQL migration should imply a bump of this number, and changes in /// the [`SqliteStateStore::run_migrations`] function.. -const DATABASE_VERSION: u8 = 7; +const DATABASE_VERSION: u8 = 8; /// A sqlite based cryptostore. #[derive(Clone)] @@ -261,6 +261,41 @@ impl SqliteStateStore { .await?; } + if from < 8 && to >= 8 { + // Replace all existing wedged events with a generic error. + let error = QueueWedgeError::GenericApiError { + msg: "local echo failed to send in a previous session".into(), + }; + let default_err = self.serialize_value(&error)?; + + conn.with_transaction(move |txn| { + // Update send queue table to persist the wedge reason if any. + txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?; + + // Migrate the data, add a generic error for currently wedged events + + for wedged_entries in txn + .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")? + .query_map((), |row| { + Ok( + (row.get::<_, Vec>(0)?,row.get::<_, String>(1)?) + ) + })? { + + let (room_id, transaction_id) = wedged_entries?; + + txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")? + .execute((default_err.clone(), room_id, transaction_id))?; + } + + + // Clean up the table now that data is migrated + txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?; + + txn.set_db_version(8) + }) + .await?; + } Ok(()) } @@ -1653,7 +1688,7 @@ impl StateStore for SqliteStateStore { self.acquire() .await? .with_transaction(move |txn| { - txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, false)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content))?; + txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content) VALUES (?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content))?; Ok(()) }) .await @@ -1675,7 +1710,7 @@ impl StateStore for SqliteStateStore { let num_updated = self.acquire() .await? .with_transaction(move |txn| { - txn.prepare_cached("UPDATE send_queue_events SET wedged = false, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id)) + txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id)) }) .await?; @@ -1715,11 +1750,11 @@ impl StateStore for SqliteStateStore { // Note: ROWID is always present and is an auto-incremented integer counter. We // want to maintain the insertion order, so we can sort using it. // Note 2: transaction_id is not encoded, see why in `save_send_queue_event`. - let res: Vec<(String, Vec, bool)> = self + let res: Vec<(String, Vec, Option>)> = self .acquire() .await? .prepare( - "SELECT transaction_id, content, wedged FROM send_queue_events WHERE room_id = ? ORDER BY ROWID", + "SELECT transaction_id, content, wedge_reason FROM send_queue_events WHERE room_id = ? ORDER BY ROWID", |mut stmt| { stmt.query((room_id,))? .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?))) @@ -1733,7 +1768,7 @@ impl StateStore for SqliteStateStore { queued_events.push(QueuedEvent { transaction_id: entry.0.into(), event: self.deserialize_json(&entry.1)?, - is_wedged: entry.2, + error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?, }); } @@ -1744,17 +1779,17 @@ impl StateStore for SqliteStateStore { &self, room_id: &RoomId, transaction_id: &TransactionId, - wedged: bool, + error: Option, ) -> Result<(), Self::Error> { let room_id = self.encode_key(keys::SEND_QUEUE, room_id); // See comment in `save_send_queue_event`. let transaction_id = transaction_id.to_string(); - + let error_value = error.map(|e| self.serialize_value(&e)).transpose()?; self.acquire() .await? .with_transaction(move |txn| { - txn.prepare_cached("UPDATE send_queue_events SET wedged = ? WHERE room_id = ? AND transaction_id = ?")?.execute((wedged, room_id, transaction_id))?; + txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?; Ok(()) }) .await @@ -1961,12 +1996,20 @@ mod migration_tests { }, }; - use matrix_sdk_base::{sync::UnreadNotificationsCount, RoomState, StateStore}; + use assert_matches::assert_matches; + use matrix_sdk_base::{ + deserialized_responses::QueueWedgeError, store::SerializableEventContent, + sync::UnreadNotificationsCount, RoomState, StateStore, + }; use matrix_sdk_test::async_test; use once_cell::sync::Lazy; use ruma::{ - events::{room::create::RoomCreateEventContent, StateEventType}, - room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, RoomId, UserId, + events::{ + room::{create::RoomCreateEventContent, message::RoomMessageEventContent}, + StateEventType, + }, + room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, RoomId, TransactionId, + UserId, }; use rusqlite::Transaction; use serde_json::json; @@ -2213,4 +2256,76 @@ mod migration_tests { assert_eq!(room_c.name(), None); assert_eq!(room_c.creator(), Some(room_c_create_sender)); } + + #[async_test] + pub async fn test_migrating_v7_to_v8() { + let path = new_path(); + + let room_a_id = room_id!("!room_a:dummy.local"); + let wedged_event_transaction_id = TransactionId::new(); + let local_event_transaction_id = TransactionId::new(); + + // Create and populate db. + { + let db = create_fake_db(&path, 7).await.unwrap(); + let conn = db.pool.get().await.unwrap(); + + let this = db.clone(); + let wedge_tx = wedged_event_transaction_id.clone(); + let local_tx = local_event_transaction_id.clone(); + + conn.with_transaction(move |txn| { + add_send_queue_event_v7(&this, txn, &wedge_tx, room_a_id, true)?; + add_send_queue_event_v7(&this, txn, &local_tx, room_a_id, false)?; + + Result::<_, Error>::Ok(()) + }) + .await + .unwrap(); + } + + // This transparently migrates to the latest version. + let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap(); + let queued_event = store.load_send_queue_events(room_a_id).await.unwrap(); + + assert_eq!(queued_event.len(), 2); + + let migrated_wedged = + queued_event.iter().find(|e| e.transaction_id == wedged_event_transaction_id).unwrap(); + + assert!(migrated_wedged.is_wedged()); + assert_matches!( + migrated_wedged.error.clone(), + Some(QueueWedgeError::GenericApiError { .. }) + ); + + let migrated_ok = queued_event + .iter() + .find(|e| e.transaction_id == local_event_transaction_id.clone()) + .unwrap(); + + assert!(!migrated_ok.is_wedged()); + assert!(migrated_ok.error.is_none()); + } + + fn add_send_queue_event_v7( + this: &SqliteStateStore, + txn: &Transaction<'_>, + transaction_id: &TransactionId, + room_id: &RoomId, + is_wedged: bool, + ) -> Result<(), Error> { + let content = + SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?; + + let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id); + let room_id_value = this.serialize_value(&room_id.to_owned())?; + + let content = this.serialize_json(&content)?; + + txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")? + .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?; + + Ok(()) + } } diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index 2478a4c86cd..ce8f7773d61 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -29,7 +29,6 @@ use matrix_sdk::{ }, Result, Room, }; -use matrix_sdk_base::deserialized_responses::QueueWedgeError; use ruma::{ api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType, events::{ @@ -1241,7 +1240,7 @@ impl TimelineController

{ /// Handle a room send update that's a new local echo. pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) { match echo.content { - LocalEchoContent::Event { serialized_event, send_handle, is_wedged } => { + LocalEchoContent::Event { serialized_event, send_handle, send_error } => { let content = match serialized_event.deserialize() { Ok(d) => d, Err(err) => { @@ -1257,17 +1256,10 @@ impl TimelineController

{ ) .await; - if is_wedged { + if let Some(send_error) = send_error { self.update_event_send_state( &echo.transaction_id, - EventSendState::SendingFailed { - // Put a dummy error in this case, since we're not persisting the errors - // that occurred in previous sessions. - error: QueueWedgeError::GenericApiError { - msg: MISSING_LOCAL_ECHO_FAIL_ERROR.into(), - }, - is_recoverable: false, - }, + EventSendState::SendingFailed { error: send_error, is_recoverable: false }, ) .await; } @@ -1522,9 +1514,6 @@ impl TimelineController { } } -const MISSING_LOCAL_ECHO_FAIL_ERROR: &'static str = - "local echo failed to send in a previous session"; - #[derive(Debug, Default)] pub(super) struct HandleManyEventsResult { /// The number of items that were added to the timeline. diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs index ba29da35bb5..add1c799857 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs @@ -19,6 +19,7 @@ use assert_matches2::assert_let; use eyeball_im::VectorDiff; use futures_util::StreamExt; use matrix_sdk::{config::SyncSettings, test_utils::logged_in_client_with_server}; +use matrix_sdk_base::deserialized_responses::QueueWedgeError; use matrix_sdk_test::{ async_test, mocks::mock_encryption_state, EventBuilder, JoinedRoomBuilder, SyncResponseBuilder, ALICE, @@ -277,9 +278,13 @@ async fn test_reloaded_failed_local_echoes_are_marked_as_failed() { // The error is not recoverable. assert!(!is_recoverable); // And it's properly pattern-matched. - assert_matches!( - error.as_client_api_error().unwrap().error_kind(), - Some(ruma::api::client::error::ErrorKind::TooLarge) + let msg = assert_matches!( + error, + QueueWedgeError::GenericApiError { msg } => msg + ); + assert_eq!( + msg, + "the server returned an error: [413 / M_TOO_LARGE] Sounds like you have a lot to say!" ); assert_pending!(timeline_stream); @@ -296,8 +301,15 @@ async fn test_reloaded_failed_local_echoes_are_marked_as_failed() { // Same recoverable status as above. assert!(!is_recoverable); - // But the error details have been lost. - assert!(error.as_client_api_error().is_none()); + // It was persisted and it's properly pattern-matched. + let msg = assert_matches!( + error, + QueueWedgeError::GenericApiError { msg } => msg + ); + assert_eq!( + msg, + "the server returned an error: [413 / M_TOO_LARGE] Sounds like you have a lot to say!" + ); } #[async_test] diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs index 2d2488022e3..67e729f7d0f 100644 --- a/crates/matrix-sdk/src/send_queue.rs +++ b/crates/matrix-sdk/src/send_queue.rs @@ -68,7 +68,7 @@ use ruma::{ EventContent as _, }, serde::Raw, - EventId, OwnedEventId, OwnedRoomId, OwnedTransactionId, OwnedUserId, TransactionId, + EventId, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId, }; use tokio::sync::{broadcast, Notify, RwLock}; use tracing::{debug, error, info, instrument, trace, warn}; @@ -355,7 +355,7 @@ impl RoomSendQueue { room: self.clone(), transaction_id: transaction_id.clone(), }, - is_wedged: false, + send_error: None, }, })); @@ -522,7 +522,10 @@ impl RoomSendQueue { warn!(txn_id = %queued_event.transaction_id, error = ?err, "Unrecoverable error when sending event: {err}"); // Mark the event as wedged, so it's not picked at any future point. - if let Err(err) = queue.mark_as_wedged(&queued_event.transaction_id).await { + if let Err(err) = queue + .mark_as_wedged(&queued_event.transaction_id, wedged_err.clone()) + .await + { warn!("unable to mark event as wedged: {err}"); } } @@ -694,7 +697,7 @@ impl QueueStorage { let queued_events = self.client()?.store().load_send_queue_events(&self.room_id).await?; - if let Some(event) = queued_events.iter().find(|queued| !queued.is_wedged) { + if let Some(event) = queued_events.iter().find(|queued| !queued.is_wedged()) { being_sent.insert(event.transaction_id.clone()); Ok(Some(event.clone())) @@ -716,6 +719,7 @@ impl QueueStorage { async fn mark_as_wedged( &self, transaction_id: &TransactionId, + reason: QueueWedgeError, ) -> Result<(), RoomSendQueueStorageError> { // Keep the lock until we're done touching the storage. let mut being_sent = self.being_sent.write().await; @@ -724,7 +728,7 @@ impl QueueStorage { Ok(self .client()? .store() - .update_send_queue_event_status(&self.room_id, transaction_id, true) + .update_send_queue_event_status(&self.room_id, transaction_id, Some(reason)) .await?) } @@ -737,7 +741,7 @@ impl QueueStorage { Ok(self .client()? .store() - .update_send_queue_event_status(&self.room_id, transaction_id, false) + .update_send_queue_event_status(&self.room_id, transaction_id, None) .await?) } @@ -891,7 +895,7 @@ impl QueueStorage { room: room.clone(), transaction_id: queued.transaction_id, }, - is_wedged: queued.is_wedged, + send_error: queued.error, }, } }); @@ -1171,8 +1175,8 @@ pub enum LocalEchoContent { /// A handle to manipulate the sending of the associated event. send_handle: SendHandle, /// Whether trying to send this local echo failed in the past with an - /// unrecoverable error (see [`SendQueueRoomError::is_recoverable`]). - is_wedged: bool, + /// error (see [`SendQueueRoomError::is_recoverable`]). + send_error: Option, }, /// A local echo has been reacted to. diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs index 36e86e0d7d4..bf32f3e3bde 100644 --- a/crates/matrix-sdk/tests/integration/send_queue.rs +++ b/crates/matrix-sdk/tests/integration/send_queue.rs @@ -78,7 +78,7 @@ macro_rules! assert_update { serialized_event, send_handle, // New local echoes should always start as not wedged. - is_wedged: false, + send_error: None, }, transaction_id: txn, }))) = timeout(Duration::from_secs(1), $watch.recv()).await @@ -1056,7 +1056,7 @@ async fn test_edit_with_poll_start() { content: LocalEchoContent::Event { serialized_event, // New local echoes should always start as not wedged. - is_wedged: false, + send_error: None, .. }, transaction_id: txn1,