diff --git a/ably/error_names.go b/ably/error_names.go index d14381ca1..708477e1d 100644 --- a/ably/error_names.go +++ b/ably/error_names.go @@ -24,6 +24,7 @@ const ( ErrTimeoutError ErrorCode = 50003 ErrConnectionFailed ErrorCode = 80000 ErrConnectionSuspended ErrorCode = 80002 + ErrConnectionClosed ErrorCode = 80017 ErrDisconnected ErrorCode = 80003 ErrProtocolError ErrorCode = 80013 ErrChannelOperationFailed ErrorCode = 90000 diff --git a/ably/errors.go b/ably/errors.go index fad63153b..bf74f46ce 100644 --- a/ably/errors.go +++ b/ably/errors.go @@ -11,6 +11,8 @@ func (c ErrorCode) String() string { return "(error code not set)" case 10000: return "no error" + case 20000: + return "general error code" case 40000: return "bad request" case 40001: @@ -307,6 +309,16 @@ func (c ErrorCode) String() string { return "presence state is out of sync" case 91100: return "member implicitly left presence channel (connection closed)" + case 101000: + return "must have a non-empty name for the space" + case 101001: + return "must enter a space to perform this operation" + case 101002: + return "lock request already exists" + case 101003: + return "lock is currently locked" + case 101004: + return "lock was invalidated by a concurrent lock request which now holds the lock" } return "" } diff --git a/ably/export_test.go b/ably/export_test.go index 39151bcd8..55aaef87f 100644 --- a/ably/export_test.go +++ b/ably/export_test.go @@ -93,6 +93,12 @@ func (c *REST) GetCachedFallbackHost() string { return c.successFallbackHost.get() } +func (c *RealtimeChannel) GetChannelSerial() string { + c.mtx.Lock() + defer c.mtx.Unlock() + return c.properties.ChannelSerial +} + func (c *RealtimeChannel) GetAttachResume() bool { c.mtx.Lock() defer c.mtx.Unlock() @@ -105,6 +111,12 @@ func (c *RealtimeChannel) SetAttachResume(value bool) { c.attachResume = value } +func (c *RealtimeChannel) SetState(chanState ChannelState) { + c.mtx.Lock() + defer c.mtx.Unlock() + c.state = chanState +} + func (opts *clientOptions) GetFallbackRetryTimeout() time.Duration { return opts.fallbackRetryTimeout() } @@ -193,6 +205,55 @@ func (c *Connection) PendingItems() int { return len(c.pending.queue) } +// AckAll empties queue and acks all pending callbacks +func (c *Connection) AckAll() { + c.mtx.Lock() + cx := c.pending.Dismiss() + c.mtx.Unlock() + c.log().Infof("Ack all %d messages waiting for ACK/NACK", len(cx)) + for _, v := range cx { + v.onAck(nil) + } +} + +func (c *Connection) SetKey(key string) { + c.mtx.Lock() + defer c.mtx.Unlock() + c.key = key +} + +func (c *RealtimePresence) Members() map[string]*PresenceMessage { + c.mtx.Lock() + defer c.mtx.Unlock() + presenceMembers := make(map[string]*PresenceMessage, len(c.members)) + for k, pm := range c.members { + presenceMembers[k] = pm + } + return presenceMembers +} + +func (c *RealtimePresence) InternalMembers() map[string]*PresenceMessage { + c.mtx.Lock() + defer c.mtx.Unlock() + internalMembers := make(map[string]*PresenceMessage, len(c.internalMembers)) + for k, pm := range c.internalMembers { + internalMembers[k] = pm + } + return internalMembers +} + +func (c *RealtimePresence) SyncInitial() bool { + c.mtx.Lock() + defer c.mtx.Unlock() + return c.syncState == syncInitial +} + +func (c *RealtimePresence) SyncInProgress() bool { + c.mtx.Lock() + defer c.mtx.Unlock() + return c.syncState == syncInProgress +} + func (c *Connection) ConnectionStateTTL() time.Duration { return c.connectionStateTTL() } diff --git a/ably/mock_test.go b/ably/mock_test.go deleted file mode 100644 index 1c6ffdf71..000000000 --- a/ably/mock_test.go +++ /dev/null @@ -1,32 +0,0 @@ -// mocks and helpers for unit tests. - -package ably - -import ( - "bytes" - "log" -) - -var ( - buffer bytes.Buffer - mocklogger = log.New(&buffer, "logger: ", log.Lshortfile) -) - -// mockChannelWithState is a test helper that returns a mock channel in a specified state -func mockChannelWithState(channelState *ChannelState, connectionState *ConnectionState) *RealtimeChannel { - mockChannel := RealtimeChannel{ - client: &Realtime{ - rest: &REST{ - log: logger{l: &stdLogger{mocklogger}}, - }, - Connection: &Connection{}, - }, - } - if channelState != nil { - mockChannel.state = *channelState - } - if connectionState != nil { - mockChannel.client.Connection.state = *connectionState - } - return &mockChannel -} diff --git a/ably/options.go b/ably/options.go index 98f92e472..5e22a3774 100644 --- a/ably/options.go +++ b/ably/options.go @@ -235,7 +235,6 @@ func (opts *authOptions) KeySecret() string { // clientOptions passes additional client-specific properties to the [ably.NewREST] or to the [ably.NewRealtime]. // Properties set using [ably.clientOptions] are used instead of the [ably.defaultOptions] values. type clientOptions struct { - // authOptions Embedded an [ably.authOptions] object (TO3j). authOptions diff --git a/ably/proto_presence_message.go b/ably/proto_presence_message.go index bc19f7b41..b8d1ab99b 100644 --- a/ably/proto_presence_message.go +++ b/ably/proto_presence_message.go @@ -2,6 +2,8 @@ package ably import ( "fmt" + "strconv" + "strings" ) // PresenceAction describes the possible actions members in the presence set can emit (TP2). @@ -58,3 +60,45 @@ func (m PresenceMessage) String() string { "update", }[m.Action], m.ClientID, m.Data) } + +func (msg *PresenceMessage) isServerSynthesized() bool { + return !strings.HasPrefix(msg.ID, msg.ConnectionID) +} + +func (msg *PresenceMessage) getMsgSerialAndIndex() (int64, int64, error) { + msgIds := strings.Split(msg.ID, ":") + if len(msgIds) != 3 { + return 0, 0, fmt.Errorf("parsing error, the presence message has invalid id %v", msg.ID) + } + msgSerial, err := strconv.ParseInt(msgIds[1], 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("parsing error, the presence message has invalid msgSerial, for msgId %v", msg.ID) + } + msgIndex, err := strconv.ParseInt(msgIds[2], 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("parsing error, the presence message has invalid msgIndex, for msgId %v", msg.ID) + } + return msgSerial, msgIndex, nil +} + +// RTP2b, RTP2c +func (msg1 *PresenceMessage) IsNewerThan(msg2 *PresenceMessage) (bool, error) { + // RTP2b1 + if msg1.isServerSynthesized() || msg2.isServerSynthesized() { + return msg1.Timestamp > msg2.Timestamp, nil + } + + // RTP2b2 + msg1Serial, msg1Index, err := msg1.getMsgSerialAndIndex() + if err != nil { + return false, err + } + msg2Serial, msg2Index, err := msg2.getMsgSerialAndIndex() + if err != nil { + return true, err + } + if msg1Serial == msg2Serial { + return msg1Index > msg2Index, nil + } + return msg1Serial > msg2Serial, nil +} diff --git a/ably/proto_presence_message_test.go b/ably/proto_presence_message_test.go index 24fb12eed..dfb32620a 100644 --- a/ably/proto_presence_message_test.go +++ b/ably/proto_presence_message_test.go @@ -4,12 +4,15 @@ package ably_test import ( + "context" "encoding/json" "fmt" "testing" + "time" "github.com/ably/ably-go/ably" "github.com/ably/ably-go/ably/internal/ablyutil" + "github.com/ably/ably-go/ablytest" "github.com/stretchr/testify/assert" ) @@ -57,3 +60,1191 @@ func TestPresenceMessage(t *testing.T) { }) } } + +func TestPresenceCheckForNewNessByTimestampIfSynthesized_RTP2b1(t *testing.T) { + presenceMsg1 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "123:12:1", + Timestamp: 125, + ConnectionID: "987", + }, + } + presenceMsg2 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "123:12:2", + Timestamp: 123, + ConnectionID: "784", + }, + } + isNewMsg, err := presenceMsg1.IsNewerThan(presenceMsg2) + assert.Nil(t, err) + assert.True(t, isNewMsg) + + isNewMsg, err = presenceMsg2.IsNewerThan(presenceMsg1) + assert.Nil(t, err) + assert.False(t, isNewMsg) +} + +func TestPresenceCheckForNewNessBySerialIfNotSynthesized__RTP2b2(t *testing.T) { + oldPresenceMsg := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "123:12:0", + Timestamp: 123, + ConnectionID: "123", + }, + } + newPresenceMessage := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "123:12:1", + Timestamp: 123, + ConnectionID: "123", + }, + } + isNewMsg, err := oldPresenceMsg.IsNewerThan(newPresenceMessage) + assert.Nil(t, err) + assert.False(t, isNewMsg) + + isNewMsg, err = newPresenceMessage.IsNewerThan(oldPresenceMsg) + assert.Nil(t, err) + assert.True(t, isNewMsg) +} + +func TestPresenceMessagesShouldReturnErrorForWrongMessageSerials__RTP2b2(t *testing.T) { + // Both has invalid msgserial + msg1 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "123:1a:0", + Timestamp: 123, + ConnectionID: "123", + }, + } + + msg2 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "123:1b:1", + Timestamp: 124, + ConnectionID: "123", + }, + } + isNewMsg, err := msg1.IsNewerThan(msg2) + assert.NotNil(t, err) + assert.Contains(t, fmt.Sprint(err), "the presence message has invalid msgSerial, for msgId 123:1a:0") + assert.False(t, isNewMsg) + + isNewMsg, err = msg2.IsNewerThan(msg1) + assert.NotNil(t, err) + assert.Contains(t, fmt.Sprint(err), "the presence message has invalid msgSerial, for msgId 123:1b:1") + assert.False(t, isNewMsg) + + // msg2 has valid messageSerial + msg2 = &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "123:10:0", + Timestamp: 124, + ConnectionID: "123", + }, + } + + isNewMsg, err = msg1.IsNewerThan(msg2) + assert.NotNil(t, err) + assert.Contains(t, fmt.Sprint(err), "the presence message has invalid msgSerial, for msgId 123:1a:0") + assert.False(t, isNewMsg) + + isNewMsg, err = msg2.IsNewerThan(msg1) + assert.NotNil(t, err) + assert.Contains(t, fmt.Sprint(err), "the presence message has invalid msgSerial, for msgId 123:1a:0") + assert.True(t, isNewMsg) +} + +func Test_PresenceMap_RTP2(t *testing.T) { + const channelRetryTimeout = 123 * time.Millisecond + const realtimeRequestTimeout = 2 * time.Second + + setup := func(t *testing.T) ( + in, out chan *ably.ProtocolMessage, + c *ably.Realtime, + channel *ably.RealtimeChannel, + stateChanges ably.ChannelStateChanges, + afterCalls chan ablytest.AfterCall, + presenceMsgCh chan *ably.PresenceMessage, + ) { + in = make(chan *ably.ProtocolMessage, 1) + out = make(chan *ably.ProtocolMessage, 16) + presenceMsgCh = make(chan *ably.PresenceMessage, 16) + + afterCalls = make(chan ablytest.AfterCall, 1) + now, after := ablytest.TimeFuncs(afterCalls) + + c, _ = ably.NewRealtime( + ably.WithToken("fake:token"), + ably.WithAutoConnect(false), + ably.WithChannelRetryTimeout(channelRetryTimeout), + ably.WithRealtimeRequestTimeout(realtimeRequestTimeout), + ably.WithDial(MessagePipe(in, out)), + ably.WithNow(now), + ably.WithAfter(after), + ) + + in <- &ably.ProtocolMessage{ + Action: ably.ActionConnected, + ConnectionID: "connection-id", + ConnectionDetails: &ably.ConnectionDetails{}, + } + + err := ablytest.Wait(ablytest.ConnWaiter(c, c.Connect, ably.ConnectionEventConnected), nil) + assert.NoError(t, err) + + channel = c.Channels.Get("test") + stateChanges = make(ably.ChannelStateChanges, 10) + channel.OnAll(stateChanges.Receive) + + in <- &ably.ProtocolMessage{ + Action: ably.ActionAttached, + Channel: channel.Name, + } + + var change ably.ChannelStateChange + + ablytest.Instantly.Recv(t, &change, stateChanges, t.Fatalf) + assert.Equal(t, ably.ChannelStateAttached, change.Current, + "expected %v; got %v (event: %+v)", ably.ChannelStateAttached, change.Current) + + channel.Presence.SubscribeAll(context.Background(), func(message *ably.PresenceMessage) { + presenceMsgCh <- message + }) + return + } + + t.Run("RTP2: should maintain a list of members present on the channel", func(t *testing.T) { + in, _, _, channel, _, _, presenceMsgCh := setup(t) + + initialMembers := channel.Presence.Members() + assert.Empty(t, initialMembers) + + presenceMsg1 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "123:12:1", + Timestamp: 125, + ConnectionID: "987", + ClientID: "999", + }, + } + + presenceMsg2 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "123:12:2", + Timestamp: 123, + ConnectionID: "784", + ClientID: "999", + }, + } + + msg := &ably.ProtocolMessage{ + Action: ably.ActionPresence, + Channel: channel.Name, + Presence: []*ably.PresenceMessage{presenceMsg1}, + } + + in <- msg + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + presenceMembers := channel.Presence.Members() + + assert.Equal(t, 1, len(presenceMembers)) + member := presenceMembers[presenceMsg1.ConnectionID+presenceMsg1.ClientID] + assert.Equal(t, ably.PresenceActionPresent, member.Action) + + msg.Presence = []*ably.PresenceMessage{presenceMsg2} + in <- msg + + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + + presenceMembers = channel.Presence.Members() + assert.Equal(t, 2, len(presenceMembers)) + member2 := presenceMembers[presenceMsg2.ConnectionID+presenceMsg2.ClientID] + assert.Equal(t, ably.PresenceActionPresent, member2.Action) + }) + + t.Run("RTP2b1: check for newness by timestamp is synthesized", func(t *testing.T) { + in, _, _, channel, _, _, presenceMsgCh := setup(t) + + initialMembers := channel.Presence.Members() + assert.Empty(t, initialMembers) + + presenceMsg1 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:5", + Timestamp: 125, + ConnectionID: "987", + ClientID: "999", + }, + } + + presenceMsg2 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "989:12:0", + Timestamp: 128, + ConnectionID: "987", + ClientID: "999", + }, + } + + msg := &ably.ProtocolMessage{ + Action: ably.ActionPresence, + Channel: channel.Name, + Presence: []*ably.PresenceMessage{presenceMsg1}, + } + + in <- msg + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + presenceMembers := channel.Presence.Members() + + assert.Equal(t, 1, len(presenceMembers)) + member := presenceMembers[presenceMsg1.ConnectionID+presenceMsg1.ClientID] + assert.Equal(t, ably.PresenceActionPresent, member.Action) + assert.Equal(t, "987:12:5", member.ID) + + msg.Presence = []*ably.PresenceMessage{presenceMsg2} + in <- msg + + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + presenceMembers = channel.Presence.Members() + assert.Equal(t, 1, len(presenceMembers)) + member = presenceMembers[presenceMsg1.ConnectionID+presenceMsg1.ClientID] + assert.Equal(t, ably.PresenceActionPresent, member.Action) + assert.Equal(t, "989:12:0", member.ID) + }) + + t.Run("RTP2b2, RTP2d: check for newness by serial if not synthesized", func(t *testing.T) { + in, _, _, channel, _, _, presenceMsgCh := setup(t) + + initialMembers := channel.Presence.Members() + assert.Empty(t, initialMembers) + + presenceMsg1 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:5", + Timestamp: 125, + ConnectionID: "987", + ClientID: "999", + }, + } + + presenceMsg2 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:0", + Timestamp: 128, + ConnectionID: "987", + ClientID: "999", + }, + } + + presenceMsg3 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:7", + Timestamp: 128, + ConnectionID: "987", + ClientID: "999", + }, + } + + msg := &ably.ProtocolMessage{ + Action: ably.ActionPresence, + Channel: channel.Name, + Presence: []*ably.PresenceMessage{presenceMsg1}, + } + + in <- msg + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + presenceMembers := channel.Presence.Members() + + assert.Equal(t, 1, len(presenceMembers)) + member := presenceMembers[presenceMsg1.ConnectionID+presenceMsg1.ClientID] + assert.Equal(t, ably.PresenceActionPresent, member.Action) + assert.Equal(t, "987:12:5", member.ID) + + msg.Presence = []*ably.PresenceMessage{presenceMsg2} + in <- msg + + ablytest.Instantly.NoRecv(t, nil, presenceMsgCh, t.Fatalf) + presenceMembers = channel.Presence.Members() + assert.Equal(t, 1, len(presenceMembers)) + member = presenceMembers[presenceMsg1.ConnectionID+presenceMsg1.ClientID] + assert.Equal(t, ably.PresenceActionPresent, member.Action) + assert.Equal(t, "987:12:5", member.ID) + + msg.Presence = []*ably.PresenceMessage{presenceMsg3} + in <- msg + + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + presenceMembers = channel.Presence.Members() + assert.Equal(t, 1, len(presenceMembers)) + member = presenceMembers[presenceMsg1.ConnectionID+presenceMsg1.ClientID] + assert.Equal(t, ably.PresenceActionPresent, member.Action) + assert.Equal(t, "987:12:7", member.ID) + }) + + t.Run("RTP2c: check for newness during sync", func(t *testing.T) { + in, _, _, channel, stateChanges, _, presenceMsgCh := setup(t) + + initialMembers := channel.Presence.Members() + assert.Empty(t, initialMembers) + + assert.False(t, channel.Presence.SyncInitial()) + assert.True(t, channel.Presence.SyncComplete()) + + in <- &ably.ProtocolMessage{ + Action: ably.ActionAttached, + Flags: ably.FlagHasPresence, + Channel: channel.Name, + } + + ablytest.Instantly.Recv(t, nil, stateChanges, t.Fatalf) + + assert.False(t, channel.Presence.SyncInitial()) + assert.True(t, channel.Presence.SyncInProgress()) + assert.False(t, channel.Presence.SyncComplete()) + + presenceMsg1 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:5", + Timestamp: 125, + ConnectionID: "987", + ClientID: "999", + }, + } + + presenceMsg2 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:0", + Timestamp: 128, + ConnectionID: "987", + ClientID: "999", + }, + } + + presenceMsg3 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:7", + Timestamp: 128, + ConnectionID: "987", + ClientID: "999", + }, + } + + syncMessage := &ably.ProtocolMessage{ + Action: ably.ActionSync, + Channel: channel.Name, + ChannelSerial: "abcdefg:", + Presence: []*ably.PresenceMessage{presenceMsg1, presenceMsg2, presenceMsg3}, + } + + in <- syncMessage + + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + + assert.True(t, channel.Presence.SyncComplete()) + assert.False(t, channel.Presence.SyncInitial()) + presenceMembers := channel.Presence.Members() + assert.Equal(t, 1, len(presenceMembers)) + }) + + t.Run("RTP2d, RTP2g: when presence msg with ENTER, UPDATE AND PRESENT arrives, add to presence map with action as present", func(t *testing.T) { + in, _, _, channel, _, _, presenceMsgCh := setup(t) + + initialMembers := channel.Presence.Members() + assert.Empty(t, initialMembers) + + presenceMsg1 := &ably.PresenceMessage{ + Action: ably.PresenceActionEnter, + Message: ably.Message{ + ID: "987:12:0", + Timestamp: 125, + ConnectionID: "987", + ClientID: "999", + }, + } + + presenceMsg2 := &ably.PresenceMessage{ + Action: ably.PresenceActionUpdate, + Message: ably.Message{ + ID: "987:12:1", + Timestamp: 128, + ConnectionID: "988", + ClientID: "999", + }, + } + + presenceMsg3 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:2", + Timestamp: 128, + ConnectionID: "989", + ClientID: "999", + }, + } + + msg := &ably.ProtocolMessage{ + Action: ably.ActionPresence, + Channel: channel.Name, + Presence: []*ably.PresenceMessage{presenceMsg1}, + } + + var presenceMsg *ably.PresenceMessage + in <- msg + // RTP2g + ablytest.Instantly.Recv(t, &presenceMsg, presenceMsgCh, t.Fatalf) + assert.Equal(t, ably.PresenceActionEnter, presenceMsg.Action) + + msg.Presence = []*ably.PresenceMessage{presenceMsg2} + in <- msg + // RTP2g + ablytest.Instantly.Recv(t, &presenceMsg, presenceMsgCh, t.Fatalf) + assert.Equal(t, ably.PresenceActionUpdate, presenceMsg.Action) + + msg.Presence = []*ably.PresenceMessage{presenceMsg3} + in <- msg + // RTP2g + ablytest.Instantly.Recv(t, &presenceMsg, presenceMsgCh, t.Fatalf) + assert.Equal(t, ably.PresenceActionPresent, presenceMsg.Action) + + members := channel.Presence.Members() + assert.Equal(t, 3, len(members)) + for _, pm := range members { + assert.Equal(t, ably.PresenceActionPresent, pm.Action) + } + }) + + t.Run("RTP2e, RTP2g: when presence msg with LEAVE action arrives, remove member from presence map", func(t *testing.T) { + in, _, _, channel, _, _, presenceMsgCh := setup(t) + + initialMembers := channel.Presence.Members() + assert.Empty(t, initialMembers) + + presenceMsg1 := &ably.PresenceMessage{ + Action: ably.PresenceActionEnter, + Message: ably.Message{ + ID: "987:12:0", + Timestamp: 125, + ConnectionID: "987", + ClientID: "999", + }, + } + + presenceMsg2 := &ably.PresenceMessage{ + Action: ably.PresenceActionUpdate, + Message: ably.Message{ + ID: "988:12:1", + Timestamp: 128, + ConnectionID: "988", + ClientID: "999", + }, + } + + presenceMsg3 := &ably.PresenceMessage{ + Action: ably.PresenceActionLeave, + Message: ably.Message{ + ID: "987:13:0", + Timestamp: 130, + ConnectionID: "987", + ClientID: "999", + }, + } + + msg := &ably.ProtocolMessage{ + Action: ably.ActionPresence, + Channel: channel.Name, + Presence: []*ably.PresenceMessage{presenceMsg1}, + } + + var presenceMsg *ably.PresenceMessage + in <- msg + // RTP2g + ablytest.Instantly.Recv(t, &presenceMsg, presenceMsgCh, t.Fatalf) + assert.Equal(t, ably.PresenceActionEnter, presenceMsg.Action) + + members := channel.Presence.Members() + assert.Equal(t, 1, len(members)) + + msg.Presence = []*ably.PresenceMessage{presenceMsg2} + in <- msg + // RTP2g + ablytest.Instantly.Recv(t, &presenceMsg, presenceMsgCh, t.Fatalf) + assert.Equal(t, ably.PresenceActionUpdate, presenceMsg.Action) + + members = channel.Presence.Members() + assert.Equal(t, 2, len(members)) + + msg.Presence = []*ably.PresenceMessage{presenceMsg3} + in <- msg + // RTP2g + ablytest.Instantly.Recv(t, &presenceMsg, presenceMsgCh, t.Fatalf) + assert.Equal(t, ably.PresenceActionLeave, presenceMsg.Action) + + members = channel.Presence.Members() + assert.Equal(t, 1, len(members)) + for _, pm := range members { + assert.Equal(t, ably.PresenceActionPresent, pm.Action) + } + }) +} + +func Test_Presence_server_initiated_sync_RTP18(t *testing.T) { + + const channelRetryTimeout = 123 * time.Millisecond + const realtimeRequestTimeout = 2 * time.Second + + setup := func(t *testing.T) ( + in, out chan *ably.ProtocolMessage, + c *ably.Realtime, + channel *ably.RealtimeChannel, + stateChanges ably.ChannelStateChanges, + presenceMsgCh chan *ably.PresenceMessage, + ) { + in = make(chan *ably.ProtocolMessage, 1) + out = make(chan *ably.ProtocolMessage, 16) + presenceMsgCh = make(chan *ably.PresenceMessage, 16) + + c, _ = ably.NewRealtime( + ably.WithToken("fake:token"), + ably.WithAutoConnect(false), + ably.WithChannelRetryTimeout(channelRetryTimeout), + ably.WithRealtimeRequestTimeout(realtimeRequestTimeout), + ably.WithDial(MessagePipe(in, out)), + ) + + in <- &ably.ProtocolMessage{ + Action: ably.ActionConnected, + ConnectionID: "connection-id", + ConnectionDetails: &ably.ConnectionDetails{}, + } + + err := ablytest.Wait(ablytest.ConnWaiter(c, c.Connect, ably.ConnectionEventConnected), nil) + assert.NoError(t, err) + + channel = c.Channels.Get("test") + stateChanges = make(ably.ChannelStateChanges, 10) + channel.OnAll(stateChanges.Receive) + + in <- &ably.ProtocolMessage{ + Action: ably.ActionAttached, + Channel: channel.Name, + } + + var change ably.ChannelStateChange + + ablytest.Instantly.Recv(t, &change, stateChanges, t.Fatalf) + assert.Equal(t, ably.ChannelStateAttached, change.Current, + "expected %v; got %v (event: %+v)", ably.ChannelStateAttached, change.Current) + + channel.Presence.SubscribeAll(context.Background(), func(message *ably.PresenceMessage) { + presenceMsgCh <- message + }) + return + } + + t.Run("RTP18a: client determines a new sync started with :", func(t *testing.T) { + in, _, _, channel, _, presenceMsgCh := setup(t) + + initialMembers := channel.Presence.Members() + assert.Empty(t, initialMembers) + + presenceMsg1 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:5", + Timestamp: 125, + ConnectionID: "987", + ClientID: "999", + }, + } + + presenceMsg2 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:0", + Timestamp: 128, + ConnectionID: "987", + ClientID: "999", + }, + } + + presenceMsg3 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:7", + Timestamp: 128, + ConnectionID: "987", + ClientID: "999", + }, + } + + syncMessage := &ably.ProtocolMessage{ + Action: ably.ActionSync, + Channel: channel.Name, + ChannelSerial: "abcdefg:12", + Presence: []*ably.PresenceMessage{presenceMsg1, presenceMsg2, presenceMsg3}, + } + + assert.False(t, channel.Presence.SyncInitial()) + assert.False(t, channel.Presence.SyncInProgress()) + assert.True(t, channel.Presence.SyncComplete()) + + in <- syncMessage + + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + + assert.False(t, channel.Presence.SyncInitial()) + assert.True(t, channel.Presence.SyncInProgress()) + assert.False(t, channel.Presence.SyncComplete()) + + presenceMembers := channel.Presence.Members() + assert.Equal(t, 1, len(presenceMembers)) + }) + + t.Run("RTP18b: client determines sync ended with :", func(t *testing.T) { + in, _, _, channel, _, presenceMsgCh := setup(t) + + initialMembers := channel.Presence.Members() + assert.Empty(t, initialMembers) + + presenceMsg1 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:5", + Timestamp: 125, + ConnectionID: "987", + ClientID: "999", + }, + } + + syncMessage := &ably.ProtocolMessage{ + Action: ably.ActionSync, + Channel: channel.Name, + ChannelSerial: "abcdefg:12", + Presence: []*ably.PresenceMessage{presenceMsg1}, + } + + assert.False(t, channel.Presence.SyncInitial()) + assert.False(t, channel.Presence.SyncInProgress()) + assert.True(t, channel.Presence.SyncComplete()) + + in <- syncMessage + + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + + assert.False(t, channel.Presence.SyncInitial()) + assert.True(t, channel.Presence.SyncInProgress()) + assert.False(t, channel.Presence.SyncComplete()) + + presenceMembers := channel.Presence.Members() + assert.Equal(t, 1, len(presenceMembers)) + + presenceMsg2 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:12", + Timestamp: 128, + ConnectionID: "980", + ClientID: "999", + }, + } + + // RTP18b + syncMessage = &ably.ProtocolMessage{ + Action: ably.ActionSync, + Channel: channel.Name, + ChannelSerial: "abcdefg:", + Presence: []*ably.PresenceMessage{presenceMsg2}, + } + in <- syncMessage + + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + + assert.False(t, channel.Presence.SyncInitial()) + assert.False(t, channel.Presence.SyncInProgress()) + assert.True(t, channel.Presence.SyncComplete()) + + presenceMembers = channel.Presence.Members() + assert.Equal(t, 2, len(presenceMembers)) + }) + + t.Run("RTP18: client determines sync started and ended with :", func(t *testing.T) { + in, _, _, channel, _, presenceMsgCh := setup(t) + + initialMembers := channel.Presence.Members() + assert.Empty(t, initialMembers) + + presenceMsg1 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:5", + Timestamp: 125, + ConnectionID: "987", + ClientID: "999", + }, + } + + presenceMsg2 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:0", + Timestamp: 128, + ConnectionID: "987", + ClientID: "999", + }, + } + + presenceMsg3 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:7", + Timestamp: 128, + ConnectionID: "987", + ClientID: "999", + }, + } + + syncMessage := &ably.ProtocolMessage{ + Action: ably.ActionSync, + Channel: channel.Name, + ChannelSerial: "abcdefg:", + Presence: []*ably.PresenceMessage{presenceMsg1, presenceMsg2, presenceMsg3}, + } + + assert.False(t, channel.Presence.SyncInitial()) + assert.False(t, channel.Presence.SyncInProgress()) + assert.True(t, channel.Presence.SyncComplete()) + + in <- syncMessage + + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + + assert.False(t, channel.Presence.SyncInitial()) + assert.False(t, channel.Presence.SyncInProgress()) + assert.True(t, channel.Presence.SyncComplete()) + + presenceMembers := channel.Presence.Members() + assert.Equal(t, 1, len(presenceMembers)) + }) +} + +func Test_RTP1_attach_with_presence_flag(t *testing.T) { + const channelRetryTimeout = 123 * time.Millisecond + const realtimeRequestTimeout = 2 * time.Second + + in := make(chan *ably.ProtocolMessage, 1) + out := make(chan *ably.ProtocolMessage, 16) + + c, _ := ably.NewRealtime( + ably.WithToken("fake:token"), + ably.WithAutoConnect(false), + ably.WithChannelRetryTimeout(channelRetryTimeout), + ably.WithRealtimeRequestTimeout(realtimeRequestTimeout), + ably.WithDial(MessagePipe(in, out)), + ) + + in <- &ably.ProtocolMessage{ + Action: ably.ActionConnected, + ConnectionID: "connection-id", + ConnectionDetails: &ably.ConnectionDetails{}, + } + + err := ablytest.Wait(ablytest.ConnWaiter(c, c.Connect, ably.ConnectionEventConnected), nil) + assert.NoError(t, err) + + channel := c.Channels.Get("test") + stateChanges := make(ably.ChannelStateChanges, 10) + channel.OnAll(stateChanges.Receive) + + assert.True(t, channel.Presence.SyncInitial()) + assert.False(t, channel.Presence.SyncInProgress()) + assert.False(t, channel.Presence.SyncComplete()) + + in <- &ably.ProtocolMessage{ + Action: ably.ActionAttached, + Channel: channel.Name, + } + + var change ably.ChannelStateChange + + ablytest.Instantly.Recv(t, &change, stateChanges, t.Fatalf) + assert.Equal(t, ably.ChannelStateAttached, change.Current, + "expected %v; got %v (event: %+v)", ably.ChannelStateAttached, change.Current) + + assert.False(t, channel.Presence.SyncInitial()) + assert.False(t, channel.Presence.SyncInProgress()) + assert.True(t, channel.Presence.SyncComplete()) + + initialMembers := channel.Presence.Members() + assert.Empty(t, initialMembers) + + in <- &ably.ProtocolMessage{ + Action: ably.ActionAttached, + Flags: ably.FlagHasPresence, + Channel: channel.Name, + } + + ablytest.Instantly.Recv(t, nil, stateChanges, t.Fatalf) + + assert.False(t, channel.Presence.SyncInitial()) + assert.True(t, channel.Presence.SyncInProgress()) + assert.False(t, channel.Presence.SyncComplete()) +} + +func Test_internal_presencemap_RTP17(t *testing.T) { + const channelRetryTimeout = 123 * time.Millisecond + const realtimeRequestTimeout = 2 * time.Second + + setup := func(t *testing.T) ( + in, out chan *ably.ProtocolMessage, + c *ably.Realtime, + channel *ably.RealtimeChannel, + stateChanges ably.ChannelStateChanges, + presenceMsgCh chan *ably.PresenceMessage, + ) { + in = make(chan *ably.ProtocolMessage, 1) + out = make(chan *ably.ProtocolMessage, 16) + presenceMsgCh = make(chan *ably.PresenceMessage, 16) + + c, _ = ably.NewRealtime( + ably.WithKey("Auth:Key"), + ably.WithAutoConnect(false), + ably.WithChannelRetryTimeout(channelRetryTimeout), + ably.WithRealtimeRequestTimeout(realtimeRequestTimeout), + ably.WithDial(MessagePipe(in, out)), + ) + + in <- &ably.ProtocolMessage{ + Action: ably.ActionConnected, + ConnectionID: "connection-id", + ConnectionDetails: &ably.ConnectionDetails{}, + } + + err := ablytest.Wait(ablytest.ConnWaiter(c, c.Connect, ably.ConnectionEventConnected), nil) + assert.NoError(t, err) + + channel = c.Channels.Get("test") + stateChanges = make(ably.ChannelStateChanges, 20) + channel.OnAll(stateChanges.Receive) + + in <- &ably.ProtocolMessage{ + Action: ably.ActionAttached, + Channel: channel.Name, + } + + var change ably.ChannelStateChange + + ablytest.Soon.Recv(t, &change, stateChanges, t.Fatalf) + assert.Equal(t, ably.ChannelStateAttached, change.Current, + "expected %v; got %v (event: %+v)", ably.ChannelStateAttached, change.Current) + + channel.Presence.SubscribeAll(context.Background(), func(message *ably.PresenceMessage) { + presenceMsgCh <- message + }) + return + } + + t.Run("RTP17: presence object should have second presencemap containing only currentConnectionId", func(t *testing.T) { + in, _, client, channel, _, presenceMsgCh := setup(t) + + initialMembers := channel.Presence.Members() + assert.Empty(t, initialMembers) + + presenceMsg1 := &ably.PresenceMessage{ + Action: ably.PresenceActionEnter, + Message: ably.Message{ + ID: "987:12:0", + Timestamp: 125, + ConnectionID: client.Connection.ID(), + ClientID: "999", + }, + } + + presenceMsg2 := &ably.PresenceMessage{ + Action: ably.PresenceActionUpdate, + Message: ably.Message{ + ID: "988:12:1", + Timestamp: 128, + ConnectionID: "988", + ClientID: "999", + }, + } + + presenceMsg3 := &ably.PresenceMessage{ + Action: ably.PresenceActionEnter, + Message: ably.Message{ + ID: "987:13:0", + Timestamp: 130, + ConnectionID: "987", + ClientID: "999", + }, + } + + msg := &ably.ProtocolMessage{ + Action: ably.ActionPresence, + Channel: channel.Name, + Presence: []*ably.PresenceMessage{presenceMsg1, presenceMsg2, presenceMsg3}, + } + + var presenceMsg *ably.PresenceMessage + in <- msg + + ablytest.Instantly.Recv(t, &presenceMsg, presenceMsgCh, t.Fatalf) + ablytest.Instantly.Recv(t, &presenceMsg, presenceMsgCh, t.Fatalf) + ablytest.Instantly.Recv(t, &presenceMsg, presenceMsgCh, t.Fatalf) + + members := channel.Presence.Members() + assert.Equal(t, 3, len(members)) + + internalMembers := channel.Presence.InternalMembers() + assert.Equal(t, 1, len(internalMembers)) + + for _, pm := range internalMembers { + assert.Equal(t, client.Connection.ID(), pm.ConnectionID) + } + }) + + t.Run("RTP17b: apply presence message events as per spec", func(t *testing.T) { + in, _, client, channel, _, presenceMsgCh := setup(t) + + presenceMsg1 := &ably.PresenceMessage{ + Action: ably.PresenceActionEnter, + Message: ably.Message{ + ID: "987:12:0", + Timestamp: 125, + ConnectionID: client.Connection.ID(), + ClientID: "999", + Data: "msg1", + }, + } + msg := &ably.ProtocolMessage{ + Action: ably.ActionPresence, + Channel: channel.Name, + } + msg.Presence = []*ably.PresenceMessage{presenceMsg1} + + var presenceMsg *ably.PresenceMessage + in <- msg + ablytest.Instantly.Recv(t, &presenceMsg, presenceMsgCh, t.Fatalf) + + internalMembers := channel.Presence.InternalMembers() + assert.Equal(t, 1, len(internalMembers)) + internalMember := internalMembers["999"] + assert.Equal(t, client.Connection.ID(), internalMember.ConnectionID) + + presenceMsg2 := &ably.PresenceMessage{ + Action: ably.PresenceActionUpdate, + Message: ably.Message{ + ID: "987:12:1", + Timestamp: 125, + ConnectionID: client.Connection.ID(), + ClientID: "456", + }, + } + + msg.Presence = []*ably.PresenceMessage{presenceMsg2} + + in <- msg + ablytest.Instantly.Recv(t, &presenceMsg, presenceMsgCh, t.Fatalf) + + internalMembers = channel.Presence.InternalMembers() + assert.Equal(t, 2, len(internalMembers)) + internalMember = internalMembers["456"] + assert.Equal(t, client.Connection.ID(), internalMember.ConnectionID) + + presenceMsg3 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: client.Connection.ID() + ":12:3", + Timestamp: 125, + ConnectionID: client.Connection.ID(), + ClientID: "978", + }, + } + + msg.Presence = []*ably.PresenceMessage{presenceMsg3} + + in <- msg + ablytest.Instantly.Recv(t, &presenceMsg, presenceMsgCh, t.Fatalf) + + internalMembers = channel.Presence.InternalMembers() + assert.Equal(t, 3, len(internalMembers)) + internalMember = internalMembers["978"] + assert.Equal(t, client.Connection.ID(), internalMember.ConnectionID) + + // server synthesized, connectionId not substring of ID + leaveMsg1 := &ably.PresenceMessage{ + Action: ably.PresenceActionLeave, + Message: ably.Message{ + ID: "987:12:3", + Timestamp: 125, + ConnectionID: client.Connection.ID(), + ClientID: "978", + }, + } + + msg.Presence = []*ably.PresenceMessage{leaveMsg1} + + in <- msg + ablytest.Instantly.NoRecv(t, &presenceMsg, presenceMsgCh, t.Fatalf) + + internalMembers = channel.Presence.InternalMembers() + assert.Equal(t, 3, len(internalMembers)) + internalMember = internalMembers["978"] + assert.Equal(t, client.Connection.ID(), internalMember.ConnectionID) + + // not a server synthesized + leaveMsg2 := &ably.PresenceMessage{ + Action: ably.PresenceActionLeave, + Message: ably.Message{ + ID: client.Connection.ID() + ":12:4", + Timestamp: 125, + ConnectionID: client.Connection.ID(), + ClientID: "978", + }, + } + msg.Presence = []*ably.PresenceMessage{leaveMsg2} + + in <- msg + ablytest.Instantly.Recv(t, &presenceMsg, presenceMsgCh, t.Fatalf) + + internalMembers = channel.Presence.InternalMembers() + assert.Equal(t, 2, len(internalMembers)) + }) + + t.Run("RTP17h: presencemap should be keyed by clientId", func(t *testing.T) { + in, _, client, channel, _, presenceMsgCh := setup(t) + + initialMembers := channel.Presence.Members() + assert.Empty(t, initialMembers) + + presenceMsg1 := &ably.PresenceMessage{ + Action: ably.PresenceActionEnter, + Message: ably.Message{ + ID: "987:12:0", + Timestamp: 125, + ConnectionID: client.Connection.ID(), + ClientID: "999", + }, + } + + msg := &ably.ProtocolMessage{ + Action: ably.ActionPresence, + Channel: channel.Name, + Presence: []*ably.PresenceMessage{presenceMsg1}, + } + + var presenceMsg *ably.PresenceMessage + in <- msg + + ablytest.Instantly.Recv(t, &presenceMsg, presenceMsgCh, t.Fatalf) + + members := channel.Presence.Members() + assert.Equal(t, 1, len(members)) + + internalMembers := channel.Presence.InternalMembers() + assert.Equal(t, 1, len(internalMembers)) + + for key, pm := range internalMembers { + assert.Equal(t, client.Connection.ID(), pm.ConnectionID) + assert.Equal(t, "999", key) + } + }) + + t.Run("RTP17f, RTP17g, RTP17e: automatic re-entry whenever channel moves into ATTACHED state", func(t *testing.T) { + in, out, client, channel, stateChanges, presenceMsgCh := setup(t) + + presenceMsg1 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:0", + Timestamp: 125, + ConnectionID: client.Connection.ID(), + ClientID: "999", + }, + } + + presenceMsg2 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:1", + Timestamp: 128, + ConnectionID: client.Connection.ID(), + ClientID: "234", + }, + } + + presenceMsg3 := &ably.PresenceMessage{ + Action: ably.PresenceActionPresent, + Message: ably.Message{ + ID: "987:12:2", + Timestamp: 128, + ConnectionID: "3435", + ClientID: "345", + }, + } + + msg := &ably.ProtocolMessage{ + Action: ably.ActionPresence, + Channel: channel.Name, + Presence: []*ably.PresenceMessage{presenceMsg1, presenceMsg2, presenceMsg3}, + } + + in <- msg + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + ablytest.Instantly.Recv(t, nil, presenceMsgCh, t.Fatalf) + + members := channel.Presence.Members() + assert.Equal(t, 3, len(members)) + + internalMembers := channel.Presence.InternalMembers() + assert.Equal(t, 2, len(internalMembers)) + + in <- &ably.ProtocolMessage{ + Action: ably.ActionAttached, + Channel: channel.Name, + } + + var chanChange ably.ChannelStateChange + ablytest.Instantly.Recv(t, &chanChange, stateChanges, t.Fatalf) + + // Send enter for internal messages + var protoMsg *ably.ProtocolMessage + ablytest.Instantly.Recv(t, &protoMsg, out, t.Fatalf) + for _, v := range protoMsg.Presence { + assert.Equal(t, ably.PresenceActionEnter, v.Action) + assert.Equal(t, client.Connection.ID(), v.ConnectionID) + } + client.Connection.AckAll() + + ablytest.Instantly.Recv(t, &protoMsg, out, t.Fatalf) + for _, v := range protoMsg.Presence { + assert.Equal(t, ably.PresenceActionEnter, v.Action) + assert.Equal(t, client.Connection.ID(), v.ConnectionID) + } + client.Connection.AckAll() + ablytest.Instantly.NoRecv(t, nil, out, t.Fatalf) + }) +} diff --git a/ably/proto_protocol_message_test.go b/ably/proto_protocol_message_test.go index c93f3f0e8..29fbded1a 100644 --- a/ably/proto_protocol_message_test.go +++ b/ably/proto_protocol_message_test.go @@ -23,8 +23,8 @@ func TestProtocolMessageEncodeZeroSerials(t *testing.T) { } encoded, err := ablyutil.MarshalMsgpack(msg) assert.NoError(t, err) - // expect a 3-element map with both the serial fields set to zero - expected := []byte("\x83\xB0id\xA4test\xA9msgSerial\x00") + // expect a 2-element map with both the serial fields set to zero + expected := []byte("\x82\xa2id\xa4test\xa9msgSerial\x00") assert.True(t, bytes.Equal(encoded, expected), "unexpected msgpack encoding\nexpected: %x\nactual: %x", expected, encoded) } diff --git a/ably/realtime_channel.go b/ably/realtime_channel.go index 73c27a6b8..07a8d8aa6 100644 --- a/ably/realtime_channel.go +++ b/ably/realtime_channel.go @@ -49,11 +49,9 @@ func newChannels(client *Realtime) *RealtimeChannels { // RTN16j, RTL15b func (channels *RealtimeChannels) SetChannelSerialsFromRecoverOption(serials map[string]string) { - channels.mtx.Lock() - defer channels.mtx.Unlock() for channelName, channelSerial := range serials { channel := channels.Get(channelName) - channel.properties.ChannelSerial = channelSerial + channel.setChannelSerial(channelSerial) } } @@ -62,7 +60,7 @@ func (channels *RealtimeChannels) GetChannelSerials() map[string]string { defer channels.mtx.Unlock() channelSerials := make(map[string]string) for channelName, realtimeChannel := range channels.chans { - channelSerials[channelName] = realtimeChannel.properties.ChannelSerial + channelSerials[channelName] = realtimeChannel.getChannelSerial() } return channelSerials } @@ -234,6 +232,7 @@ type RealtimeChannel struct { client *Realtime messageEmitter *eventEmitter + errorEmitter *eventEmitter queue *msgQueue options *channelOptions @@ -353,7 +352,7 @@ func (c *RealtimeChannel) lockAttach(err error) (result, error) { Action: actionAttach, Channel: c.Name, } - msg.ChannelSerial = c.properties.ChannelSerial // RTL4c1 + msg.ChannelSerial = c.properties.ChannelSerial // RTL4c1, accessing locked if len(c.channelOpts().Params) > 0 { msg.Params = c.channelOpts().Params } @@ -694,7 +693,9 @@ func (c *RealtimeChannel) HistoryUntilAttach(o ...HistoryOption) (*HistoryReques } untilAttachParam := func(o *historyOptions) { + c.mtx.Lock() o.params.Set("fromSerial", c.properties.AttachSerial) + c.mtx.Unlock() } o = append(o, untilAttachParam) @@ -777,14 +778,16 @@ func (c *RealtimeChannel) notify(msg *protocolMessage) { if !empty(msg.ChannelSerial) && msg.Action == actionMessage || msg.Action == actionPresence || msg.Action == actionAttached { c.log().Debugf("Setting channel serial for channelName - %v, previous - %v, current - %v", - c.Name, c.properties.ChannelSerial, msg.ChannelSerial) - c.properties.ChannelSerial = msg.ChannelSerial + c.Name, c.getChannelSerial(), msg.ChannelSerial) + c.setChannelSerial(msg.ChannelSerial) } switch msg.Action { case actionAttached: + c.mtx.Lock() c.properties.AttachSerial = msg.ChannelSerial // RTL15a - if c.State() == ChannelStateDetaching { // RTL5K + c.mtx.Unlock() + if c.State() == ChannelStateDetaching || c.State() == ChannelStateDetached { // RTL5K c.sendDetachMsg() return } @@ -794,11 +797,15 @@ func (c *RealtimeChannel) notify(msg *protocolMessage) { if msg.Flags != 0 { c.setModes(channelModeFromFlag(msg.Flags)) } - c.Presence.onAttach(msg) - isAttachResumed := msg.Flags.Has(flagResumed) - if c.state != ChannelStateAttached || !isAttachResumed { //RTL12 - c.setState(ChannelStateAttached, newErrorFromProto(msg.Error), isAttachResumed) + if c.State() == ChannelStateAttached { + if !msg.Flags.Has(flagResumed) { // RTL12 + c.Presence.onAttach(msg, true) + c.emitErrorUpdate(newErrorFromProto(msg.Error), false) + } + } else { + c.Presence.onAttach(msg, true) + c.setState(ChannelStateAttached, newErrorFromProto(msg.Error), msg.Flags.Has(flagResumed)) } c.queue.Flush() case actionDetached: @@ -840,9 +847,9 @@ func (c *RealtimeChannel) notify(msg *protocolMessage) { c.lockStartRetryAttachLoop(err) case actionSync: - c.Presence.processIncomingMessage(msg, syncSerial(msg)) + c.Presence.processProtoSyncMessage(msg) // RTP18 case actionPresence: - c.Presence.processIncomingMessage(msg, "") + c.Presence.processProtoPresenceMessage(msg) case actionError: c.setState(ChannelStateFailed, newErrorFromProto(msg.Error), false) c.queue.Fail(newErrorFromProto(msg.Error)) @@ -911,6 +918,18 @@ func (c *RealtimeChannel) setParams(params channelParams) { c.params = params } +func (c *RealtimeChannel) setChannelSerial(serial string) { + c.mtx.Lock() + defer c.mtx.Unlock() + c.properties.ChannelSerial = serial +} + +func (c *RealtimeChannel) getChannelSerial() string { + c.mtx.Lock() + defer c.mtx.Unlock() + return c.properties.ChannelSerial +} + func (c *RealtimeChannel) setModes(modes []ChannelMode) { c.mtx.Lock() defer c.mtx.Unlock() @@ -947,9 +966,17 @@ func (c *RealtimeChannel) setState(state ChannelState, err error, resumed bool) c.mtx.Lock() defer c.mtx.Unlock() + // RTP5a + if state == ChannelStateDetached || state == ChannelStateFailed { + c.Presence.onChannelDetachedOrFailed(channelStateError(state, err)) + } // RTP5a1 if state == ChannelStateDetached || state == ChannelStateSuspended || state == ChannelStateFailed { - c.properties.ChannelSerial = "" + c.properties.ChannelSerial = "" // setting on already locked method + } + // RTP5f + if state == ChannelStateSuspended { + c.Presence.onChannelSuspended(channelStateError(state, err)) } return c.lockSetState(state, err, resumed) @@ -967,6 +994,17 @@ func (c *RealtimeChannel) lockSetAttachResume(state ChannelState) { } } +func (c *RealtimeChannel) emitErrorUpdate(err *ErrorInfo, resumed bool) { + change := ChannelStateChange{ + Current: c.state, + Previous: c.state, + Reason: err, + Resumed: resumed, + Event: ChannelEventUpdate, + } + c.emitter.Emit(change.Event, change) +} + func (c *RealtimeChannel) lockSetState(state ChannelState, err error, resumed bool) error { c.lockSetAttachResume(state) previous := c.state diff --git a/ably/realtime_channel_spec_integration_test.go b/ably/realtime_channel_spec_integration_test.go index 6be511330..a647b0614 100644 --- a/ably/realtime_channel_spec_integration_test.go +++ b/ably/realtime_channel_spec_integration_test.go @@ -184,9 +184,6 @@ func TestRealtimeChannel_RTL4_Attach(t *testing.T) { t.Run("RTL4a: If already attached, nothing is done", func(t *testing.T) { in, out, _, channel, stateChanges, _ := setup(t) - ctx, cancel := context.WithCancel(context.Background()) - - cancel() channel.OnAll(stateChanges.Receive) // Get the channel to ATTACHED. @@ -203,7 +200,7 @@ func TestRealtimeChannel_RTL4_Attach(t *testing.T) { "expected %v; got %v (event: %+v)", ably.ChannelStateAttached, change.Current) // Attach the channel again - channel.Attach(ctx) + channel.Attach(canceledCtx) ablytest.Instantly.NoRecv(t, nil, out, t.Fatalf) ablytest.Instantly.NoRecv(t, nil, stateChanges, t.Fatalf) @@ -313,15 +310,17 @@ func TestRealtimeChannel_RTL4_Attach(t *testing.T) { err = channel.Attach(ctx) // Check that the attach message isn't sent - checkIfAttachSent := recorder.CheckIfSent(ably.ActionAttach, 1) - attachSent := ablytest.Instantly.IsTrue(checkIfAttachSent) + checkIfAttachSentFn := recorder.CheckIfSent(ably.ActionAttach, 1) + attachSent := ablytest.Instantly.IsTrue(checkIfAttachSentFn) assert.False(t, attachSent, "Attach message was sent before connection is established") assert.Contains(t, err.Error(), "cannot Attach channel because connection is in FAILED state", "expected error to contain \"cannot Attach channel because connection is in FAILED state\"; got %v", err.Error()) - ablytest.Instantly.NoRecv(t, nil, channelStateChanges, t.Fatalf) + // No need for this check since channel receives failed state change from conn. failed state + // This happens a bit late, probably due to late start in internal go routines. + // ablytest.Instantly.NoRecv(t, nil, channelStateChanges, t.Fatalf) }) t.Run("RTL4b: If connection state is SUSPENDED, returns error", func(t *testing.T) { @@ -742,8 +741,8 @@ func TestRealtimeChannel_RTL4_Attach(t *testing.T) { channelTransitioner.To(chAttaching) // check if attach message is sent - checkIfAttachSent := recorder.CheckIfSent(ably.ActionAttach, 1) - attachSent := ablytest.Instantly.IsTrue(checkIfAttachSent) + checkIfAttachSentFn := recorder.CheckIfSent(ably.ActionAttach, 1) + attachSent := ablytest.Instantly.IsTrue(checkIfAttachSentFn) assert.True(t, attachSent, "Should send attach message, since channel is attached") @@ -764,8 +763,8 @@ func TestRealtimeChannel_RTL4_Attach(t *testing.T) { }) // Check that the attach message isn't sent - checkIfAttachSent = recorder.CheckIfSent(ably.ActionAttach, 1) - attachSent = ablytest.Instantly.IsTrue(checkIfAttachSent) + checkIfAttachSentFn = recorder.CheckIfSent(ably.ActionAttach, 1) + attachSent = ablytest.Instantly.IsTrue(checkIfAttachSentFn) assert.False(t, attachSent, "Attach message was sent") @@ -807,8 +806,8 @@ func TestRealtimeChannel_RTL4_Attach(t *testing.T) { channelTransitioner.To(chAttaching, chAttached, chDetaching) // check if attach message is sent - checkIfAttachSent := recorder.CheckIfSent(ably.ActionAttach, 1) - attachSent := ablytest.Instantly.IsTrue(checkIfAttachSent) + checkIfAttachSentFn := recorder.CheckIfSent(ably.ActionAttach, 1) + attachSent := ablytest.Instantly.IsTrue(checkIfAttachSentFn) assert.True(t, attachSent, "Should send attach message, since channel is attached") @@ -837,8 +836,8 @@ func TestRealtimeChannel_RTL4_Attach(t *testing.T) { }) // Check that the attach message isn't sent - checkIfAttachSent = recorder.CheckIfSent(ably.ActionAttach, 1) - attachSent = ablytest.Instantly.IsTrue(checkIfAttachSent) + checkIfAttachSentFn = recorder.CheckIfSent(ably.ActionAttach, 1) + attachSent = ablytest.Instantly.IsTrue(checkIfAttachSentFn) assert.False(t, attachSent, "Attach message was sent before connection is established") ablytest.Instantly.NoRecv(t, nil, channelStateChanges, t.Fatalf) // Shouldn't send attach, waiting for detach @@ -850,7 +849,7 @@ func TestRealtimeChannel_RTL4_Attach(t *testing.T) { "expected %v; got %v (event: %+v)", ably.ChannelStateDetached, channelStatechange.Current, channelStatechange) // Check that the attach message is sent - attachSent = ablytest.Instantly.IsTrue(checkIfAttachSent) + attachSent = ablytest.Instantly.IsTrue(checkIfAttachSentFn) assert.True(t, attachSent, "Should send attach message, since channel is detached") ablytest.Instantly.Recv(t, &channelStatechange, channelStateChanges, t.Fatalf) @@ -887,14 +886,11 @@ func TestRealtimeChannel_RTL4_Attach(t *testing.T) { channelStateChanges := make(ably.ChannelStateChanges, 10) channel.OnAll(channelStateChanges.Receive) - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - channel.Attach(ctx) + channel.Attach(canceledCtx) // Check that the attach message isn't sent - checkIfAttachSent := recorder.CheckIfSent(ably.ActionAttach, 1) - attachSent := ablytest.Instantly.IsTrue(checkIfAttachSent) + checkIfAttachSentFn := recorder.CheckIfSent(ably.ActionAttach, 1) + attachSent := ablytest.Instantly.IsTrue(checkIfAttachSentFn) assert.False(t, attachSent, "Attach message was sent before connection is established") @@ -913,7 +909,7 @@ func TestRealtimeChannel_RTL4_Attach(t *testing.T) { defer safeclose(t, closer) // Check that the attach message is sent - attachSent = ablytest.Instantly.IsTrue(checkIfAttachSent) + attachSent = ablytest.Instantly.IsTrue(checkIfAttachSentFn) assert.True(t, attachSent, "Should send attach message, since connected") @@ -948,14 +944,11 @@ func TestRealtimeChannel_RTL4_Attach(t *testing.T) { channelStateChanges := make(ably.ChannelStateChanges, 10) channel.OnAll(channelStateChanges.Receive) - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - channel.Attach(ctx) + channel.Attach(canceledCtx) // Check that the attach message isn't sent - checkIfAttachSent := recorder.CheckIfSent(ably.ActionAttach, 1) - attachSent := ablytest.Instantly.IsTrue(checkIfAttachSent) + checkIfAttachSentFn := recorder.CheckIfSent(ably.ActionAttach, 1) + attachSent := ablytest.Instantly.IsTrue(checkIfAttachSentFn) assert.False(t, attachSent, "Attach message was sent before connection is established") @@ -973,11 +966,16 @@ func TestRealtimeChannel_RTL4_Attach(t *testing.T) { ) // Check that the attach message is sent - attachSent = ablytest.Instantly.IsTrue(checkIfAttachSent) + attachSent = ablytest.Instantly.IsTrue(checkIfAttachSentFn) assert.True(t, attachSent, "Should send attach message, since connected") defer safeclose(t, closer) + // Reconnection makes explicit attach for each channel RTN15c6, RTN15c7 + ablytest.Soon.Recv(t, &channelStatechange, channelStateChanges, t.Fatalf) + assert.Equal(t, ably.ChannelStateAttaching, channelStatechange.Current, + "expected %v; got %v (event: %+v)", ably.ChannelStateAttaching, channelStatechange.Current, channelStatechange) + ablytest.Soon.Recv(t, &channelStatechange, channelStateChanges, t.Fatalf) assert.Equal(t, ably.ChannelStateAttached, channelStatechange.Current, "expected %v; got %v (event: %+v)", ably.ChannelStateAttached, channelStatechange.Current, channelStatechange) @@ -986,9 +984,7 @@ func TestRealtimeChannel_RTL4_Attach(t *testing.T) { t.Run("RTL4j RTL13a: If channel attach is not a clean attach, should set ATTACH_RESUME in the ATTACH message", func(t *testing.T) { in, out, _, channel, stateChanges, _ := setup(t) - cancelledCtx, cancel := context.WithCancel(context.Background()) - cancel() - channel.Attach(cancelledCtx) + channel.Attach(canceledCtx) ablytest.Instantly.Recv(t, nil, out, t.Fatalf) // Consume ATTACHING channel.OnAll(stateChanges.Receive) @@ -1729,10 +1725,7 @@ func TestRealtimeChannel_RTL5_Detach(t *testing.T) { connecting, ) - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - channel.Detach(ctx) + channel.Detach(canceledCtx) // Check that the detach message isn't sent checkIfDetachSent := recorder.CheckIfSent(ably.ActionDetach, 1) @@ -1757,12 +1750,16 @@ func TestRealtimeChannel_RTL5_Detach(t *testing.T) { assert.True(t, detachSent, "Detach message was not sent") + ablytest.Instantly.Recv(t, &channelStatechange, channelStateChanges, t.Fatalf) + assert.Equal(t, ably.ChannelStateDetaching, channelStatechange.Current, + "expected %v; got %v (event: %+v)", ably.ChannelStateDetaching, channelStatechange.Current, channelStatechange) + ablytest.Soon.Recv(t, &channelStatechange, channelStateChanges, t.Fatalf) assert.Equal(t, ably.ChannelStateDetached, channelStatechange.Current, "expected %v; got %v (event: %+v)", ably.ChannelStateDetached, channelStatechange.Current, channelStatechange) }) - t.Run("RTL5h : If Connection state DISCONNECTED, queue the DETACH message and send on CONNECTED", func(t *testing.T) { + t.Run("RTL5h, RTN19b: If Connection state DISCONNECTED, queue the DETACH message and send on CONNECTED", func(t *testing.T) { app, err := ablytest.NewSandbox(nil) assert.NoError(t, err) @@ -1798,14 +1795,11 @@ func TestRealtimeChannel_RTL5_Detach(t *testing.T) { disconnected, ) - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - channel.Detach(ctx) + channel.Detach(canceledCtx) // Check that the detach message isn't sent - checkIfDetachSent := recorder.CheckIfSent(ably.ActionDetach, 1) - detachSent := ablytest.Instantly.IsTrue(checkIfDetachSent) + checkIfDetachSentFn := recorder.CheckIfSent(ably.ActionDetach, 1) + detachSent := ablytest.Instantly.IsTrue(checkIfDetachSentFn) assert.False(t, detachSent, "Detach message was sent before connection is established") @@ -1823,9 +1817,12 @@ func TestRealtimeChannel_RTL5_Detach(t *testing.T) { defer safeclose(t, closer) // Check that the detach message sent - detachSent = ablytest.Instantly.IsTrue(checkIfDetachSent) - assert.True(t, detachSent, - "Detach message was not sent") + detachSent = ablytest.Instantly.IsTrue(checkIfDetachSentFn) + assert.True(t, detachSent, "Detach message was not sent") + + ablytest.Instantly.Recv(t, &channelStatechange, channelStateChanges, t.Fatalf) + assert.Equal(t, ably.ChannelStateDetaching, channelStatechange.Current, + "expected %v; got %v (event: %+v)", ably.ChannelStateDetaching, channelStatechange.Current, channelStatechange) ablytest.Soon.Recv(t, &channelStatechange, channelStateChanges, t.Fatalf) assert.Equal(t, ably.ChannelStateDetached, channelStatechange.Current, @@ -1902,14 +1899,13 @@ func TestRealtimeChannel_RTL5_Detach(t *testing.T) { t.Run("RTL5j: if channel state is SUSPENDED, immediately transition to DETACHED state", func(t *testing.T) { t.Skip("Channel SUSPENDED not implemented yet") _, _, _, channel, stateChanges, _ := setup(t) - ctx, cancel := context.WithCancel(context.Background()) - cancel() + channel.OnAll(stateChanges.Receive) //channel.SetState(ably.ChannelStateSuspended, nil) ablytest.Instantly.Recv(t, nil, stateChanges, t.Fatalf) // State will be changed to suspended - channel.Detach(ctx) + channel.Detach(canceledCtx) var change ably.ChannelStateChange ablytest.Instantly.Recv(t, &change, stateChanges, t.Fatalf) @@ -1926,10 +1922,7 @@ func TestRealtimeChannel_RTL5_Detach(t *testing.T) { var outMsg *ably.ProtocolMessage var change ably.ChannelStateChange - cancelledContext, cancel := context.WithCancel(context.Background()) - cancel() - - channel.Attach(cancelledContext) + channel.Attach(canceledCtx) // get channel state to attaching ablytest.Instantly.Recv(t, &change, stateChanges, t.Fatalf) @@ -1950,7 +1943,7 @@ func TestRealtimeChannel_RTL5_Detach(t *testing.T) { assert.Equal(t, ably.ChannelStateAttached, change.Current, "expected %v; got %v (event: %+v)", ably.ChannelStateAttached, change.Current, change) - channel.Detach(cancelledContext) + channel.Detach(canceledCtx) ablytest.Instantly.Recv(t, &outMsg, out, t.Fatalf) assert.Equal(t, ably.ActionDetach, outMsg.Action, @@ -2007,9 +2000,6 @@ func TestRealtimeChannel_RTL6c1_PublishNow(t *testing.T) { channel, closer := chanTransitioner.To(transition...) defer safeclose(t, closer) - ctx, cancel := context.WithCancel(context.Background()) - cancel() - // Make a second client to subscribe and check that messages are // published without interferring with the first client's state. @@ -2026,7 +2016,7 @@ func TestRealtimeChannel_RTL6c1_PublishNow(t *testing.T) { t.Fatal(err) } - err = channel.Publish(ctx, "test", nil) + err = channel.Publish(canceledCtx, "test", nil) if err != nil && !errors.Is(err, context.Canceled) { t.Fatal(err) } @@ -2130,10 +2120,7 @@ func TestRealtimeChannel_RTL6c2_PublishEnqueue(t *testing.T) { closer = c.To(trans.connAfter...) defer safeclose(t, closer) - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - err = channel.Publish(ctx, "test", nil) + err = channel.Publish(canceledCtx, "test", nil) if err != nil && !errors.Is(err, context.Canceled) { t.Fatal(err) } @@ -2656,14 +2643,11 @@ func TestRealtimeChannel_RTL17_IgnoreMessagesWhenNotAttached(t *testing.T) { stateChanges = make(ably.ChannelStateChanges, 10) channel.OnAll(stateChanges.Receive) - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - channel.SubscribeAll(ctx, func(message *ably.Message) { + channel.SubscribeAll(canceledCtx, func(message *ably.Message) { msg <- message }) - channel.Attach(ctx) + channel.Attach(canceledCtx) return } @@ -2720,9 +2704,7 @@ func TestRealtimeChannel_RTL17_IgnoreMessagesWhenNotAttached(t *testing.T) { receiveMessage() ablytest.Instantly.Recv(t, nil, msg, t.Fatalf) - ctx, cancel := context.WithCancel(context.Background()) - cancel() - channel.Detach(ctx) + channel.Detach(canceledCtx) // Get the channel to DETACHED. ablytest.Instantly.Recv(t, nil, out, t.Fatalf) // Consume DETACHING diff --git a/ably/realtime_client.go b/ably/realtime_client.go index 10f36e804..a4428b2da 100644 --- a/ably/realtime_client.go +++ b/ably/realtime_client.go @@ -38,13 +38,14 @@ func NewRealtime(options ...ClientOption) (*Realtime, error) { }) c.Connection = conn + // RTN16 if !empty(c.opts().Recover) { recoverKeyContext, err := DecodeRecoveryKey(c.opts().Recover) if err != nil { c.log().Errorf("Error decoding recover with error %v", err) } else { - c.Channels.SetChannelSerialsFromRecoverOption(recoverKeyContext.ChannelSerials) - c.Connection.msgSerial = recoverKeyContext.MsgSerial + c.Channels.SetChannelSerialsFromRecoverOption(recoverKeyContext.ChannelSerials) // RTN16j + c.Connection.msgSerial = recoverKeyContext.MsgSerial // RTN16f } } return c, nil diff --git a/ably/realtime_conn.go b/ably/realtime_conn.go index 22f916c5e..92154c5ae 100644 --- a/ably/realtime_conn.go +++ b/ably/realtime_conn.go @@ -45,7 +45,7 @@ type Connection struct { state ConnectionState // errorReason is an [ably.ErrorInfo] object describing the last error received if - // a connection failure occurs (RTN14a, RTN15c7). + // a connection failure occurs (RTN14a, RTN15c7, RTN25). errorReason *ErrorInfo internalEmitter ConnectionEventEmitter @@ -82,6 +82,7 @@ type Connection struct { readLimit int64 isReadLimitSetExternally bool + recover string } type connCallbacks struct { @@ -108,6 +109,7 @@ func newConn(opts *clientOptions, auth *Auth, callbacks connCallbacks, client *R callbacks: callbacks, client: client, readLimit: maxMessageSize, + recover: opts.Recover, } auth.onExplicitAuthorize = c.onClientAuthorize c.queue = newMsgQueue(c) @@ -249,7 +251,7 @@ func (c *Connection) getMode() connectionMode { if c.key != "" { return resumeMode } - if c.opts.Recover != "" { + if c.recover != "" { return recoveryMode } return normalMode @@ -283,13 +285,14 @@ func (c *Connection) params(mode connectionMode) (url.Values, error) { } switch mode { case resumeMode: - query.Set("resume", c.key) + query.Set("resume", c.key) // RTN15b case recoveryMode: - recoveryKeyContext, err := DecodeRecoveryKey(c.opts.Recover) + recoveryKeyContext, err := DecodeRecoveryKey(c.recover) if err != nil { c.log().Errorf("error decoding recovery key, %v", err) + } else { + query.Set("recover", recoveryKeyContext.ConnectionKey) // RTN16k } - query.Set("recover", recoveryKeyContext.ConnectionKey) } return query, nil } @@ -500,6 +503,7 @@ func (c *Connection) RecoveryKey() string { func (c *Connection) CreateRecoveryKey() string { c.mtx.Lock() defer c.mtx.Unlock() + // RTN16g2 if empty(c.key) || c.state == ConnectionStateClosing || c.state == ConnectionStateClosed || c.state == ConnectionStateFailed || @@ -593,11 +597,16 @@ func (c *Connection) advanceSerial() { func (c *Connection) send(msg *protocolMessage, onAck func(err error)) { hasMsgSerial := msg.Action == actionMessage || msg.Action == actionPresence c.mtx.Lock() + // RTP16a - in case of presence msg send, check for connection status and send accordingly switch state := c.state; state { default: c.mtx.Unlock() if onAck != nil { - onAck(connStateError(state, nil)) + if c.state == ConnectionStateClosed { + onAck(errClosed) + } else { + onAck(connStateError(state, nil)) + } } case ConnectionStateInitialized, ConnectionStateConnecting, ConnectionStateDisconnected: @@ -606,9 +615,9 @@ func (c *Connection) send(msg *protocolMessage, onAck func(err error)) { if onAck != nil { onAck(connStateError(state, errQueueing)) } + } else { + c.queue.Enqueue(msg, onAck) // RTL4i } - c.queue.Enqueue(msg, onAck) // RTL4i - case ConnectionStateConnected: if err := c.verifyAndUpdateMessages(msg); err != nil { c.mtx.Unlock() @@ -628,6 +637,7 @@ func (c *Connection) send(msg *protocolMessage, onAck func(err error)) { // reconnection logic. But in case it isn't, force that by closing the // connection. Otherwise, the message we enqueue here may be in the queue // indefinitely. + c.log().Warnf("transport level failure while sending message, %v", err) c.conn.Close() c.mtx.Unlock() c.queue.Enqueue(msg, onAck) @@ -802,8 +812,8 @@ func (c *Connection) eventloop() { c.mtx.Lock() // recover is used when set via clientOptions#recover initially, resume will be used for all reconnects. - isConnectionResumeOrRecoverAttempt := !empty(c.key) || !empty(c.opts.Recover) - c.opts.Recover = "" // RTN16k, explicitly setting null so it won't be used for subsequent connection requests + isConnectionResumeOrRecoverAttempt := !empty(c.key) || !empty(c.recover) + c.recover = "" // RTN16k, explicitly setting null so it won't be used for subsequent connection requests // we need to get this before we set c.key so as to be sure if we were // resuming or recovering the connection. @@ -847,17 +857,11 @@ func (c *Connection) eventloop() { continue } - // (RTN15c1) (RTN15c2) - // RTN24, RTN15c7 - if error, set on connection and part of emitted connected event + // RTN24, RTN15c6, RTN15c7 - if error, set on connection and part of emitted connected event c.lockSetState(ConnectionStateConnected, newErrorFromProto(msg.Error), 0) c.mtx.Unlock() if reconnecting { - - // (RTN15c3) - // we are calling this outside of locks to avoid deadlock because in the - // RealtimeClient client where this callback is implemented we do some ops - // with this Conn where we re acquire Conn.Lock again. c.callbacks.onReconnected(failedResumeOrRecover) } c.queue.Flush() diff --git a/ably/realtime_conn_integration_test.go b/ably/realtime_conn_integration_test.go index 471ff2650..6fd54273e 100644 --- a/ably/realtime_conn_integration_test.go +++ b/ably/realtime_conn_integration_test.go @@ -23,7 +23,7 @@ var connTransitions = []ably.ConnectionState{ ably.ConnectionStateClosed, } -func TestRealtimeConn_Connect(t *testing.T) { +func TestRealtimeConn_AutoConnect_And_Close(t *testing.T) { var rec ablytest.ConnStatesRecorder app, client := ablytest.NewRealtime() defer safeclose(t, ablytest.FullRealtimeCloser(client), app) @@ -31,8 +31,7 @@ func TestRealtimeConn_Connect(t *testing.T) { defer off() err := ablytest.Wait(ablytest.ConnWaiter(client, nil, ably.ConnectionEventConnected), nil) - assert.NoError(t, err, - "Connect()=%v", err) + assert.NoError(t, err, "Connect()=%v", err) err = ablytest.FullRealtimeCloser(client).Close() assert.NoError(t, err, "ablytest.FullRealtimeCloser(client).Close()=%v", err) @@ -44,7 +43,7 @@ func TestRealtimeConn_Connect(t *testing.T) { } } -func TestRealtimeConn_NoConnect(t *testing.T) { +func TestRealtimeConn_No_AutoConnect(t *testing.T) { var rec ablytest.ConnStatesRecorder opts := []ably.ClientOption{ ably.WithAutoConnect(false), @@ -68,29 +67,6 @@ func TestRealtimeConn_NoConnect(t *testing.T) { } } -func TestRealtimeConn_ConnectClose(t *testing.T) { - var rec ablytest.ConnStatesRecorder - app, client := ablytest.NewRealtime() - defer safeclose(t, ablytest.FullRealtimeCloser(client), app) - off := rec.Listen(client) - defer off() - - err := ablytest.Wait(ablytest.ConnWaiter(client, nil, ably.ConnectionEventConnected), nil) - assert.NoError(t, err) - err = ablytest.FullRealtimeCloser(client).Close() - assert.NoError(t, err, - "ablytest.FullRealtimeCloser(client).Close()=%v", err) - - err = ablytest.Wait(ablytest.ConnWaiter(client, nil, ably.ConnectionEventClosed), nil) - assert.NoError(t, err) - - if !ablytest.Soon.IsTrue(func() bool { - return ablytest.Contains(rec.States(), connTransitions) - }) { - t.Fatalf("expected %+v, got %+v", connTransitions, rec.States()) - } -} - func TestRealtimeConn_AlreadyConnected(t *testing.T) { app, client := ablytest.NewRealtime(ably.WithAutoConnect(false)) defer safeclose(t, ablytest.FullRealtimeCloser(client), app) diff --git a/ably/realtime_conn_spec_integration_test.go b/ably/realtime_conn_spec_integration_test.go index 40b596760..18b473812 100644 --- a/ably/realtime_conn_spec_integration_test.go +++ b/ably/realtime_conn_spec_integration_test.go @@ -931,32 +931,33 @@ func recent(msgs []*ably.ProtocolMessage, action ably.ProtoAction) *ably.Protoco return nil } -func TestRealtimeConn_RTN15c1(t *testing.T) { +func TestRealtimeConn_RTN15c6(t *testing.T) { doEOF := make(chan struct{}, 1) - var metaList []*transportMessages - gotDial := make(chan chan struct{}) + continueDial := make(chan struct{}, 1) + continueDial <- struct{}{} app, client := ablytest.NewRealtime( ably.WithAutoConnect(false), ably.WithDial(func(protocol string, u *url.URL, timeout time.Duration) (ably.Conn, error) { - m := &transportMessages{dial: u} - metaList = append(metaList, m) - if len(metaList) > 1 { - goOn := make(chan struct{}) - gotDial <- goOn - <-goOn - } + <-continueDial c, err := ably.DialWebsocket(protocol, u, timeout) - return protoConnWithFakeEOF{Conn: c, doEOF: doEOF, onMessage: func(msg *ably.ProtocolMessage) { - m.Add(msg) - }}, err + return protoConnWithFakeEOF{ + Conn: c, + doEOF: doEOF, + }, err })) defer safeclose(t, ablytest.FullRealtimeCloser(client), app) err := ablytest.Wait(ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventConnected), nil) assert.NoError(t, err, "Connect=%s", err) + prevConnId := client.Connection.ID() + + // Increase msgSerial, to test that it doesn't reset later. + err = client.Channels.Get("publish").Publish(context.Background(), "test", nil) + assert.NoError(t, err) + assert.NotZero(t, client.Connection.MsgSerial()) channel := client.Channels.Get("channel") err = channel.Attach(context.Background()) @@ -970,395 +971,184 @@ func TestRealtimeConn_RTN15c1(t *testing.T) { off := channel.OnAll(chanStateChanges.Receive) defer off() - stateChanges := make(chan ably.ConnectionStateChange, 16) + connStateChanges := make(chan ably.ConnectionStateChange, 16) client.Connection.OnAll(func(c ably.ConnectionStateChange) { - stateChanges <- c + connStateChanges <- c }) doEOF <- struct{}{} - var state ably.ConnectionStateChange + var connState ably.ConnectionStateChange + + ablytest.Soon.Recv(t, &connState, connStateChanges, t.Fatalf) + + assert.Equal(t, ably.ConnectionStateDisconnected, connState.Current, + "expected transition to %v, got %v", ably.ConnectionStateDisconnected, connState.Current) - select { - case state = <-stateChanges: - case <-time.After(50 * time.Millisecond): - t.Fatal("didn't transition on EOF") - } - assert.Equal(t, ably.ConnectionStateDisconnected, state.Current, - "expected transition to %v, got %v", ably.ConnectionStateDisconnected, state.Current) rest, err := ably.NewREST(app.Options()...) assert.NoError(t, err) - goOn := <-gotDial err = rest.Channels.Get("channel").Publish(context.Background(), "name", "data") assert.NoError(t, err) - close(goOn) - select { - case state = <-stateChanges: - case <-time.After(50 * time.Millisecond): - t.Fatal("didn't reconnect") - } - assert.Equal(t, ably.ConnectionStateConnecting, state.Current, - "expected transition to %v, got %v", ably.ConnectionStateConnecting, state.Current) - select { - case msg := <-sub: - assert.Equal(t, "data", msg.Data, - "expected message with data \"data\" got %v", msg.Data) - case <-time.After(ablytest.Timeout): - t.Fatal("expected message after connection recovery; got none") - } + continueDial <- struct{}{} - // (RTN15c1) - // - // - current connectionId == resume message connectionId - // - resume message has no error - // - no channel state changes happened. + ablytest.Soon.Recv(t, &connState, connStateChanges, t.Fatalf) + assert.Equal(t, ably.ConnectionStateConnecting, connState.Current, "expected connecting; got %+v", connState.Current) + + ablytest.Soon.Recv(t, &connState, connStateChanges, t.Fatalf) + assert.Equal(t, ably.ConnectionStateConnected, connState.Current, "expected connected; got %+v", connState.Current) + assert.Nil(t, connState.Reason, "expected nil conn error, got %+v", connState.Reason) + // Check channel goes into attaching and attached state var change ably.ChannelStateChange ablytest.Soon.Recv(t, &change, chanStateChanges, t.Fatalf) - assert.Equal(t, change.Previous, change.Current, - "expected no state change; got %+v", change) - assert.Equal(t, client.Connection.ID(), metaList[1].Messages()[0].ConnectionID, - "expected %q to equal %q", client.Connection.ID(), metaList[1].Messages()[0].ConnectionID) - assert.Nil(t, metaList[1].Messages()[0].Error, - "expected resume error to be nil") -} - -func TestRealtimeConn_RTN15c2(t *testing.T) { - - doEOF := make(chan struct{}, 1) - - var metaList []*transportMessages - - gotDial := make(chan chan struct{}) - app, client := ablytest.NewRealtime( - ably.WithAutoConnect(false), - ably.WithDial(func(protocol string, u *url.URL, timeout time.Duration) (ably.Conn, error) { - m := &transportMessages{dial: u} - metaList = append(metaList, m) - if len(metaList) > 1 { - goOn := make(chan struct{}) - gotDial <- goOn - <-goOn - } - c, err := ably.DialWebsocket(protocol, u, timeout) - return protoConnWithFakeEOF{Conn: c, doEOF: doEOF, onMessage: func(msg *ably.ProtocolMessage) { - if len(metaList) == 2 && len(m.Messages()) == 0 { - msg.Error = &ably.ProtoErrorInfo{StatusCode: 401} - } - m.Add(msg) - }}, err - })) - defer safeclose(t, ablytest.FullRealtimeCloser(client), app) - - err := ablytest.Wait(ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventConnected), nil) - assert.NoError(t, err, - "Connect=%s", err) - - channel := client.Channels.Get("channel") - err = channel.Attach(context.Background()) - assert.NoError(t, err) - chanStateChanges := make(ably.ChannelStateChanges) - off := channel.OnAll(chanStateChanges.Receive) - defer off() + assert.Equal(t, ably.ChannelStateAttaching, change.Current, "expected no state change; got %+v", change) + ablytest.Soon.Recv(t, &change, chanStateChanges, t.Fatalf) + assert.Equal(t, ably.ChannelStateAttached, change.Current, "expected no state change; got %+v", change) - sub, unsub, err := ablytest.ReceiveMessages(channel, "") - assert.NoError(t, err) - defer unsub() + // Expect message to be received after resume success + var msg *ably.Message + ablytest.Soon.Recv(t, &msg, sub, t.Fatalf) - stateChanges := make(chan ably.ConnectionStateChange, 16) - client.Connection.OnAll(func(c ably.ConnectionStateChange) { - stateChanges <- c - }) + // Check for resume success + assert.Equal(t, prevConnId, client.Connection.ID()) + assert.Nil(t, client.Connection.ErrorReason()) + assert.NotZero(t, client.Connection.MsgSerial()) - prevMsgSerial := client.Connection.MsgSerial() + // Set channel to attaching state + channel.SetState(ably.ChannelStateAttaching) doEOF <- struct{}{} + continueDial <- struct{}{} - var state ably.ConnectionStateChange - - select { - case state = <-stateChanges: - case <-time.After(50 * time.Millisecond): - t.Fatal("didn't transition on EOF") - } - assert.Equal(t, ably.ConnectionStateDisconnected, state.Current, - "expected transition to %v, got %v", ably.ConnectionStateDisconnected, state.Current) - rest, err := ably.NewREST(app.Options()...) - assert.NoError(t, err) - goOn := <-gotDial - err = rest.Channels.Get("channel").Publish(context.Background(), "name", "data") - assert.NoError(t, err) - close(goOn) - - select { - case state = <-stateChanges: - case <-time.After(50 * time.Millisecond): - t.Fatal("didn't reconnect") - } - assert.Equal(t, ably.ConnectionStateConnecting, state.Current, - "expected transition to %v, got %v", ably.ConnectionStateConnecting, state.Current) - - select { - case msg := <-sub: - assert.Equal(t, "data", msg.Data, - "expected message with data \"data\", got %v", msg.Data) - case <-time.After(ablytest.Timeout): - t.Fatal("expected message after connection recovery; got none") - } + // Check channel goes into attaching and attached state + ablytest.Soon.Recv(t, &change, chanStateChanges, t.Fatalf) + assert.Equal(t, ably.ChannelStateAttaching, change.Current, "expected no state change; got %+v", change) + ablytest.Soon.Recv(t, &change, chanStateChanges, t.Fatalf) + assert.Equal(t, ably.ChannelStateAttached, change.Current, "expected no state change; got %+v", change) - // (RTN15c2) - // - // - current connectionId == resume message connectionId - // - resume message has an error - // - Conn.Reqson == message resume error - // - no channel state changes happened. + // Set channel to suspended state + channel.SetState(ably.ChannelStateSuspended) + doEOF <- struct{}{} + continueDial <- struct{}{} - var change ably.ChannelStateChange + // Check channel goes into attaching and attached state + ablytest.Soon.Recv(t, &change, chanStateChanges, t.Fatalf) + assert.Equal(t, ably.ChannelStateAttaching, change.Current, "expected no state change; got %+v", change) ablytest.Soon.Recv(t, &change, chanStateChanges, t.Fatalf) - assert.Equal(t, change.Previous, change.Current, - "expected no state change; got %+v", change) - assert.Equal(t, client.Connection.ID(), metaList[1].Messages()[0].ConnectionID, - "expected %q to equal %q", client.Connection.ID(), metaList[1].Messages()[0].ConnectionID) - assert.NotNil(t, metaList[1].Messages()[0].Error, - "expected resume error") - - err = client.Connection.ErrorReason() - assert.NotNil(t, err, - "expected reason to be set") - - reason := err.(*ably.ErrorInfo) - assert.Equal(t, 401, reason.StatusCode, - "expected status code 401 got %d", reason.StatusCode) - assert.Equal(t, prevMsgSerial, client.Connection.MsgSerial(), - "msgSerial shouldn't be reset on resumed connection") + assert.Equal(t, ably.ChannelStateAttached, change.Current, "expected no state change; got %+v", change) + + // Check for resume success + assert.Equal(t, prevConnId, client.Connection.ID()) + assert.Nil(t, client.Connection.ErrorReason()) + assert.NotZero(t, client.Connection.MsgSerial()) } -func TestRealtimeConn_RTN15c3_attached(t *testing.T) { +func TestRealtimeConn_RTN15c7_attached(t *testing.T) { doEOF := make(chan struct{}, 1) - var metaList []*transportMessages - connID := "new-conn-id" - gotDial := make(chan chan struct{}) + continueDial := make(chan struct{}, 1) + continueDial <- struct{}{} + app, client := ablytest.NewRealtime( ably.WithAutoConnect(false), ably.WithDial(func(protocol string, u *url.URL, timeout time.Duration) (ably.Conn, error) { - m := &transportMessages{dial: u} - metaList = append(metaList, m) - if len(metaList) > 1 { - goOn := make(chan struct{}) - gotDial <- goOn - <-goOn - } + <-continueDial c, err := ably.DialWebsocket(protocol, u, timeout) - return protoConnWithFakeEOF{Conn: c, doEOF: doEOF, onMessage: func(msg *ably.ProtocolMessage) { - if len(metaList) == 2 && len(m.Messages()) == 0 { - msg.Error = &ably.ProtoErrorInfo{StatusCode: 401} - msg.ConnectionID = connID - } - m.Add(msg) - }}, err + return protoConnWithFakeEOF{ + Conn: c, + doEOF: doEOF, + }, err })) defer safeclose(t, ablytest.FullRealtimeCloser(client), app) err := ablytest.Wait(ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventConnected), nil) - assert.NoError(t, err, - "Connect=%s", err) + assert.NoError(t, err, "Connect=%s", err) + prevConnId := client.Connection.ID() // Increase msgSerial, to test that it gets reset later. err = client.Channels.Get("publish").Publish(context.Background(), "test", nil) assert.NoError(t, err) + assert.NotZero(t, client.Connection.MsgSerial()) channel := client.Channels.Get("channel") err = channel.Attach(context.Background()) assert.NoError(t, err) - chanStateChanges := make(ably.ChannelStateChanges, 18) - off := channel.On(ably.ChannelEventAttaching, chanStateChanges.Receive) + _, unsub, err := ablytest.ReceiveMessages(channel, "") + assert.NoError(t, err) + defer unsub() + + chanStateChanges := make(ably.ChannelStateChanges) + off := channel.OnAll(chanStateChanges.Receive) defer off() - stateChanges := make(chan ably.ConnectionStateChange, 16) + connStateChanges := make(chan ably.ConnectionStateChange, 16) client.Connection.OnAll(func(c ably.ConnectionStateChange) { - stateChanges <- c + connStateChanges <- c }) + client.Connection.SetKey("xxxxx!xxxxxxx-xxxxxxxx-xxxxxxxx") // invalid connection key for next resume request doEOF <- struct{}{} - var state ably.ConnectionStateChange + var connState ably.ConnectionStateChange + + ablytest.Soon.Recv(t, &connState, connStateChanges, t.Fatalf) + assert.Equal(t, ably.ConnectionStateDisconnected, connState.Current, + "expected transition to %v, got %v", ably.ConnectionStateDisconnected, connState.Current) - select { - case state = <-stateChanges: - case <-time.After(50 * time.Millisecond): - t.Fatal("didn't transition on EOF") - } - assert.Equal(t, ably.ConnectionStateDisconnected, state.Current, - "expected transition to %v, got %v", ably.ConnectionStateDisconnected, state.Current) rest, err := ably.NewREST(app.Options()...) assert.NoError(t, err) - goOn := <-gotDial err = rest.Channels.Get("channel").Publish(context.Background(), "name", "data") assert.NoError(t, err) - close(goOn) - select { - case state = <-stateChanges: - case <-time.After(50 * time.Millisecond): - t.Fatal("didn't reconnect") - } - assert.Equal(t, ably.ConnectionStateConnecting, state.Current, - "expected transition to %v, got %v", ably.ConnectionStateConnecting, state.Current) - - <-stateChanges - - var chanState ably.ChannelStateChange - select { - case chanState = <-chanStateChanges: - case <-time.After(50 * time.Millisecond): - t.Fatal("didn't change state") - } - // we are testing to make sure we have initiated a new attach for channels - // in ATTACHED state. - assert.Equal(t, ably.ChannelStateAttaching, chanState.Current, - "expected transition to %v, got %v", ably.ChannelStateAttaching, chanState.Current) - - reason := client.Connection.ErrorReason() - assert.NotNil(t, reason, "expected reason to be set") - assert.Equal(t, 401, reason.StatusCode, - "expected status code 401 got %d", reason.StatusCode) - assert.Equal(t, int64(0), client.Connection.MsgSerial(), - "expected msgSerial to be reset; got %d", client.Connection.MsgSerial()) -} - -func TestRealtimeConn_RTN15c3_attaching(t *testing.T) { - - doEOF := make(chan struct{}, 1) - - var metaList []*transportMessages - connID := "new-conn-id" - gotDial := make(chan chan struct{}) - app, client := ablytest.NewRealtime( - ably.WithAutoConnect(false), - ably.WithDial(func(protocol string, u *url.URL, timeout time.Duration) (ably.Conn, error) { - m := &transportMessages{dial: u} - metaList = append(metaList, m) - if len(metaList) > 1 { - goOn := make(chan struct{}) - gotDial <- goOn - <-goOn - } - c, err := ably.DialWebsocket(protocol, u, timeout) - return protoConnWithFakeEOF{Conn: c, doEOF: doEOF, onMessage: func(msg *ably.ProtocolMessage) { - if len(metaList) == 2 && len(m.Messages()) == 0 { - msg.Error = &ably.ProtoErrorInfo{StatusCode: 401} - msg.ConnectionID = connID - } - if msg.Action == ably.ActionAttached { - msg.Action = ably.ActionHeartbeat - } - m.Add(msg) - }}, err - })) - defer safeclose(t, ablytest.FullRealtimeCloser(client), app) + continueDial <- struct{}{} - err := ablytest.Wait(ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventConnected), nil) - assert.NoError(t, err, - "Connect=%s", err) + ablytest.Soon.Recv(t, &connState, connStateChanges, t.Fatalf) + assert.Equal(t, ably.ConnectionStateConnecting, connState.Current) - // Increase msgSerial, to test that it gets reset later. - err = client.Channels.Get("publish").Publish(context.Background(), "test", nil) - assert.NoError(t, err) + ablytest.Soon.Recv(t, &connState, connStateChanges, t.Fatalf) + assert.Equal(t, ably.ConnectionStateConnected, connState.Current) + assert.NotNil(t, connState.Reason, "expected not nil connError, got nil connError") - channel := client.Channels.Get("channel") - attaching := make(ably.ChannelStateChanges, 1) - off := channel.On(ably.ChannelEventAttaching, attaching.Receive) - defer off() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go func() { - channel.Attach(ctx) - }() - - ablytest.Soon.Recv(t, nil, attaching, t.Fatalf) - - stateChanges := make(chan ably.ConnectionStateChange, 16) - client.Connection.OnAll(func(c ably.ConnectionStateChange) { - stateChanges <- c - }) - - doEOF <- struct{}{} - - var state ably.ConnectionStateChange - - select { - case state = <-stateChanges: - case <-time.After(50 * time.Millisecond): - t.Fatal("didn't transition on EOF") - } - assert.Equal(t, ably.ConnectionStateDisconnected, state.Current, - "expected transition to %v, got %v", ably.ConnectionStateDisconnected, state.Current) - rest, err := ably.NewREST(app.Options()...) - assert.NoError(t, err) - - goOn := <-gotDial - err = rest.Channels.Get("channel").Publish(ctx, "name", "data") - assert.NoError(t, err) - close(goOn) - - select { - case state = <-stateChanges: - case <-time.After(50 * time.Millisecond): - t.Fatal("didn't reconnect") - } - assert.Equal(t, ably.ConnectionStateConnecting, state.Current, - "expected transition to %v, got %v", ably.ConnectionStateConnecting, state.Current) - - <-stateChanges + // Check channel goes into attaching and attached state + var change ably.ChannelStateChange + ablytest.Soon.Recv(t, &change, chanStateChanges, t.Fatalf) + assert.Equal(t, ably.ChannelStateAttaching, change.Current) + ablytest.Soon.Recv(t, &change, chanStateChanges, t.Fatalf) + assert.Equal(t, ably.ChannelStateAttached, change.Current) - // we are testing to make sure we have initiated a new attach for channels - // in ATTACHING state. - assert.Equal(t, ably.ChannelStateAttaching, channel.State(), - "expected transition to %v, got %v", ably.ChannelStateAttaching, channel.State()) + // Check for resume failure + assert.NotEqual(t, prevConnId, client.Connection.ID()) + assert.Zero(t, client.Connection.MsgSerial()) + assert.NotNil(t, client.Connection.ErrorReason()) + assert.Equal(t, 400, client.Connection.ErrorReason().StatusCode) - reason := client.Connection.ErrorReason() - assert.NotNil(t, reason, - "expected reason to be set") - assert.Equal(t, 401, reason.StatusCode, - "expected status code 401 got %d", reason.StatusCode) - assert.Equal(t, int64(0), client.Connection.MsgSerial(), - "expected msgSerial to be reset; got %d", client.Connection.MsgSerial()) + // Todo - Expect message not to be arrived due to resume failure + // var msg *ably.Message + // ablytest.Soon.NoRecv(t, &msg, sub, t.Fatalf) } func TestRealtimeConn_RTN15c4(t *testing.T) { doEOF := make(chan struct{}, 1) - var metaList []*transportMessages - gotDial := make(chan chan struct{}) + continueDial := make(chan struct{}, 1) + continueDial <- struct{}{} app, client := ablytest.NewRealtime( ably.WithAutoConnect(false), ably.WithDial(func(protocol string, u *url.URL, timeout time.Duration) (ably.Conn, error) { - m := &transportMessages{dial: u} - metaList = append(metaList, m) - if len(metaList) > 1 { - goOn := make(chan struct{}) - gotDial <- goOn - <-goOn - } + <-continueDial c, err := ably.DialWebsocket(protocol, u, timeout) - return protoConnWithFakeEOF{Conn: c, doEOF: doEOF, onMessage: func(msg *ably.ProtocolMessage) { - if len(metaList) == 2 && len(m.Messages()) == 0 { - msg.Action = ably.ActionError - msg.Error = &ably.ProtoErrorInfo{StatusCode: http.StatusBadRequest} - } - m.Add(msg) - }}, err + return protoConnWithFakeEOF{ + Conn: c, + doEOF: doEOF, + }, err })) defer safeclose(t, &closeClient{Closer: ablytest.FullRealtimeCloser(client), skip: []int{http.StatusBadRequest}}, app) err := ablytest.Wait(ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventConnected), nil) - assert.NoError(t, err, - "Connect=%s", err) + assert.NoError(t, err, "Connect=%s", err) channel := client.Channels.Get("channel") err = channel.Attach(context.Background()) @@ -1367,52 +1157,43 @@ func TestRealtimeConn_RTN15c4(t *testing.T) { off := channel.On(ably.ChannelEventFailed, chanStateChanges.Receive) defer off() - stateChanges := make(chan ably.ConnectionStateChange, 16) + connStateChanges := make(chan ably.ConnectionStateChange, 16) client.Connection.OnAll(func(c ably.ConnectionStateChange) { - stateChanges <- c + connStateChanges <- c }) + client.Connection.SetKey("wrong-conn-key") // wrong connection key for next resume request doEOF <- struct{}{} - var state ably.ConnectionStateChange + var connState ably.ConnectionStateChange + + ablytest.Soon.Recv(t, &connState, connStateChanges, t.Fatalf) + assert.Equal(t, ably.ConnectionStateDisconnected, connState.Current, + "expected transition to %v, got %v", ably.ConnectionStateDisconnected, connState.Current) - select { - case state = <-stateChanges: - case <-time.After(50 * time.Millisecond): - t.Fatal("didn't transition on EOF") - } - assert.Equal(t, ably.ConnectionStateDisconnected, state.Current, - "expected transition to %v, got %v", ably.ConnectionStateDisconnected, state.Current) rest, err := ably.NewREST(app.Options()...) assert.NoError(t, err) - goOn := <-gotDial err = rest.Channels.Get("channel").Publish(context.Background(), "name", "data") assert.NoError(t, err) - close(goOn) - select { - case state = <-stateChanges: - case <-time.After(50 * time.Millisecond): - t.Fatal("didn't reconnect") - } - assert.Equal(t, ably.ConnectionStateConnecting, state.Current, - "expected transition to %v, got %v", ably.ConnectionStateConnecting, state.Current) - <-stateChanges - var chanState ably.ChannelStateChange - select { - case chanState = <-chanStateChanges: - case <-time.After(50 * time.Millisecond): - t.Fatal("didn't change state") - } - assert.Equal(t, ably.ChannelStateFailed, chanState.Current, - "expected transition to %v, got %v", ably.ChannelStateFailed, chanState.Current) + continueDial <- struct{}{} + + ablytest.Soon.Recv(t, &connState, connStateChanges, t.Fatalf) + assert.Equal(t, ably.ConnectionStateConnecting, connState.Current) + + // Connection goes into failed state + ablytest.Soon.Recv(t, &connState, connStateChanges, t.Fatalf) + assert.Equal(t, ably.ConnectionStateFailed, connState.Current) + + // Check channel goes into failed state + var change ably.ChannelStateChange + ablytest.Soon.Recv(t, &change, chanStateChanges, t.Fatalf) + assert.Equal(t, ably.ChannelStateFailed, change.Current) + reason := client.Connection.ErrorReason() assert.NotNil(t, reason, "expected reason to be set") assert.Equal(t, http.StatusBadRequest, reason.StatusCode, "expected %d got %d", http.StatusBadRequest, reason.StatusCode) - // The client should transition to the FAILED state - assert.Equal(t, ably.ConnectionStateFailed, client.Connection.State(), - "expected transition to %v, got %v", ably.ConnectionStateFailed, client.Connection.State()) } func TestRealtimeConn_RTN15d_MessageRecovery(t *testing.T) { @@ -1566,11 +1347,11 @@ func TestRealtimeConn_RTN15g_NewConnectionOnStateLost(t *testing.T) { connIDs <- "conn-1" err := ablytest.Wait(ablytest.ConnWaiter(c, c.Connect, ably.ConnectionEventConnected), nil) assert.NoError(t, err) + prevConnectionKey := c.Connection.Key() - ablytest.Instantly.Recv(t, nil, dials, t.Fatalf) // discard first URL; we're interested in reconnections + ablytest.Instantly.Recv(t, nil, dials, t.Fatalf) // Get channels to ATTACHING, ATTACHED and DETACHED. (TODO: SUSPENDED) - attaching := c.Channels.Get("attaching") _ = ablytest.ResultFunc.Go(func(ctx context.Context) error { return attaching.Attach(ctx) }) msg := <-out @@ -1626,30 +1407,40 @@ func TestRealtimeConn_RTN15g_NewConnectionOnStateLost(t *testing.T) { connIDs <- "conn-1" // Same connection ID so the resume "succeeds". var dialed *url.URL ablytest.Instantly.Recv(t, &dialed, dials, t.Fatalf) - resume := dialed.Query().Get("resume") - assert.NotEqual(t, "", resume, "expected a resume key; got %v", resume) + assert.Equal(t, prevConnectionKey, dialed.Query().Get("resume")) + ablytest.Instantly.Recv(t, nil, connected, t.Fatalf) // wait for CONNECTED before disconnecting again + + // RTN15g3: Expect the previously attaching and attached channels to be + // attached again. + + attachExpected := map[string]struct{}{ + "attaching": {}, + "attached": {}, + } + for len(attachExpected) > 0 { + var msg *ably.ProtocolMessage + ablytest.Instantly.Recv(t, &msg, out, t.Fatalf) + _, ok := attachExpected[msg.Channel] + assert.True(t, ok, + "ATTACH sent for unexpected or already attaching channel %q", msg.Channel) + delete(attachExpected, msg.Channel) + } + ablytest.Instantly.NoRecv(t, nil, out, t.Fatalf) // Now do the same, but past connectionStateTTL + maxIdleInterval. This // should make a fresh connection. - ablytest.Instantly.Recv(t, nil, connected, t.Fatalf) // wait for CONNECTED before disconnecting again - setNow(now().Add(discardStateTTL + 1)) breakConn() - - connIDs <- "conn-2" + connIDs <- "conn-2" // different connection id, so resume failure ablytest.Instantly.Recv(t, &dialed, dials, t.Fatalf) - resume = dialed.Query().Get("resume") - assert.Equal(t, "", resume, - "didn't expect a resume key; got %v", resume) - - recoverValue := dialed.Query().Get("recover") - assert.Equal(t, "", recoverValue, - "didn't expect a recover key; got %v", dialed) + assert.Empty(t, dialed.Query().Get("resume")) + assert.Empty(t, dialed.Query().Get("recover")) + ablytest.Instantly.Recv(t, nil, connected, t.Fatalf) // RTN15g3: Expect the previously attaching and attached channels to be // attached again. - attachExpected := map[string]struct{}{ + attachExpected = map[string]struct{}{ "attaching": {}, "attached": {}, } @@ -1983,25 +1774,49 @@ func TestRealtimeConn_RTN16(t *testing.T) { channel := c.Channels.Get("channel") err = channel.Attach(context.Background()) assert.NoError(t, err) + + var msg *ably.Message + sub, unsub, err := ablytest.ReceiveMessages(channel, "") + if err != nil { + t.Fatal(err) + } + defer unsub() err = channel.Publish(context.Background(), "name", "data") assert.NoError(t, err) + + ablytest.Soon.Recv(t, &msg, sub, t.Fatalf) + assert.Equal(t, "data", msg.Data) + prevMsgSerial := c.Connection.MsgSerial() + prevConnId := c.Connection.ID() + + recoveryKey := c.Connection.CreateRecoveryKey() // RTN16g - createRecoveryKey + decodedRecoveryKey, err := ably.DecodeRecoveryKey(recoveryKey) // RTN16g1 + assert.Nil(t, err) + + deprecatedRecoveryKey := c.Connection.RecoveryKey() + assert.Equal(t, deprecatedRecoveryKey, recoveryKey) //RTN16m client := app.NewRealtime( - ably.WithRecover(c.Connection.RecoveryKey()), + ably.WithRecover(recoveryKey), ) defer safeclose(t, ablytest.FullRealtimeCloser(client)) err = ablytest.Wait(ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventConnected), nil) assert.NoError(t, err) - { //RTN16b, RTN16f + { // RTN16f, RTN16j, RTN16d assert.True(t, sameConnection(client.Connection.Key(), c.Connection.Key()), "expected the same connection") + assert.Equal(t, prevConnId, c.Connection.ID()) + assert.Nil(t, client.Connection.ErrorReason()) assert.Equal(t, prevMsgSerial, client.Connection.MsgSerial(), "expected %d got %d", prevMsgSerial, client.Connection.MsgSerial()) + assert.True(t, client.Channels.Exists("channel")) + channelSerial := client.Channels.Get("channel").GetChannelSerial() + assert.Equal(t, decodedRecoveryKey.ChannelSerials["channel"], channelSerial) } - { //(RTN16c) + { //(RTN16g2) err := ablytest.Wait(ablytest.ConnWaiter(client, client.Close, ably.ConnectionEventClosed), nil) assert.NoError(t, err) assert.Equal(t, "", client.Connection.Key(), @@ -2011,14 +1826,16 @@ func TestRealtimeConn_RTN16(t *testing.T) { assert.Equal(t, "", client.Connection.ID(), "expected id to be empty got %q instead", client.Connection.ID()) } - { //(RTN16e) + { //(RTN16l) // This test was adopted from the ably-js project // https://github.com/ably/ably-js/blob/340e5ce31dc9d7434a06ae4e1eec32bdacc9c6c5/spec/realtime/connection.test.js#L119 - // var query url.Values + var query url.Values + decodedRecoveryKey.ConnectionKey = "ablygo_test_fake-key____" + faultyRecoveryKey, _ := decodedRecoveryKey.Encode() client2 := app.NewRealtime( - ably.WithRecover("_____!ablygo_test_fake-key____:5:3"), + ably.WithRecover(faultyRecoveryKey), ably.WithDial(func(protocol string, u *url.URL, timeout time.Duration) (ably.Conn, error) { - // query = u.Query() + query = u.Query() return ably.DialWebsocket(protocol, u, timeout) })) defer safeclose(t, ablytest.FullRealtimeCloser(client2)) @@ -2027,17 +1844,21 @@ func TestRealtimeConn_RTN16(t *testing.T) { if err == nil { t.Fatal("expected reason to be set") } + { // (RTN16i) + recoverValue := query.Get("recover") + assert.NotEmpty(t, recoverValue) + assert.Equal(t, "ablygo_test_fake-key____", recoverValue) + } { //(RTN16e) info := err.(*ably.ErrorInfo) - assert.Equal(t, 80008, int(info.Code), - "expected 80008 got %d", info.Code) + assert.Equal(t, 80018, int(info.Code), + "expected 80018 got %d", info.Code) reason := client2.Connection.ErrorReason() - assert.Equal(t, 80008, int(reason.Code), - "expected 80008 got %d", reason.Code) + assert.Equal(t, 80018, int(reason.Code), + "expected 80018 got %d", reason.Code) msgSerial := client2.Connection.MsgSerial() // verify msgSerial is 0 (new connection), not 3 - assert.Equal(t, int64(0), msgSerial, - "expected 0 got %d", msgSerial) + assert.Zero(t, msgSerial) assert.NotContains(t, client2.Connection.Key(), "ablygo_test_fake", "expected %q not to contain \"ablygo_test_fake\"", client2.Connection.Key()) } @@ -2292,11 +2113,11 @@ func TestRealtimeConn_RTN14b(t *testing.T) { type closeConn struct { ably.Conn - closed int + closed atomic.Int64 } func (c *closeConn) Close() error { - c.closed++ + c.closed.Add(1) return c.Conn.Close() } @@ -2349,7 +2170,7 @@ func TestRealtimeConn_RTN14g(t *testing.T) { "expected status 400 got %v", c.Connection.ErrorReason().StatusCode) // we make sure the connection is closed - assert.Equal(t, 1, ls.closed, "expected 1 got %v", ls.closed) + assert.Equal(t, int64(1), ls.closed.Load(), "expected 1 got %v", ls.closed.Load()) }) } diff --git a/ably/realtime_presence.go b/ably/realtime_presence.go index ea61fe251..21bd2f807 100644 --- a/ably/realtime_presence.go +++ b/ably/realtime_presence.go @@ -2,9 +2,11 @@ package ably import ( "context" + "errors" "fmt" "strings" "sync" + "time" ) type syncState uint8 @@ -19,17 +21,17 @@ const ( // It allows entering, leaving and updating presence state for the current client or on behalf of other client. // It enables the presence set to be entered and subscribed to, and the historic presence set to be retrieved for a channel. type RealtimePresence struct { - mtx sync.Mutex - data interface{} - serial string - messageEmitter *eventEmitter - channel *RealtimeChannel - members map[string]*PresenceMessage - internalMembers map[string]*PresenceMessage // RTP17 - stale map[string]struct{} - state PresenceAction - syncMtx sync.Mutex - syncState syncState + mtx sync.Mutex + data interface{} + messageEmitter *eventEmitter + channel *RealtimeChannel + members map[string]*PresenceMessage // RTP2 + internalMembers map[string]*PresenceMessage // RTP17 + beforeSyncMembers map[string]*PresenceMessage + state PresenceAction + syncState syncState + queue *msgQueue + syncDone chan struct{} } func newRealtimePresence(channel *RealtimeChannel) *RealtimePresence { @@ -39,29 +41,52 @@ func newRealtimePresence(channel *RealtimeChannel) *RealtimePresence { members: make(map[string]*PresenceMessage), internalMembers: make(map[string]*PresenceMessage), syncState: syncInitial, + syncDone: make(chan struct{}), } - // Lock syncMtx to make all callers to Get(true) wait until the presence - // is in initial sync state. This is to not make them early return - // with an empty presence list before channel attaches. - pres.syncMtx.Lock() + pres.queue = newMsgQueue(pres.channel.client.Connection) return pres } -func (pres *RealtimePresence) verifyChanState() error { +// RTP16c +func (pres *RealtimePresence) isValidChannelState() error { switch state := pres.channel.State(); state { - case ChannelStateDetached, ChannelStateDetaching, ChannelStateFailed: + case ChannelStateDetaching, ChannelStateDetached, ChannelStateFailed, ChannelStateSuspended: return newError(91001, fmt.Errorf("unable to enter presence channel (invalid channel state: %s)", state.String())) default: return nil } } -func (pres *RealtimePresence) send(msg *PresenceMessage) (result, error) { - attached, err := pres.channel.attach() - if err != nil { - return nil, err +// RTP5a +func (pres *RealtimePresence) onChannelDetachedOrFailed(err error) { + for k := range pres.members { + delete(pres.members, k) + } + for k := range pres.internalMembers { + delete(pres.internalMembers, k) + } + pres.queue.Fail(err) +} + +// RTP5f, RTP16b +func (pres *RealtimePresence) onChannelSuspended(err error) { + pres.queue.Fail(err) +} + +func (pres *RealtimePresence) maybeEnqueue(msg *protocolMessage, onAck func(err error)) bool { + if pres.channel.opts().NoQueueing { + if onAck != nil { + onAck(errors.New("unable enqueue message because Options.QueueMessages is set to false")) + } + return false } - if err := pres.verifyChanState(); err != nil { + pres.queue.Enqueue(msg, onAck) + return true +} + +func (pres *RealtimePresence) send(msg *PresenceMessage) (result, error) { + // RTP16c + if err := pres.isValidChannelState(); err != nil { return nil, err } protomsg := &protocolMessage{ @@ -69,20 +94,22 @@ func (pres *RealtimePresence) send(msg *PresenceMessage) (result, error) { Channel: pres.channel.Name, Presence: []*PresenceMessage{msg}, } - return resultFunc(func(ctx context.Context) error { - err := attached.Wait(ctx) - if err != nil { - return err - } - - listen := make(chan error, 1) - onAck := func(err error) { - listen <- err - } - if err := pres.channel.send(protomsg, onAck); err != nil { - return err + listen := make(chan error, 1) + onAck := func(err error) { + listen <- err + } + switch pres.channel.State() { + case ChannelStateInitialized: // RTP16b + if pres.maybeEnqueue(protomsg, onAck) { + pres.channel.attach() } + case ChannelStateAttaching: // RTP16b + pres.maybeEnqueue(protomsg, onAck) + case ChannelStateAttached: // RTP16a + pres.channel.client.Connection.send(protomsg, onAck) // RTP16a, RTL6c + } + return resultFunc(func(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() @@ -92,31 +119,72 @@ func (pres *RealtimePresence) send(msg *PresenceMessage) (result, error) { }), nil } -func (pres *RealtimePresence) syncWait() { +func (pres *RealtimePresence) syncWait(ctx context.Context) error { // If there's an undergoing sync operation or we wait till channel gets // attached, the following lock is going to block until the operations // complete. - pres.syncMtx.Lock() - pres.syncMtx.Unlock() + select { + case <-pres.syncDone: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// RTP18 +func syncSerial(msg *protocolMessage) (noChannelSerial bool, syncSequenceId string, syncCursor string) { + if empty(msg.ChannelSerial) { // RTP18c + noChannelSerial = true + return + } + // RTP18a + serials := strings.Split(msg.ChannelSerial, ":") + syncSequenceId = serials[0] + if len(serials) > 1 { + syncCursor = serials[1] + } + return false, syncSequenceId, syncCursor } -func syncSerial(msg *protocolMessage) string { - if i := strings.IndexRune(msg.ChannelSerial, ':'); i != -1 { - return msg.ChannelSerial[i+1:] +func (pres *RealtimePresence) enterMembers(internalMembers []*PresenceMessage) { + for _, member := range internalMembers { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + // RTP17g + err := pres.EnterClient(ctx, member.ClientID, member.Data) + // RTP17e + if err != nil { + pres.channel.log().Errorf("Error for internal member presence enter with id %v, clientId %v, err %v", member.ID, member.ClientID, err) + pres.channel.emitErrorUpdate(newError(91004, err), true) + } + cancel() } - return "" } -func (pres *RealtimePresence) onAttach(msg *protocolMessage) { - serial := syncSerial(msg) +func (pres *RealtimePresence) onAttach(msg *protocolMessage, isAttachWithoutMessageLoss bool) { pres.mtx.Lock() defer pres.mtx.Unlock() - switch { - case msg.Flags.Has(flagHasPresence): - pres.syncStart(serial) - case pres.syncState == syncInitial: - pres.syncState = syncComplete - pres.syncMtx.Unlock() + // RTP1 + if msg.Flags.Has(flagHasPresence) { + pres.syncStart() + } else { + pres.leaveMembers(pres.members) // RTP19a + if pres.syncState == syncInitial { + pres.syncState = syncComplete + close(pres.syncDone) + } + } + pres.queue.Flush() // RTP5b + // RTP17f + if isAttachWithoutMessageLoss { + if len(pres.internalMembers) > 0 { + internalMembers := make([]*PresenceMessage, len(pres.internalMembers)) + indexCounter := 0 + for _, member := range pres.internalMembers { + internalMembers[indexCounter] = member + indexCounter = indexCounter + 1 + } + go pres.enterMembers(internalMembers) + } } } @@ -127,19 +195,30 @@ func (pres *RealtimePresence) SyncComplete() bool { return pres.syncState == syncComplete } -func (pres *RealtimePresence) syncStart(serial string) { +func (pres *RealtimePresence) syncStart() { if pres.syncState == syncInProgress { return } else if pres.syncState != syncInitial { - // Sync has started, make all callers to Get(true) wait. If it's channel's - // initial sync, the callers are already waiting. - pres.syncMtx.Lock() + // Start new sync after previous one was finished + pres.syncDone = make(chan struct{}) } - pres.serial = serial pres.syncState = syncInProgress - pres.stale = make(map[string]struct{}, len(pres.members)) - for memberKey := range pres.members { - pres.stale[memberKey] = struct{}{} + pres.beforeSyncMembers = make(map[string]*PresenceMessage, len(pres.members)) // RTP19 + for memberKey, member := range pres.members { + pres.beforeSyncMembers[memberKey] = member + } +} + +// RTP19, RTP19a +func (pres *RealtimePresence) leaveMembers(members map[string]*PresenceMessage) { + for memberKey := range members { + delete(pres.members, memberKey) + } + for _, msg := range members { + msg.Action = PresenceActionLeave + msg.ID = "" + msg.Timestamp = time.Now().UnixMilli() + pres.messageEmitter.Emit(msg.Action, (*subscriptionPresenceMessage)(msg)) } } @@ -147,62 +226,111 @@ func (pres *RealtimePresence) syncEnd() { if pres.syncState != syncInProgress { return } - for memberKey := range pres.stale { - delete(pres.members, memberKey) - } - for memberKey, presence := range pres.members { + pres.leaveMembers(pres.beforeSyncMembers) // RTP19 + + for memberKey, presence := range pres.members { // RTP2f if presence.Action == PresenceActionAbsent { delete(pres.members, memberKey) } } - pres.stale = nil + pres.beforeSyncMembers = nil pres.syncState = syncComplete // Sync has completed, unblock all callers to Get(true) waiting // for the sync. - pres.syncMtx.Unlock() + close(pres.syncDone) +} + +// RTP2a, RTP2b, RTP2c +func (pres *RealtimePresence) addPresenceMember(memberMap map[string]*PresenceMessage, memberKey string, presenceMember *PresenceMessage) bool { + if existingMember, ok := memberMap[memberKey]; ok { // RTP2a + isMemberNew, err := presenceMember.IsNewerThan(existingMember) // RTP2b + if err != nil { + pres.log().Error(err) + } + if isMemberNew { + memberMap[memberKey] = presenceMember + return true + } + return false + } + memberMap[memberKey] = presenceMember + return true } -func (pres *RealtimePresence) processIncomingMessage(msg *protocolMessage, syncSerial string) { - for _, presmsg := range msg.Presence { - if presmsg.Timestamp == 0 { - presmsg.Timestamp = msg.Timestamp +// RTP2a, RTP2b, RTP2c +func (pres *RealtimePresence) removePresenceMember(memberMap map[string]*PresenceMessage, memberKey string, presenceMember *PresenceMessage) bool { + if existingMember, ok := memberMap[memberKey]; ok { // RTP2a + isMemberNew, err := presenceMember.IsNewerThan(existingMember) // RTP2b + if err != nil { + pres.log().Error(err) + } + if isMemberNew { + delete(memberMap, memberKey) + return existingMember.Action != PresenceActionAbsent } } + return false +} + +// RTP18 +func (pres *RealtimePresence) processProtoSyncMessage(msg *protocolMessage) { + // TODO - Part of RTP18a where new sequence id is received in middle of sync will not call synStart + // because sync is in progress. Though it will wait till all proto messages are processed. + // This is not implemented because of additional complexity of managing locks and reverting to prev. memberstate + noChannelSerial, _, syncCursor := syncSerial(msg) + + pres.syncStart() // RTP18a, RTP18c + + pres.processProtoPresenceMessage(msg) + + if noChannelSerial || empty(syncCursor) { // RTP18c, RTP18b + pres.syncEnd() + } +} + +func (pres *RealtimePresence) processProtoPresenceMessage(msg *protocolMessage) { pres.mtx.Lock() - if syncSerial != "" { - pres.syncStart(syncSerial) + // RTP17 - Update internal presence map + for _, presenceMember := range msg.Presence { + memberKey := presenceMember.ClientID // RTP17h + if pres.channel.client.Connection.ID() != presenceMember.ConnectionID { // RTP17 + continue + } + switch presenceMember.Action { + case PresenceActionEnter, PresenceActionUpdate, PresenceActionPresent: // RTP2d, RTP17b + presenceMemberShallowCopy := *presenceMember + presenceMemberShallowCopy.Action = PresenceActionPresent + pres.addPresenceMember(pres.internalMembers, memberKey, &presenceMemberShallowCopy) + case PresenceActionLeave: // RTP17b, RTP2e + if !presenceMember.isServerSynthesized() { + pres.removePresenceMember(pres.internalMembers, memberKey, presenceMember) + } + } } - // Filter out old messages by their timestamp. - messages := make([]*PresenceMessage, 0, len(msg.Presence)) + // Update presence map / channel's member state. - for _, member := range msg.Presence { - memberKey := member.ConnectionID + member.ClientID - if oldMember, ok := pres.members[memberKey]; ok { - if member.Timestamp <= oldMember.Timestamp { - continue // do not process old message - } + updatedPresenceMessages := make([]*PresenceMessage, 0, len(msg.Presence)) + for _, presenceMember := range msg.Presence { + memberKey := presenceMember.ConnectionID + presenceMember.ClientID // TP3h + memberUpdated := false + switch presenceMember.Action { + case PresenceActionEnter, PresenceActionUpdate, PresenceActionPresent: // RTP2d + delete(pres.beforeSyncMembers, memberKey) + presenceMemberShallowCopy := *presenceMember + presenceMemberShallowCopy.Action = PresenceActionPresent + memberUpdated = pres.addPresenceMember(pres.members, memberKey, &presenceMemberShallowCopy) + case PresenceActionLeave: // RTP2e + memberUpdated = pres.removePresenceMember(pres.members, memberKey, presenceMember) } - switch member.Action { - case PresenceActionEnter: - pres.members[memberKey] = member - case PresenceActionUpdate: - member.Action = PresenceActionPresent - fallthrough - case PresenceActionPresent: - delete(pres.stale, memberKey) - pres.members[memberKey] = member - case PresenceActionLeave: - delete(pres.members, memberKey) + // RTP2g + if memberUpdated { + updatedPresenceMessages = append(updatedPresenceMessages, presenceMember) } - messages = append(messages, member) - } - if syncSerial == "" { - pres.syncEnd() } pres.mtx.Unlock() - msg.Count = len(messages) - msg.Presence = messages - for _, msg := range msg.Presence { + + // RTP2g + for _, msg := range updatedPresenceMessages { pres.messageEmitter.Emit(msg.Action, (*subscriptionPresenceMessage)(msg)) } } @@ -267,7 +395,10 @@ func (pres *RealtimePresence) GetWithOptions(ctx context.Context, options ...Pre } if opts.waitForSync { - pres.syncWait() + err = pres.syncWait(ctx) + if err != nil { + return nil, err + } } pres.mtx.Lock() diff --git a/ably/realtime_presence_integration_test.go b/ably/realtime_presence_integration_test.go index f29c49253..d27638fcd 100644 --- a/ably/realtime_presence_integration_test.go +++ b/ably/realtime_presence_integration_test.go @@ -57,7 +57,7 @@ func TestRealtimePresence_Sync(t *testing.T) { assert.NoError(t, err) } -func TestRealtimePresence_Sync250(t *testing.T) { +func TestRealtimePresence_Sync250_RTP4(t *testing.T) { app, client1 := ablytest.NewRealtime(nil...) defer safeclose(t, ablytest.FullRealtimeCloser(client1), app) client2 := app.NewRealtime(nil...) @@ -167,3 +167,29 @@ func ExampleRealtimePresence_Enter() { return } } + +// When a client is created without a ClientID, EnterClient is used to announce the presence of a client. +// This example shows a client without a clientID announcing the presence of "Client A" using EnterClient. +func ExampleRealtimePresence_EnterClient() { + + // A new realtime client is created without providing a ClientID. + client, err := ably.NewRealtime( + ably.WithKey("ABLY_PRIVATE_KEY"), + ) + if err != nil { + fmt.Println(err) + return + } + + // A new channel is initialised. + channel := client.Channels.Get("chat") + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + // The presence of Client A is announced using EnterClient. + if err := channel.Presence.EnterClient(ctx, "Client A", nil); err != nil { + fmt.Println(err) + return + } +} diff --git a/ably/realtime_presence_internal_test.go b/ably/realtime_presence_internal_test.go index ad112fd0a..3b4494348 100644 --- a/ably/realtime_presence_internal_test.go +++ b/ably/realtime_presence_internal_test.go @@ -4,13 +4,40 @@ package ably import ( + "bytes" + "context" "errors" + "log" "testing" "github.com/stretchr/testify/assert" ) -func TestVerifyChanState(t *testing.T) { +var ( + buffer bytes.Buffer + mocklogger = log.New(&buffer, "logger: ", log.Lshortfile) +) + +// mockChannelWithState is a test helper that returns a mock channel in a specified state +func mockChannelWithState(channelState *ChannelState, connectionState *ConnectionState) *RealtimeChannel { + mockChannel := RealtimeChannel{ + client: &Realtime{ + rest: &REST{ + log: logger{l: &stdLogger{mocklogger}}, + }, + Connection: &Connection{}, + }, + } + if channelState != nil { + mockChannel.state = *channelState + } + if connectionState != nil { + mockChannel.client.Connection.state = *connectionState + } + return &mockChannel +} + +func TestVerifyChanState_RTP16(t *testing.T) { tests := map[string]struct { channel *RealtimeChannel expectedErr error @@ -27,9 +54,9 @@ func TestVerifyChanState(t *testing.T) { channel: mockChannelWithState(&ChannelStateAttached, nil), expectedErr: nil, }, - `No error if the channel is in state: "SUSPENDED"`: { + `Error if the channel is in state: "SUSPENDED"`: { channel: mockChannelWithState(&ChannelStateSuspended, nil), - expectedErr: nil, + expectedErr: newError(91001, errors.New("unable to enter presence channel (invalid channel state: SUSPENDED)")), }, `Error if the channel is in state: "DETACHING"`: { channel: mockChannelWithState(&ChannelStateDetaching, nil), @@ -48,7 +75,7 @@ func TestVerifyChanState(t *testing.T) { for testName, test := range tests { t.Run(testName, func(t *testing.T) { presence := newRealtimePresence(test.channel) - err := presence.verifyChanState() + err := presence.isValidChannelState() assert.Equal(t, test.expectedErr, err) }) } @@ -57,33 +84,32 @@ func TestVerifyChanState(t *testing.T) { func TestSend(t *testing.T) { tests := map[string]struct { channel *RealtimeChannel - msg PresenceMessage + msg Message expectedResult result expectedErr error }{ `No error sending presence if the channel is in state: "ATTACHED"`: { - channel: mockChannelWithState(&ChannelStateAttached, nil), - msg: PresenceMessage{ - Message: Message{Name: "Hello"}, - Action: PresenceActionEnter, - }, - expectedErr: nil, + channel: mockChannelWithState(&ChannelStateAttached, nil), + msg: Message{Name: "Hello"}, + expectedErr: (*ErrorInfo)(nil), + }, + `Error if channel is: "ATTACHED" and connection is :"CLOSED"`: { + channel: mockChannelWithState(&ChannelStateAttached, &ConnectionStateClosed), + msg: Message{Name: "Hello"}, + expectedErr: newError(80017, errors.New("Connection unavailable")), }, `Error if channel is: "DETACHED" and connection is :"CLOSED"`: { - channel: mockChannelWithState(&ChannelStateDetached, &ConnectionStateClosed), - msg: PresenceMessage{ - Message: Message{Name: "Hello"}, - Action: PresenceActionEnter, - }, - expectedErr: newError(80000, errors.New("cannot Attach channel because connection is in CLOSED state")), + channel: mockChannelWithState(&ChannelStateDetached, &ConnectionStateClosed), + msg: Message{Name: "Hello"}, + expectedErr: newError(91001, errors.New("unable to enter presence channel (invalid channel state: DETACHED)")), }, } for testName, test := range tests { t.Run(testName, func(t *testing.T) { presence := newRealtimePresence(test.channel) - _, err := presence.send(&test.msg) - assert.Equal(t, test.expectedErr, err) + err := presence.EnterClient(context.Background(), "clientId", &test.msg) + assert.Equal(t, test.expectedErr, err.(*ErrorInfo)) }) } } diff --git a/ably/realtime_presence_test.go b/ably/realtime_presence_test.go deleted file mode 100644 index e6d28464d..000000000 --- a/ably/realtime_presence_test.go +++ /dev/null @@ -1,38 +0,0 @@ -//go:build !integration -// +build !integration - -package ably_test - -import ( - "context" - "fmt" - "time" - - "github.com/ably/ably-go/ably" -) - -// When a client is created without a ClientID, EnterClient is used to announce the presence of a client. -// This example shows a client without a clientID announcing the presence of "Client A" using EnterClient. -func ExampleRealtimePresence_EnterClient() { - - // A new realtime client is created without providing a ClientID. - client, err := ably.NewRealtime( - ably.WithKey("ABLY_PRIVATE_KEY"), - ) - if err != nil { - fmt.Println(err) - return - } - - // A new channel is initialised. - channel := client.Channels.Get("chat") - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - - // The presence of Client A is announced using EnterClient. - if err := channel.Presence.EnterClient(ctx, "Client A", nil); err != nil { - fmt.Println(err) - return - } -} diff --git a/ably/state.go b/ably/state.go index 053bbee59..a0e922274 100644 --- a/ably/state.go +++ b/ably/state.go @@ -45,6 +45,7 @@ func goWaiter(f func() error) result { var ( errDisconnected = newErrorf(ErrDisconnected, "Connection temporarily unavailable") errSuspended = newErrorf(ErrConnectionSuspended, "Connection unavailable") + errClosed = newErrorf(ErrConnectionClosed, "Connection unavailable") errFailed = newErrorf(ErrConnectionFailed, "Connection failed") errNeverConnected = newErrorf(ErrConnectionSuspended, "Unable to establish connection") @@ -105,7 +106,7 @@ func channelStateError(state ChannelState, err error) *ErrorInfo { // pendingEmitter emits confirmation events triggered by ACK or NACK messages. type pendingEmitter struct { - queue []msgCh + queue []msgWithAckCallback log logger } @@ -115,15 +116,10 @@ func newPendingEmitter(log logger) pendingEmitter { } } -type msgCh struct { - msg *protocolMessage - onAck func(err error) -} - // Dismiss lets go of the channels that are waiting for an error on this queue. // The queue can continue sending messages. -func (q *pendingEmitter) Dismiss() []msgCh { - cx := make([]msgCh, len(q.queue)) +func (q *pendingEmitter) Dismiss() []msgWithAckCallback { + cx := make([]msgWithAckCallback, len(q.queue)) copy(cx, q.queue) q.queue = nil return cx @@ -136,7 +132,7 @@ func (q *pendingEmitter) Enqueue(msg *protocolMessage, onAck func(err error)) { panic(fmt.Sprintf("protocol violation: expected next enqueued message to have msgSerial %d; got %d", expected, got)) } } - q.queue = append(q.queue, msgCh{msg, onAck}) + q.queue = append(q.queue, msgWithAckCallback{msg, onAck}) } func (q *pendingEmitter) Ack(msg *protocolMessage, errInfo *ErrorInfo) { @@ -190,14 +186,14 @@ func (q *pendingEmitter) Ack(msg *protocolMessage, errInfo *ErrorInfo) { } } -type msgch struct { +type msgWithAckCallback struct { msg *protocolMessage onAck func(err error) } type msgQueue struct { mtx sync.Mutex - queue []msgch + queue []msgWithAckCallback conn *Connection } @@ -210,14 +206,14 @@ func newMsgQueue(conn *Connection) *msgQueue { func (q *msgQueue) Enqueue(msg *protocolMessage, onAck func(err error)) { q.mtx.Lock() // TODO(rjeczalik): reorder the queue so Presence / Messages can be merged - q.queue = append(q.queue, msgch{msg, onAck}) + q.queue = append(q.queue, msgWithAckCallback{msg, onAck}) q.mtx.Unlock() } func (q *msgQueue) Flush() { q.mtx.Lock() - for _, msgch := range q.queue { - q.conn.send(msgch.msg, msgch.onAck) + for _, queueMsg := range q.queue { + q.conn.send(queueMsg.msg, queueMsg.onAck) } q.queue = nil q.mtx.Unlock() @@ -225,10 +221,10 @@ func (q *msgQueue) Flush() { func (q *msgQueue) Fail(err error) { q.mtx.Lock() - for _, msgch := range q.queue { - q.log().Errorf("failure sending message (serial=%d): %v", msgch.msg.MsgSerial, err) - if msgch.onAck != nil { - msgch.onAck(newError(90000, err)) + for _, queueMsg := range q.queue { + q.log().Errorf("failure sending message (serial=%d): %v", queueMsg.msg.MsgSerial, err) + if queueMsg.onAck != nil { + queueMsg.onAck(newError(90000, err)) } } q.queue = nil diff --git a/ablytest/sandbox.go b/ablytest/sandbox.go index 9b81ce839..59a7f36cf 100644 --- a/ablytest/sandbox.go +++ b/ablytest/sandbox.go @@ -13,6 +13,7 @@ import ( "net/url" "os" "path" + "syscall" "time" "github.com/ably/ably-go/ably" @@ -159,13 +160,15 @@ func NewSandboxWithEnv(config *Config, env string) (*Sandbox, error) { req.Header.Set("Accept", "application/json") resp, err := app.client.Do(req) if err != nil { - // return from this function now only if the error wasn't due to a timeout - if err, ok := err.(*url.Error); ok && !err.Timeout() { - return nil, err + if !errors.Is(err, syscall.ECONNRESET) { // if not connection reset by peer + // return error if it wasn't due to a timeout + if err, ok := err.(*url.Error); ok && !err.Timeout() { + return nil, err + } } } - if err != nil { + if err != nil || (resp != nil && resp.StatusCode == 504) { // gateway timeout // Timeout. Back off before allowing another attempt. log.Println("warn: request timeout, attempting retry") time.Sleep(retryInterval) diff --git a/common b/common index e4a5c7692..645153d29 160000 --- a/common +++ b/common @@ -1 +1 @@ -Subproject commit e4a5c7692044807e3011b959426868b5075c998e +Subproject commit 645153d294d64875b68f2a4a58259338ed0b915c