Skip to content

Commit

Permalink
[IMPROVED] Optimize on last by subject if mmps is one (#4714)
Browse files Browse the repository at this point in the history
Also on SubjectsState only loadMsgs if the mb.fss is nil.

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison authored Oct 27, 2023
2 parents 9edecc8 + 85784a3 commit dfe1721
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 34 deletions.
70 changes: 38 additions & 32 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4687,44 +4687,50 @@ func (o *consumer) selectStartingSeqNo() {
}
}
} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
// A threshold for when we switch from get last msg to subjects state.
const numSubjectsThresh = 256
lss := &lastSeqSkipList{resume: state.LastSeq}
var filters []string
if o.subjf == nil {
filters = append(filters, o.cfg.FilterSubject)
// If our parent stream is set to max msgs per subject of 1 this is just
// a normal consumer at this point. We can avoid any heavy lifting.
if o.mset.cfg.MaxMsgsPer == 1 {
o.sseq = state.FirstSeq
} else {
for _, filter := range o.subjf {
filters = append(filters, filter.subject)
// A threshold for when we switch from get last msg to subjects state.
const numSubjectsThresh = 256
lss := &lastSeqSkipList{resume: state.LastSeq}
var filters []string
if o.subjf == nil {
filters = append(filters, o.cfg.FilterSubject)
} else {
for _, filter := range o.subjf {
filters = append(filters, filter.subject)
}
}
}
for _, filter := range filters {
if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh {
var smv StoreMsg
for subj := range st {
if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
lss.seqs = append(lss.seqs, sm.seq)
for _, filter := range filters {
if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh {
var smv StoreMsg
for subj := range st {
if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
lss.seqs = append(lss.seqs, sm.seq)
}
}
} else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 {
for _, ss := range mss {
lss.seqs = append(lss.seqs, ss.Last)
}
}
} else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 {
for _, ss := range mss {
lss.seqs = append(lss.seqs, ss.Last)
}
}
// Sort the skip list if needed.
if len(lss.seqs) > 1 {
sort.Slice(lss.seqs, func(i, j int) bool {
return lss.seqs[j] > lss.seqs[i]
})
}
if len(lss.seqs) == 0 {
o.sseq = state.LastSeq
} else {
o.sseq = lss.seqs[0]
}
// Assign skip list.
o.lss = lss
}
// Sort the skip list if needed.
if len(lss.seqs) > 1 {
sort.Slice(lss.seqs, func(i, j int) bool {
return lss.seqs[j] > lss.seqs[i]
})
}
if len(lss.seqs) == 0 {
o.sseq = state.LastSeq
} else {
o.sseq = lss.seqs[0]
}
// Assign skip list.
o.lss = lss
} else if o.cfg.OptStartTime != nil {
// If we are here we are time based.
// TODO(dlc) - Once clustered can't rely on this.
Expand Down
4 changes: 2 additions & 2 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2426,7 +2426,7 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
fs.mu.RLock()
defer fs.mu.RUnlock()

if fs.state.Msgs == 0 {
if fs.state.Msgs == 0 || fs.noTrackSubjects() {
return nil
}

Expand Down Expand Up @@ -2454,7 +2454,7 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {

mb.mu.Lock()
var shouldExpire bool
if mb.cacheNotLoaded() {
if mb.fss == nil {
// Make sure we have fss loaded.
mb.loadMsgsWithLock()
shouldExpire = true
Expand Down

0 comments on commit dfe1721

Please sign in to comment.