From b3047f3f17187a319c6c2d6145917223b0f7bc84 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Sep 2024 10:22:46 +0100 Subject: [PATCH] Sliding sync: various fixups to the sliding sync joined room background job (#17673) Follow-up to #17652, https://github.com/element-hq/synapse/pull/17641, https://github.com/element-hq/synapse/pull/17634, https://github.com/element-hq/synapse/pull/17631 and https://github.com/element-hq/synapse/pull/17632 to fix-up https://github.com/element-hq/synapse/pull/17512 --- changelog.d/17673.misc | 1 + .../databases/main/events_bg_updates.py | 27 ++++++++++++------- 2 files changed, 19 insertions(+), 9 deletions(-) create mode 100644 changelog.d/17673.misc diff --git a/changelog.d/17673.misc b/changelog.d/17673.misc new file mode 100644 index 00000000000..756918e2b21 --- /dev/null +++ b/changelog.d/17673.misc @@ -0,0 +1 @@ +Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting. diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 12670e87d2a..b3244f74573 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1595,17 +1595,15 @@ def _txn(txn: LoggingTransaction) -> None: # starve disk usage while this goes on. # # We upsert in case we have to run this multiple times. - # - # The `WHERE TRUE` clause is to avoid "Parsing Ambiguity" txn.execute( """ INSERT INTO sliding_sync_joined_rooms_to_recalculate (room_id) - SELECT room_id FROM rooms WHERE ? + SELECT DISTINCT room_id FROM local_current_membership + WHERE membership = 'join' ON CONFLICT (room_id) DO NOTHING; """, - (True,), ) await self.db_pool.runInteraction( @@ -1689,7 +1687,15 @@ def _get_rooms_to_update_txn(txn: LoggingTransaction) -> List[Tuple[str]]: if not current_state_ids_map: continue - fetched_events = await self.get_events(current_state_ids_map.values()) + try: + fetched_events = await self.get_events(current_state_ids_map.values()) + except (DatabaseCorruptionError, InvalidEventError) as e: + logger.warning( + "Failed to fetch state for room '%s' due to corrupted events. Ignoring. Error: %s", + room_id, + e, + ) + continue current_state_map: StateMap[EventBase] = { state_key: fetched_events[event_id] @@ -1722,10 +1728,13 @@ def _get_rooms_to_update_txn(txn: LoggingTransaction) -> List[Tuple[str]]: + "given we pulled the room out of `current_state_events`" ) most_recent_event_stream_ordering = most_recent_event_pos_results[1].stream - assert most_recent_event_stream_ordering > 0, ( - "We should have at-least one event in the room (our own join membership event for example) " - + "that isn't backfilled (negative `stream_ordering`) if we are joined to the room." - ) + + # The `most_recent_event_stream_ordering` should be positive, + # however there are (very rare) rooms where that is not the case in + # the matrix.org database. It's not clear how they got into that + # state, but does mean that we cannot assert that the stream + # ordering is indeed positive. + # Figure out the latest `bump_stamp` in the room. This could be `None` for a # federated room you just joined where all of events are still `outliers` or # backfilled history. In the Sliding Sync API, we default to the user's