From b5b390d71443a2ea1944aab68f0c74d667035ea2 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 16 Aug 2023 01:17:10 +0530 Subject: [PATCH] Refactored realtime presence to make it easy to understand --- ably/proto_presence_message.go | 55 ++++++++++++++++++---------------- ably/realtime_presence.go | 19 ++++++------ 2 files changed, 38 insertions(+), 36 deletions(-) diff --git a/ably/proto_presence_message.go b/ably/proto_presence_message.go index f30956ed..0514cd0e 100644 --- a/ably/proto_presence_message.go +++ b/ably/proto_presence_message.go @@ -62,38 +62,41 @@ func (m PresenceMessage) String() string { } // RTP2b1 -func (msg *PresenceMessage) isServerSynthesizedPresenceMessage() bool { - return strings.HasPrefix(msg.ID, msg.ConnectionID) +func (msg *PresenceMessage) isServerSynthesized() bool { + return !strings.HasPrefix(msg.ID, msg.ConnectionID) } -func (incomingMessage *PresenceMessage) IsOlderThan(oldMessage *PresenceMessage) (bool, error) { - if oldMessage.isServerSynthesizedPresenceMessage() || - incomingMessage.isServerSynthesizedPresenceMessage() { - return oldMessage.Timestamp > incomingMessage.Timestamp, nil +// RTP2b2 +func (msg *PresenceMessage) getMsgSerialAndIndex() (int64, int64, error) { + msgIds := strings.Split(msg.ID, ":") + if len(msgIds) != 3 { + return 0, 0, fmt.Errorf("parsing error, the presence message has invalid id %v", msg.ID) } - - presenceIdErr := func(presenceMsgId string) error { - return fmt.Errorf("parsing error, the presence message has invalid id %v", presenceMsgId) + msgSerial, err := strconv.ParseInt(msgIds[1], 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("parsing error, the presence message has invalid msgSerial %v", msg.ID) } - - oldMessageIds := strings.Split(oldMessage.ID, ":") - incomingMessageIds := strings.Split(incomingMessage.ID, ":") - - oldMessageSerial, err := strconv.ParseInt(oldMessageIds[1], 10, 64) - oldMessageIndex, err := strconv.ParseInt(oldMessageIds[2], 10, 64) - if len(oldMessageIds) != 3 || err != nil { - return false, presenceIdErr(oldMessage.ID) + msgIndex, err := strconv.ParseInt(msgIds[2], 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("parsing error, the presence message has invalid msgIndex %v", msg.ID) } + return msgSerial, msgIndex, nil +} - incomingMessageSerial, err := strconv.ParseInt(incomingMessageIds[1], 10, 64) - incomingMessageIndex, err := strconv.ParseInt(incomingMessageIds[2], 10, 64) - if len(incomingMessageIds) != 3 || err != nil { - return true, presenceIdErr(incomingMessage.ID) +func (msg1 *PresenceMessage) IsNewerThan(msg2 *PresenceMessage) (bool, error) { + if msg1.isServerSynthesized() || msg2.isServerSynthesized() { + return msg1.Timestamp > msg2.Timestamp, nil } - - if oldMessageSerial == incomingMessageSerial { - return oldMessageIndex > incomingMessageIndex, nil + msg1Serial, msg1Index, err := msg1.getMsgSerialAndIndex() + if err != nil { + return false, err } - - return oldMessageSerial > incomingMessageSerial, nil + msg2Serial, msg2Index, err := msg2.getMsgSerialAndIndex() + if err != nil { + return true, err + } + if msg1Serial == msg2Serial { + return msg1Index > msg2Index, nil + } + return msg1Serial > msg2Serial, nil } diff --git a/ably/realtime_presence.go b/ably/realtime_presence.go index 0c634b43..5dfb5f2f 100644 --- a/ably/realtime_presence.go +++ b/ably/realtime_presence.go @@ -194,19 +194,18 @@ func (pres *RealtimePresence) processIncomingMessage(msg *protocolMessage, syncS } } - // Filter out old presenceMessages by their timestamp. - newPresenceMessages := make([]*PresenceMessage, 0, len(msg.Presence)) // Update presence map / channel's member state. + newPresenceMessages := make([]*PresenceMessage, 0, len(msg.Presence)) for _, presenceMember := range msg.Presence { - // RTP2 memberKey := presenceMember.ConnectionID + presenceMember.ClientID - - if oldPresenceMember, ok := pres.members[memberKey]; ok { // RTP2a - isIncomingMessageOld, err := presenceMember.IsOlderThan(oldPresenceMember) - pres.log().Error(err) - // TODO - publish channel error event here without state change - if isIncomingMessageOld { // RTP2b1 - continue // do not process message with older timestamp // RTP2b1a + if existingMember, ok := pres.members[memberKey]; ok { // RTP2a + isMemberNew, err := presenceMember.IsNewerThan(existingMember) // RTP2b + if err != nil { + pres.log().Error(err) + // TODO - publish channel error event here without state change + } + if !isMemberNew { + continue // do not accept if incoming member is old } } switch presenceMember.Action {