Skip to content

Commit

Permalink
Reverted to the original approach, removed TestMQTTSessionNotDeletedO…
Browse files Browse the repository at this point in the history
…nDeleteConsumerError
  • Loading branch information
levb committed Jun 21, 2024
1 parent 25490e1 commit 8c9faa7
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 72 deletions.
16 changes: 3 additions & 13 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1563,10 +1563,6 @@ func (jsa *mqttJSA) newRequestEx(kind, subject, cidHash string, hdr int, msg []b
if err != nil {
return nil, err
}
if timeout == 0{
return nil, err
}

if len(responses) != 1 {
return nil, fmt.Errorf("unreachable: invalid number of responses (%d)", len(responses))
}
Expand Down Expand Up @@ -1626,10 +1622,6 @@ func (jsa *mqttJSA) newRequestExMulti(kind, subject, cidHash string, hdrs []int,
r2i[reply] = i
}

if timeout == 0 {
return nil, nil
}

// Wait for all responses to come back, or for the timeout to expire. We
// don't want to use time.After() which causes memory growth because the
// timer can't be stopped and will need to expire to then be garbage
Expand Down Expand Up @@ -1712,14 +1704,12 @@ func (jsa *mqttJSA) createDurableConsumer(cfg *CreateConsumerRequest) (*JSApiCon
// if noWait is specified, does not wait for the JS response, returns nil
func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*JSApiConsumerDeleteResponse, error) {
subj := fmt.Sprintf(JSApiConsumerDeleteT, streamName, consName)

timeout := mqttJSAPITimeout
if noWait {
timeout = 0
jsa.sendMsg(subj, nil)
return nil, nil
}

// timeout == 0 signals that we don't want to wait for the response.
cdri, err := jsa.newRequestEx(mqttJSAConsumerDel, subj, _EMPTY_, -1, nil, timeout)
cdri, err := jsa.newRequestEx(mqttJSAConsumerDel, subj, _EMPTY_, -1, nil, mqttJSAPITimeout)
if err != nil {
return nil, err
}
Expand Down
59 changes: 0 additions & 59 deletions server/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6789,65 +6789,6 @@ func TestMQTTConsumerMemStorageReload(t *testing.T) {
}
}

type unableToDeleteConsLogger struct {
DummyLogger
errCh chan string
}

func (l *unableToDeleteConsLogger) Errorf(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
if strings.Contains(msg, "unable to delete consumer") {
l.errCh <- msg
}
}

func TestMQTTSessionNotDeletedOnDeleteConsumerError(t *testing.T) {
org := mqttJSAPITimeout
mqttJSAPITimeout = 1000 * time.Millisecond
defer func() { mqttJSAPITimeout = org }()

cl := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 2)
defer cl.shutdown()

o := cl.opts[0]
s1 := cl.servers[0]
// Plug error logger to s1
l := &unableToDeleteConsLogger{errCh: make(chan string, 10)}
s1.SetLogger(l, false, false)

nc, js := jsClientConnect(t, s1)
defer nc.Close()

mc, r := testMQTTConnectRetry(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port, 5)
defer mc.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)

testMQTTSub(t, 1, mc, r, []*mqttFilter{{filter: "foo", qos: 1}}, []byte{1})
testMQTTFlush(t, mc, nil, r)

// Now shutdown server 2, we should lose quorum
cl.servers[1].Shutdown()

// Close the MQTT client:
testMQTTDisconnect(t, mc, nil)

// We should have reported that there was an error deleting the consumer
select {
case <-l.errCh:
// OK
case <-time.After(time.Second):
t.Fatal("Server did not report any error")
}

// Now restart the server 2 so that we can check that the session is still persisted.
cl.restartAllSamePorts()
cl.waitOnStreamLeader(globalAccountName, mqttSessStreamName)

si, err := js.StreamInfo(mqttSessStreamName)
require_NoError(t, err)
require_True(t, si.State.Msgs == 1)
}

// Test for auto-cleanup of consumers.
func TestMQTTConsumerInactiveThreshold(t *testing.T) {
tdir := t.TempDir()
Expand Down

0 comments on commit 8c9faa7

Please sign in to comment.