Skip to content

Commit

Permalink
When we miss on LoadNextMsg but not starting at first sequence, optim…
Browse files Browse the repository at this point in the history
…ize jump ahead to not need blk access from file.

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison committed Jun 21, 2024
1 parent a7c9471 commit 654ed7d
Showing 1 changed file with 29 additions and 4 deletions.
33 changes: 29 additions & 4 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2590,6 +2590,32 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {
return ss
}

// This is used to see if we can selectively jump start blocks based on filter subject and a floor block index.
// Will return -1 if no matches at all.
func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) int {
start := uint32(math.MaxUint32)
if wc {
fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) {
if psi.fblk < start {
start = psi.fblk
}
})
} else if psi, ok := fs.psim.Find(stringToBytes(filter)); ok {
start = psi.fblk
}
// Nothing found.
if start == uint32(math.MaxUint32) {
return -1
}
// Here we need to translate this to index into fs.blks.
mb := fs.bim[start]
if mb == nil {
return -1
}
bi, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))
return bi
}

// Optimized way for getting all num pending matching a filter subject.
// Lock should be held.
func (fs *fileStore) numFilteredPending(filter string, ss *SimpleState) {
Expand Down Expand Up @@ -6438,16 +6464,15 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
// Similar to above if start <= first seq.
// TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers.
if i == bi {
var ss SimpleState
fs.numFilteredPendingNoLast(filter, &ss)
nbi := fs.checkSkipFirstBlock(filter, wc)
// Nothing available.
if ss.Msgs == 0 {
if nbi < 0 {
return nil, fs.state.LastSeq, ErrStoreEOF
}
// See if we can jump ahead here.
// Right now we can only spin on first, so if we have interior sparseness need to favor checking per block fss if loaded.
// For v2 will track all blocks that have matches for psim.
if nbi, _ := fs.selectMsgBlockWithIndex(ss.First); nbi > i {
if nbi > i {
i = nbi - 1 // For the iterator condition i++
}
}
Expand Down

0 comments on commit 654ed7d

Please sign in to comment.