Skip to content

Commit

Permalink
feat(send_queue): Persist failed to send errors
Browse files Browse the repository at this point in the history
Changelog:  We now persist the error that caused an event to fail to send. The error `QueueWedgeError` contains info that client can use to try to resolve the problem when the error is not automatically retriable.

Some breaking changes in the ffi layer for `timeline::EventSendState`, `SendingFailed` now directly contains the wedge reason enum. Use it in place of the removed variant of EventSendState
  • Loading branch information
BillCarsonFr committed Oct 16, 2024
1 parent e769600 commit 4e43dc2
Show file tree
Hide file tree
Showing 11 changed files with 235 additions and 67 deletions.
24 changes: 17 additions & 7 deletions crates/matrix-sdk-base/src/store/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use assert_matches::assert_matches;
use assert_matches2::assert_let;
use async_trait::async_trait;
use growable_bloom_filter::GrowableBloomBuilder;
use matrix_sdk_common::deserialized_responses::QueueWedgeError;
use matrix_sdk_test::test_json;
use ruma::{
api::MatrixVersion,
Expand Down Expand Up @@ -1223,7 +1224,7 @@ impl StateStoreIntegrationTests for DynStateStore {
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), "msg0");

assert!(!pending[0].is_wedged);
assert!(!pending[0].is_wedged());
}

// Saving another three things should work.
Expand All @@ -1249,12 +1250,18 @@ impl StateStoreIntegrationTests for DynStateStore {
let deserialized = pending[i].event.deserialize().unwrap();
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), format!("msg{i}"));
assert!(!pending[i].is_wedged);
assert!(!pending[i].is_wedged());
}

// Marking an event as wedged works.
let txn2 = &pending[2].transaction_id;
self.update_send_queue_event_status(room_id, txn2, true).await.unwrap();
self.update_send_queue_event_status(
room_id,
txn2,
Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
)
.await
.unwrap();

// And it is reflected.
let pending = self.load_send_queue_events(room_id).await.unwrap();
Expand All @@ -1263,10 +1270,13 @@ impl StateStoreIntegrationTests for DynStateStore {
assert_eq!(pending.len(), 4);
assert_eq!(pending[0].transaction_id, txn0);
assert_eq!(pending[2].transaction_id, *txn2);
assert!(pending[2].is_wedged);
assert!(pending[2].is_wedged());
let error = pending[2].clone().error.unwrap();
let generic_error = assert_matches!(error, QueueWedgeError::GenericApiError { msg } => msg);
assert_eq!(generic_error, "Oops");
for i in 0..4 {
if i != 2 {
assert!(!pending[i].is_wedged);
assert!(!pending[i].is_wedged());
}
}

Expand All @@ -1288,15 +1298,15 @@ impl StateStoreIntegrationTests for DynStateStore {
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), "wow that's a cool test");

assert!(!pending[2].is_wedged);
assert!(!pending[2].is_wedged());

for i in 0..4 {
if i != 2 {
let deserialized = pending[i].event.deserialize().unwrap();
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
assert_eq!(content.body(), format!("msg{i}"));

assert!(!pending[i].is_wedged);
assert!(!pending[i].is_wedged());
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions crates/matrix-sdk-base/src/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::{

use async_trait::async_trait;
use growable_bloom_filter::GrowableBloom;
use matrix_sdk_common::deserialized_responses::QueueWedgeError;
use ruma::{
canonical_json::{redact, RedactedBecause},
events::{
Expand Down Expand Up @@ -815,7 +816,7 @@ impl StateStore for MemoryStore {
.unwrap()
.entry(room_id.to_owned())
.or_default()
.push(QueuedEvent { event, transaction_id, is_wedged: false });
.push(QueuedEvent { event, transaction_id, error: None });
Ok(())
}

Expand All @@ -835,7 +836,7 @@ impl StateStore for MemoryStore {
.find(|item| item.transaction_id == transaction_id)
{
entry.event = content;
entry.is_wedged = false;
entry.error = None;
Ok(true)
} else {
Ok(false)
Expand Down Expand Up @@ -876,7 +877,7 @@ impl StateStore for MemoryStore {
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
wedged: bool,
error: Option<QueueWedgeError>,
) -> Result<(), Self::Error> {
if let Some(entry) = self
.send_queue_events
Expand All @@ -887,7 +888,7 @@ impl StateStore for MemoryStore {
.iter_mut()
.find(|item| item.transaction_id == transaction_id)
{
entry.is_wedged = wedged;
entry.error = error;
}
Ok(())
}
Expand Down
24 changes: 17 additions & 7 deletions crates/matrix-sdk-base/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
use as_variant::as_variant;
use async_trait::async_trait;
use growable_bloom_filter::GrowableBloom;
use matrix_sdk_common::AsyncTraitDeps;
use matrix_sdk_common::{deserialized_responses::QueueWedgeError, AsyncTraitDeps};
use ruma::{
api::MatrixVersion,
events::{
Expand Down Expand Up @@ -392,12 +392,13 @@ pub trait StateStore: AsyncTraitDeps {
room_id: &RoomId,
) -> Result<Vec<QueuedEvent>, Self::Error>;

/// Updates the send queue wedged status for a given send queue event.
/// Updates the send queue error status (wedge) for a given send queue
/// event.
async fn update_send_queue_event_status(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
wedged: bool,
error: Option<QueueWedgeError>,
) -> Result<(), Self::Error>;

/// Loads all the rooms which have any pending events in their send queue.
Expand Down Expand Up @@ -662,10 +663,10 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
wedged: bool,
error: Option<QueueWedgeError>,
) -> Result<(), Self::Error> {
self.0
.update_send_queue_event_status(room_id, transaction_id, wedged)
.update_send_queue_event_status(room_id, transaction_id, error)
.await
.map_err(Into::into)
}
Expand Down Expand Up @@ -1156,7 +1157,16 @@ pub struct QueuedEvent {
/// If the event couldn't be sent because of an API error, it's marked as
/// wedged, and won't ever be peeked for sending. The only option is to
/// remove it.
pub is_wedged: bool,
pub error: Option<QueueWedgeError>,
}

impl QueuedEvent {
/// If the event couldn't be sent because of an API error, it's marked as
/// wedged, and won't ever be peeked for sending. The only option is to
/// remove it.
pub fn is_wedged(&self) -> bool {
self.error.is_some()
}
}

/// The specific user intent that characterizes a [`DependentQueuedEvent`].
Expand Down Expand Up @@ -1254,7 +1264,7 @@ impl fmt::Debug for QueuedEvent {
// Hide the content from the debug log.
f.debug_struct("QueuedEvent")
.field("transaction_id", &self.transaction_id)
.field("is_wedged", &self.is_wedged)
.field("is_wedged", &self.is_wedged())
.finish_non_exhaustive()
}
}
Expand Down
33 changes: 27 additions & 6 deletions crates/matrix-sdk-indexeddb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ mod keys {
}

pub use keys::ALL_STORES;
use matrix_sdk_base::deserialized_responses::QueueWedgeError;

/// Encrypt (if needs be) then JSON-serialize a value.
fn serialize_value(store_cipher: Option<&StoreCipher>, event: &impl Serialize) -> Result<JsValue> {
Expand Down Expand Up @@ -432,7 +433,12 @@ struct PersistedQueuedEvent {
// All these fields are the same as in [`QueuedEvent`].
event: SerializableEventContent,
transaction_id: OwnedTransactionId,
is_wedged: bool,

// Deprecated (from old format), now replaced with error field.
// Kept here for migration
is_wedged: Option<bool>,

pub error: Option<QueueWedgeError>,
}

// Small hack to have the following macro invocation act as the appropriate
Expand Down Expand Up @@ -1325,7 +1331,8 @@ impl_state_store!({
room_id: room_id.to_owned(),
event: content,
transaction_id,
is_wedged: false,
is_wedged: None,
error: None,
});

// Save the new vector into db.
Expand Down Expand Up @@ -1363,7 +1370,8 @@ impl_state_store!({
// Modify the one event.
if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) {
entry.event = content;
entry.is_wedged = false;
entry.is_wedged = None;
entry.error = None;

// Save the new vector into db.
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
Expand Down Expand Up @@ -1432,7 +1440,19 @@ impl_state_store!({
.map(|item| QueuedEvent {
event: item.event,
transaction_id: item.transaction_id,
is_wedged: item.is_wedged,
error: match item.is_wedged {
Some(legacy_wedged) => {
if legacy_wedged {
// migrate a generic error
Some(QueueWedgeError::GenericApiError {
msg: "local echo failed to send in a previous session".into(),
})
} else {
None
}
}
None => item.error,
},
})
.collect())
}
Expand All @@ -1441,7 +1461,7 @@ impl_state_store!({
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
wedged: bool,
error: Option<QueueWedgeError>,
) -> Result<()> {
let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);

Expand All @@ -1456,7 +1476,8 @@ impl_state_store!({
if let Some(queued_event) =
prev.iter_mut().find(|item| item.transaction_id == transaction_id)
{
queued_event.is_wedged = wedged;
queued_event.is_wedged = None;
queued_event.error = error;
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- New send queue events, now persists the type of error causing it to be wedged
ALTER TABLE "send_queue_events"
-- Used as a value, thus encrypted/decrypted
ADD COLUMN "wedge_reason" BLOB;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE "send_queue_events"
DROP COLUMN "wedged";
Loading

0 comments on commit 4e43dc2

Please sign in to comment.