Skip to content

Commit

Permalink
[IMPROVED] Check stream state performance (#5963)
Browse files Browse the repository at this point in the history
When checking interest state for interest or workqueue streams, we would
check all msgs from the stream's first sequence through ack floor and up
to delivered.

We do this to make sure our ack state is correct. In cases where there
were a lot of messages still in the stream due to offline or slow
consumers, this could be a heavy load on a server.

This improvement uses LoadNextMsg() to efficiently skip ahead and we now
remember our checked floor and do not repeat checks for messages below
our check floor on subsequent runs.

This change also highlighted a data race in filestore that is fixed here
as well.

Signed-off-by: Derek Collison <[email protected]>

---------

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison authored Oct 8, 2024
1 parent 9a41ec4 commit c9d0a12
Show file tree
Hide file tree
Showing 6 changed files with 347 additions and 101 deletions.
71 changes: 39 additions & 32 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ type consumer struct {
dseq uint64 // delivered consumer sequence
adflr uint64 // ack delivery floor
asflr uint64 // ack store floor
chkflr uint64 // our check floor, interest streams only.
npc int64 // Num Pending Count
npf uint64 // Num Pending Floor Sequence
dsubj string
Expand Down Expand Up @@ -3033,28 +3034,6 @@ func (o *consumer) isFiltered() bool {
return false
}

// Check if we would have matched and needed an ack for this store seq.
// This is called for interest based retention streams to remove messages.
func (o *consumer) matchAck(sseq uint64) bool {
o.mu.RLock()
defer o.mu.RUnlock()

// Check if we are filtered, and if so check if this is even applicable to us.
if o.isFiltered() {
if o.mset == nil {
return false
}
var svp StoreMsg
if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil {
return false
}
if !o.isFilteredMatch(svp.subj) {
return false
}
}
return true
}

// Check if we need an ack for this store seq.
// This is called for interest based retention streams to remove messages.
func (o *consumer) needAck(sseq uint64, subj string) bool {
Expand Down Expand Up @@ -5637,16 +5616,24 @@ func (o *consumer) isMonitorRunning() bool {
var errAckFloorHigherThanLastSeq = errors.New("consumer ack floor is higher than streams last sequence")

// If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent.
func (o *consumer) checkStateForInterestStream() error {
func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
o.mu.RLock()
// See if we need to process this update if our parent stream is not a limits policy stream.
mset := o.mset
shouldProcessState := mset != nil && o.retention != LimitsPolicy
if o.closed || !shouldProcessState || o.store == nil {
if o.closed || !shouldProcessState || o.store == nil || ss == nil {
o.mu.RUnlock()
return nil
}
store := mset.store
state, err := o.store.State()

filters, subjf, filter := o.filters, o.subjf, _EMPTY_
var wc bool
if filters == nil && subjf != nil {
filter, wc = subjf[0].subject, subjf[0].hasWildcard
}
chkfloor := o.chkflr
o.mu.RUnlock()

if err != nil {
Expand All @@ -5659,26 +5646,46 @@ func (o *consumer) checkStateForInterestStream() error {
return nil
}

// We should make sure to update the acks.
var ss StreamState
mset.store.FastState(&ss)

// Check if the underlying stream's last sequence is less than our floor.
// This can happen if the stream has been reset and has not caught up yet.
if asflr > ss.LastSeq {
return errAckFloorHigherThanLastSeq
}

for seq := ss.FirstSeq; asflr > 0 && seq <= asflr; seq++ {
if o.matchAck(seq) {
var smv StoreMsg
var seq, nseq uint64
// Start at first stream seq or a previous check floor, whichever is higher.
// Note this will really help for interest retention, with WQ the loadNextMsg
// gets us a long way already since it will skip deleted msgs not for our filter.
fseq := ss.FirstSeq
if chkfloor > fseq {
fseq = chkfloor
}

for seq = fseq; asflr > 0 && seq <= asflr; seq++ {
if filters != nil {
_, nseq, err = store.LoadNextMsgMulti(filters, seq, &smv)
} else {
_, nseq, err = store.LoadNextMsg(filter, wc, seq, &smv)
}
// if we advanced sequence update our seq. This can be on no error and EOF.
if nseq > seq {
seq = nseq
}
// Only ack though if no error and seq <= ack floor.
if err == nil && seq <= asflr {
mset.ackMsg(o, seq)
}
}

o.mu.RLock()
o.mu.Lock()
// Update our check floor.
if seq > o.chkflr {
o.chkflr = seq
}
// See if we need to process this update if our parent stream is not a limits policy stream.
state, _ = o.store.State()
o.mu.RUnlock()
o.mu.Unlock()

// If we have pending, we will need to walk through to delivered in case we missed any of those acks as well.
if state != nil && len(state.Pending) > 0 && state.AckFloor.Stream > 0 {
Expand Down
30 changes: 19 additions & 11 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2586,7 +2586,7 @@ func (fs *fileStore) numFilteredPendingNoLast(filter string, ss *SimpleState) {

// Optimized way for getting all num pending matching a filter subject.
// Optionally look up last sequence. Sometimes do not need last and this avoids cost.
// Lock should be held.
// Read lock should be held.
func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *SimpleState) {
isAll := filter == _EMPTY_ || filter == fwcs

Expand Down Expand Up @@ -2653,17 +2653,25 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *Si
}
}
// Update fblk since fblk was outdated.
if !wc {
if info, ok := fs.psim.Find(stringToBytes(filter)); ok {
info.fblk = i
}
} else {
fs.psim.Match(stringToBytes(filter), func(subj []byte, psi *psi) {
if i > psi.fblk {
psi.fblk = i
// We only require read lock here as that is desirable,
// so we need to do this in a go routine to acquire write lock.
go func() {
fs.mu.Lock()
defer fs.mu.Unlock()
if !wc {
if info, ok := fs.psim.Find(stringToBytes(filter)); ok {
if i > info.fblk {
info.fblk = i
}
}
})
}
} else {
fs.psim.Match(stringToBytes(filter), func(subj []byte, psi *psi) {
if i > psi.fblk {
psi.fblk = i
}
})
}
}()
}
// Now gather last sequence if asked to do so.
if last {
Expand Down
109 changes: 72 additions & 37 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7143,14 +7143,21 @@ func TestFileStoreFilteredPendingPSIMFirstBlockUpdate(t *testing.T) {
require_Equal(t, ss.First, 1002)
require_Equal(t, ss.Last, 1003)

// Check psi was updated.
fs.mu.RLock()
psi, ok = fs.psim.Find([]byte("foo.baz"))
fs.mu.RUnlock()
require_True(t, ok)
require_Equal(t, psi.total, 2)
require_Equal(t, psi.fblk, 84)
require_Equal(t, psi.lblk, 84)
// Check psi was updated. This is done in separate go routine to acquire
// the write lock now.
checkFor(t, time.Second, 100*time.Millisecond, func() error {
fs.mu.RLock()
psi, ok = fs.psim.Find([]byte("foo.baz"))
total, fblk, lblk := psi.total, psi.fblk, psi.lblk
fs.mu.RUnlock()
require_True(t, ok)
require_Equal(t, total, 2)
require_Equal(t, lblk, 84)
if fblk != 84 {
return fmt.Errorf("fblk should be 84, still %d", fblk)
}
return nil
})
}

func TestFileStoreWildcardFilteredPendingPSIMFirstBlockUpdate(t *testing.T) {
Expand Down Expand Up @@ -7187,19 +7194,21 @@ func TestFileStoreWildcardFilteredPendingPSIMFirstBlockUpdate(t *testing.T) {
require_Equal(t, fs.numMsgBlocks(), 92)
fs.mu.RLock()
psi, ok := fs.psim.Find([]byte("foo.22.baz"))
total, fblk, lblk := psi.total, psi.fblk, psi.lblk
fs.mu.RUnlock()
require_True(t, ok)
require_Equal(t, psi.total, 2)
require_Equal(t, psi.fblk, 1)
require_Equal(t, psi.lblk, 92)
require_Equal(t, total, 2)
require_Equal(t, fblk, 1)
require_Equal(t, lblk, 92)

fs.mu.RLock()
psi, ok = fs.psim.Find([]byte("foo.22.bar"))
total, fblk, lblk = psi.total, psi.fblk, psi.lblk
fs.mu.RUnlock()
require_True(t, ok)
require_Equal(t, psi.total, 2)
require_Equal(t, psi.fblk, 1)
require_Equal(t, psi.lblk, 92)
require_Equal(t, total, 2)
require_Equal(t, fblk, 1)
require_Equal(t, lblk, 92)

// No make sure that a call to numFilterPending which will initially walk all blocks if starting from seq 1 updates psi.
var ss SimpleState
Expand All @@ -7211,21 +7220,33 @@ func TestFileStoreWildcardFilteredPendingPSIMFirstBlockUpdate(t *testing.T) {
require_Equal(t, ss.Last, 1006)

// Check both psi were updated.
fs.mu.RLock()
psi, ok = fs.psim.Find([]byte("foo.22.baz"))
fs.mu.RUnlock()
require_True(t, ok)
require_Equal(t, psi.total, 2)
require_Equal(t, psi.fblk, 92)
require_Equal(t, psi.lblk, 92)
checkFor(t, time.Second, 100*time.Millisecond, func() error {
fs.mu.RLock()
psi, ok = fs.psim.Find([]byte("foo.22.baz"))
total, fblk, lblk = psi.total, psi.fblk, psi.lblk
fs.mu.RUnlock()
require_True(t, ok)
require_Equal(t, total, 2)
require_Equal(t, lblk, 92)
if fblk != 92 {
return fmt.Errorf("fblk should be 92, still %d", fblk)
}
return nil
})

fs.mu.RLock()
psi, ok = fs.psim.Find([]byte("foo.22.bar"))
fs.mu.RUnlock()
require_True(t, ok)
require_Equal(t, psi.total, 2)
require_Equal(t, psi.fblk, 92)
require_Equal(t, psi.lblk, 92)
checkFor(t, time.Second, 100*time.Millisecond, func() error {
fs.mu.RLock()
psi, ok = fs.psim.Find([]byte("foo.22.bar"))
total, fblk, lblk = psi.total, psi.fblk, psi.lblk
fs.mu.RUnlock()
require_True(t, ok)
require_Equal(t, total, 2)
require_Equal(t, fblk, 92)
if fblk != 92 {
return fmt.Errorf("fblk should be 92, still %d", fblk)
}
return nil
})
}

// Make sure if we only miss by one for fblk that we still update it.
Expand All @@ -7248,10 +7269,14 @@ func TestFileStoreFilteredPendingPSIMFirstBlockUpdateNextBlock(t *testing.T) {
fetch := func(subj string) *psi {
t.Helper()
fs.mu.RLock()
var info psi
psi, ok := fs.psim.Find([]byte(subj))
if ok && psi != nil {
info = *psi
}
fs.mu.RUnlock()
require_True(t, ok)
return psi
return &info
}

psi := fetch("foo.22.bar")
Expand All @@ -7274,10 +7299,15 @@ func TestFileStoreFilteredPendingPSIMFirstBlockUpdateNextBlock(t *testing.T) {
require_Equal(t, ss.Last, 7)

// Now make sure that we properly updated the psim entry.
psi = fetch("foo.22.bar")
require_Equal(t, psi.total, 3)
require_Equal(t, psi.fblk, 2)
require_Equal(t, psi.lblk, 4)
checkFor(t, time.Second, 100*time.Millisecond, func() error {
psi = fetch("foo.22.bar")
require_Equal(t, psi.total, 3)
require_Equal(t, psi.lblk, 4)
if psi.fblk != 2 {
return fmt.Errorf("fblk should be 2, still %d", psi.fblk)
}
return nil
})

// Now make sure wildcard calls into also update blks.
// First remove first "foo.22.baz" which will remove first block.
Expand All @@ -7300,10 +7330,15 @@ func TestFileStoreFilteredPendingPSIMFirstBlockUpdateNextBlock(t *testing.T) {
require_Equal(t, ss.First, 4)
require_Equal(t, ss.Last, 8)

psi = fetch("foo.22.baz")
require_Equal(t, psi.total, 3)
require_Equal(t, psi.fblk, 2)
require_Equal(t, psi.lblk, 4)
checkFor(t, time.Second, 100*time.Millisecond, func() error {
psi = fetch("foo.22.baz")
require_Equal(t, psi.total, 3)
require_Equal(t, psi.lblk, 4)
if psi.fblk != 2 {
return fmt.Errorf("fblk should be 2, still %d", psi.fblk)
}
return nil
})
}

func TestFileStoreLargeSparseMsgsDoNotLoadAfterLast(t *testing.T) {
Expand Down
Loading

0 comments on commit c9d0a12

Please sign in to comment.