Skip to content

Commit

Permalink
chore(ui): Move the RoomEventCacheUpdate code block into its own fu…
Browse files Browse the repository at this point in the history
…nction.

This patch is a refactoring. It moves the code block responsible to
handle the `RoomEventCacheUpdate`s into its own function for the sake
of clarity.
  • Loading branch information
Hywan committed Jun 12, 2024
1 parent 7263235 commit 66847ad
Showing 1 changed file with 96 additions and 90 deletions.
186 changes: 96 additions & 90 deletions crates/matrix-sdk-ui/src/timeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ use std::{collections::BTreeSet, sync::Arc};

use futures_util::{pin_mut, StreamExt};
use matrix_sdk::{
event_cache::{EventsOrigin, RoomEventCacheUpdate},
event_cache::{EventsOrigin, RoomEventCache, RoomEventCacheUpdate},
executor::spawn,
send_queue::{LocalEcho, RoomSendQueueUpdate},
Room,
};
use ruma::{events::AnySyncTimelineEvent, RoomVersionId};
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::{error::RecvError, Receiver};
use tracing::{info, info_span, trace, warn, Instrument, Span};

#[cfg(feature = "e2e-encryption")]
Expand Down Expand Up @@ -156,7 +156,7 @@ impl TimelineBuilder {
event_cache.subscribe()?;

let (room_event_cache, event_cache_drop) = room.event_cache().await?;
let (_, mut event_subscriber) = room_event_cache.subscribe().await?;
let (_, event_subscriber) = room_event_cache.subscribe().await?;

let is_live = matches!(focus, TimelineFocus::Live);

Expand All @@ -169,98 +169,14 @@ impl TimelineBuilder {
let client = room.client();

let room_update_join_handle = spawn({
let room_event_cache = room_event_cache.clone();
let inner = inner.clone();

let span =
info_span!(parent: Span::none(), "room_update_handler", room_id = ?room.room_id());
span.follows_from(Span::current());

async move {
trace!("Spawned the event subscriber task.");

loop {
trace!("Waiting for an event.");

let update = match event_subscriber.recv().await {
Ok(up) => up,
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(num_skipped)) => {
warn!(
num_skipped,
"Lagged behind event cache updates, resetting timeline"
);

// The updates might have lagged, but the room event cache might have
// events, so retrieve them and add them back again to the timeline,
// after clearing it.
//
// If we can't get a handle on the room cache's events, just clear the
// current timeline.
match room_event_cache.subscribe().await {
Ok((events, _)) => {
inner.replace_with_initial_remote_events(events, RemoteEventOrigin::Sync).await;
}
Err(err) => {
warn!("Error when re-inserting initial events into the timeline: {err}");
inner.clear().await;
}
}

continue;
}
};

match update {
RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
trace!(target = %event_id, "Handling fully read marker.");
inner.handle_fully_read_marker(event_id).await;
}

RoomEventCacheUpdate::Clear => {
if !inner.is_live().await {
// Ignore a clear for a timeline not in the live mode; the
// focused-on-event mode doesn't add any new items to the timeline
// anyways.
continue;
}

trace!("Clearing the timeline.");
inner.clear().await;
}

RoomEventCacheUpdate::AddTimelineEvents { events: diffs, origin } => {
trace!("Received new timeline events.");
trace!("Spawned the event subscriber task.");

inner.add_events_with_diffs(
diffs,
match origin {
EventsOrigin::Sync => RemoteEventOrigin::Sync,
EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
}
).await;
}

RoomEventCacheUpdate::AddEphemeralEvents { events } => {
trace!("Received new ephemeral events from sync.");

// TODO: (bnjbvr) ephemeral should be handled by the event cache.
inner.handle_ephemeral_events(events).await;
}

RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
if !ambiguity_changes.is_empty() {
let member_ambiguity_changes = ambiguity_changes
.values()
.flat_map(|change| change.user_ids())
.collect::<BTreeSet<_>>();
inner.force_update_sender_profiles(&member_ambiguity_changes).await;
}
}
}
}
}
.instrument(span)
handle_room_update(inner.clone(), room_event_cache.clone(), event_subscriber)
.instrument(span)
});

let local_echo_listener_handle = if is_live {
Expand Down Expand Up @@ -423,3 +339,93 @@ impl TimelineBuilder {
Ok(timeline)
}
}

/// This function is responsible to handle all `RoomEventCacheUpdate` and to
/// dispatch them.
async fn handle_room_update(
timeline: TimelineInner,
room_event_cache: RoomEventCache,
mut event_subscriber: Receiver<RoomEventCacheUpdate>,
) {
loop {
trace!("Waiting for an event.");

let update = match event_subscriber.recv().await {
Ok(up) => up,
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(num_skipped)) => {
warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");

// The updates might have lagged, but the room event cache might have
// events, so retrieve them and add them back again to the timeline,
// after clearing it.
//
// If we can't get a handle on the room cache's events, just clear the
// current timeline.
match room_event_cache.subscribe().await {
Ok((events, _)) => {
timeline
.replace_with_initial_remote_events(events, RemoteEventOrigin::Sync)
.await;
}
Err(err) => {
warn!("Error when re-inserting initial events into the timeline: {err}");
timeline.clear().await;
}
}

continue;
}
};

match update {
RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
trace!(target = %event_id, "Handling fully read marker.");
timeline.handle_fully_read_marker(event_id).await;
}

RoomEventCacheUpdate::Clear => {
if !timeline.is_live().await {
// Ignore a clear for a timeline not in the live mode; the
// focused-on-event mode doesn't add any new items to the timeline
// anyways.
continue;
}

trace!("Clearing the timeline.");
timeline.clear().await;
}

RoomEventCacheUpdate::AddTimelineEvents { events: diffs, origin } => {
trace!("Received new timeline events.");

timeline
.add_events_with_diffs(
diffs,
match origin {
EventsOrigin::Sync => RemoteEventOrigin::Sync,
EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
},
)
.await;
}

RoomEventCacheUpdate::AddEphemeralEvents { events } => {
trace!("Received new ephemeral events from sync.");

// TODO: (bnjbvr) ephemeral should be handled by the event cache.
timeline.handle_ephemeral_events(events).await;
}

RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
if !ambiguity_changes.is_empty() {
let member_ambiguity_changes = ambiguity_changes
.values()
.flat_map(|change| change.user_ids())
.collect::<BTreeSet<_>>();
timeline.force_update_sender_profiles(&member_ambiguity_changes).await;
}
}
}
}
}

0 comments on commit 66847ad

Please sign in to comment.