Skip to content

Commit

Permalink
[CHANGED] MQTT s.clear() do not wait for JS responses when disconnect…
Browse files Browse the repository at this point in the history
…ing the session
  • Loading branch information
levb committed Jun 20, 2024
1 parent 5749c41 commit 173649e
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ func (s *Server) mqttHandleClosedClient(c *client) {

// This needs to be done outside of any lock.
if doClean {
if err := sess.clear(); err != nil {
if err := sess.clear(true); err != nil {
c.Errorf(err.Error())
}
}
Expand Down Expand Up @@ -1475,7 +1475,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
// Opportunistically delete the old (legacy) consumer, from v2.10.10 and
// before. Ignore any errors that might arise.
rmLegacyDurName := mqttRetainedMsgsStreamName + "_" + jsa.id
jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName)
jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName, true)

// Create a new, uniquely names consumer for retained messages for this
// server. The prior one will expire eventually.
Expand Down Expand Up @@ -1701,8 +1701,14 @@ func (jsa *mqttJSA) createDurableConsumer(cfg *CreateConsumerRequest) (*JSApiCon
return ccr, ccr.ToError()
}

func (jsa *mqttJSA) deleteConsumer(streamName, consName string) (*JSApiConsumerDeleteResponse, error) {
// if noWait is specified, does not wait for the JS response, returns nil
func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*JSApiConsumerDeleteResponse, error) {
subj := fmt.Sprintf(JSApiConsumerDeleteT, streamName, consName)
if noWait {
jsa.sendMsg(subj, nil)
return nil, nil
}

cdri, err := jsa.newRequest(mqttJSAConsumerDel, subj, 0, nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -3178,7 +3184,7 @@ func (sess *mqttSession) save() error {
//
// Runs from the client's readLoop.
// Lock not held on entry, but session is in the locked map.
func (sess *mqttSession) clear() error {
func (sess *mqttSession) clear(noWait bool) error {
var durs []string
var pubRelDur string

Expand Down Expand Up @@ -3206,19 +3212,19 @@ func (sess *mqttSession) clear() error {
sess.mu.Unlock()

for _, dur := range durs {
if _, err := sess.jsa.deleteConsumer(mqttStreamName, dur); isErrorOtherThan(err, JSConsumerNotFoundErr) {
if _, err := sess.jsa.deleteConsumer(mqttStreamName, dur, noWait); isErrorOtherThan(err, JSConsumerNotFoundErr) {
return fmt.Errorf("unable to delete consumer %q for session %q: %v", dur, sess.id, err)
}
}
if pubRelDur != _EMPTY_ {
_, err := sess.jsa.deleteConsumer(mqttOutStreamName, pubRelDur)
_, err := sess.jsa.deleteConsumer(mqttOutStreamName, pubRelDur, noWait)
if isErrorOtherThan(err, JSConsumerNotFoundErr) {
return fmt.Errorf("unable to delete consumer %q for session %q: %v", pubRelDur, sess.id, err)
}
}

if seq > 0 {
err := sess.jsa.deleteMsg(mqttSessStreamName, seq, true)
err := sess.jsa.deleteMsg(mqttSessStreamName, seq, !noWait)
// Ignore the various errors indicating that the message (or sequence)
// is already deleted, can happen in a cluster.
if isErrorOtherThan(err, JSSequenceNotFoundErrF) {
Expand Down Expand Up @@ -3823,7 +3829,7 @@ CHECK:
// This Session lasts as long as the Network Connection. State data
// associated with this Session MUST NOT be reused in any subsequent
// Session.
if err := es.clear(); err != nil {
if err := es.clear(false); err != nil {
asm.removeSession(es, true)
return err
}
Expand Down

0 comments on commit 173649e

Please sign in to comment.