Skip to content

Commit

Permalink
Refactored realtime presence to make it easy to understand
Browse files Browse the repository at this point in the history
  • Loading branch information
sacOO7 committed Aug 15, 2023
1 parent 95b5ef8 commit b5b390d
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 36 deletions.
55 changes: 29 additions & 26 deletions ably/proto_presence_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,38 +62,41 @@ func (m PresenceMessage) String() string {
}

// RTP2b1
func (msg *PresenceMessage) isServerSynthesizedPresenceMessage() bool {
return strings.HasPrefix(msg.ID, msg.ConnectionID)
func (msg *PresenceMessage) isServerSynthesized() bool {
return !strings.HasPrefix(msg.ID, msg.ConnectionID)
}

func (incomingMessage *PresenceMessage) IsOlderThan(oldMessage *PresenceMessage) (bool, error) {
if oldMessage.isServerSynthesizedPresenceMessage() ||
incomingMessage.isServerSynthesizedPresenceMessage() {
return oldMessage.Timestamp > incomingMessage.Timestamp, nil
// RTP2b2
func (msg *PresenceMessage) getMsgSerialAndIndex() (int64, int64, error) {
msgIds := strings.Split(msg.ID, ":")
if len(msgIds) != 3 {
return 0, 0, fmt.Errorf("parsing error, the presence message has invalid id %v", msg.ID)
}

presenceIdErr := func(presenceMsgId string) error {
return fmt.Errorf("parsing error, the presence message has invalid id %v", presenceMsgId)
msgSerial, err := strconv.ParseInt(msgIds[1], 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parsing error, the presence message has invalid msgSerial %v", msg.ID)
}

oldMessageIds := strings.Split(oldMessage.ID, ":")
incomingMessageIds := strings.Split(incomingMessage.ID, ":")

oldMessageSerial, err := strconv.ParseInt(oldMessageIds[1], 10, 64)
oldMessageIndex, err := strconv.ParseInt(oldMessageIds[2], 10, 64)
if len(oldMessageIds) != 3 || err != nil {
return false, presenceIdErr(oldMessage.ID)
msgIndex, err := strconv.ParseInt(msgIds[2], 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parsing error, the presence message has invalid msgIndex %v", msg.ID)
}
return msgSerial, msgIndex, nil
}

incomingMessageSerial, err := strconv.ParseInt(incomingMessageIds[1], 10, 64)
incomingMessageIndex, err := strconv.ParseInt(incomingMessageIds[2], 10, 64)
if len(incomingMessageIds) != 3 || err != nil {
return true, presenceIdErr(incomingMessage.ID)
func (msg1 *PresenceMessage) IsNewerThan(msg2 *PresenceMessage) (bool, error) {
if msg1.isServerSynthesized() || msg2.isServerSynthesized() {
return msg1.Timestamp > msg2.Timestamp, nil
}

if oldMessageSerial == incomingMessageSerial {
return oldMessageIndex > incomingMessageIndex, nil
msg1Serial, msg1Index, err := msg1.getMsgSerialAndIndex()
if err != nil {
return false, err
}

return oldMessageSerial > incomingMessageSerial, nil
msg2Serial, msg2Index, err := msg2.getMsgSerialAndIndex()
if err != nil {
return true, err
}
if msg1Serial == msg2Serial {
return msg1Index > msg2Index, nil
}
return msg1Serial > msg2Serial, nil
}
19 changes: 9 additions & 10 deletions ably/realtime_presence.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,19 +194,18 @@ func (pres *RealtimePresence) processIncomingMessage(msg *protocolMessage, syncS
}
}

// Filter out old presenceMessages by their timestamp.
newPresenceMessages := make([]*PresenceMessage, 0, len(msg.Presence))
// Update presence map / channel's member state.
newPresenceMessages := make([]*PresenceMessage, 0, len(msg.Presence))
for _, presenceMember := range msg.Presence {
// RTP2
memberKey := presenceMember.ConnectionID + presenceMember.ClientID

if oldPresenceMember, ok := pres.members[memberKey]; ok { // RTP2a
isIncomingMessageOld, err := presenceMember.IsOlderThan(oldPresenceMember)
pres.log().Error(err)
// TODO - publish channel error event here without state change
if isIncomingMessageOld { // RTP2b1
continue // do not process message with older timestamp // RTP2b1a
if existingMember, ok := pres.members[memberKey]; ok { // RTP2a
isMemberNew, err := presenceMember.IsNewerThan(existingMember) // RTP2b
if err != nil {
pres.log().Error(err)
// TODO - publish channel error event here without state change
}
if !isMemberNew {
continue // do not accept if incoming member is old
}
}
switch presenceMember.Action {
Expand Down

0 comments on commit b5b390d

Please sign in to comment.