[FIXED] Make sure we check limits when scaling up a stream. (#4738)
Resolves: #4732

Signed-off-by: Derek Collison <[email protected]>
derekcollison authored Nov 2, 2023
2 parents f404640 + 4961f84 commit 7d0e27a
Showing 2 changed files with 74 additions and 0 deletions.
6 changes: 6 additions & 0 deletions server/jetstream_cluster.go
@@ -6086,6 +6086,12 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
 	if isReplicaChange {
 		// We are adding new peers here.
 		if newCfg.Replicas > len(rg.Peers) {
+			// Check that we have the allocation available.
+			if err := js.jsClusteredStreamLimitsCheck(acc, newCfg); err != nil {
+				resp.Error = err
+				s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
+				return
+			}
 			// Check if we do not have a cluster assigned, and if we do not make sure we
 			// try to pick one. This could happen with older streams that were assigned by
 			// previous servers.
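For context, here is a minimal, self-contained sketch of the kind of tier-aware check the new code performs before adding peers. The helper names (tierLimits, checkTierLimits) and the tier-selection logic are assumptions for illustration only; the real validation lives in js.jsClusteredStreamLimitsCheck and also accounts for storage type, current usage, and reservations.

package main

import (
	"errors"
	"fmt"
)

// tierLimits mirrors the shape of jwt.JetStreamLimits as used in the test
// below: -1 means unlimited, 0 means no allocation. Hypothetical type.
type tierLimits struct {
	DiskStorage int64
	Streams     int64
}

// checkTierLimits rejects a scale-up whose new replica count maps to a
// tier ("R1", "R3", ...) without storage or stream headroom. It is a
// hypothetical stand-in for js.jsClusteredStreamLimitsCheck.
func checkTierLimits(limits map[string]tierLimits, replicas int, streamsInTier int64) error {
	tier := fmt.Sprintf("R%d", replicas)
	l, ok := limits[tier]
	if !ok {
		return fmt.Errorf("no limits defined for tier %s", tier)
	}
	if l.DiskStorage == 0 {
		return errors.New("insufficient storage resources available")
	}
	if l.Streams >= 0 && streamsInTier >= l.Streams {
		return errors.New("maximum number of streams reached")
	}
	return nil
}

func main() {
	limits := map[string]tierLimits{
		"R1": {DiskStorage: -1, Streams: 1},
		"R3": {DiskStorage: 0, Streams: 1},
	}
	// Moving a stream from R1 to R3 should be rejected: the R3 tier
	// grants no disk storage, which is exactly the setup the test below builds.
	if err := checkTierLimits(limits, 3, 0); err != nil {
		fmt.Println("scale-up rejected:", err)
	}
}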
68 changes: 68 additions & 0 deletions server/jetstream_cluster_3_test.go
@@ -5813,3 +5813,71 @@ func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
 	cancel()
 	wg.Wait()
 }
+
+// https://github.com/nats-io/nats-server/issues/4732
+func TestJetStreamClusterStreamLimitsOnScaleUpAndMove(t *testing.T) {
+	tmpl := `
+	listen: 127.0.0.1:-1
+	server_name: %s
+	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
+	cluster {
+		name: %s
+		listen: 127.0.0.1:%d
+		routes = [%s]
+	}
+`
+	opFrag := `
+	operator: %s
+	system_account: %s
+	resolver: { type: MEM }
+	resolver_preload = {
+		%s : %s
+		%s : %s
+	}
+`
+
+	_, syspub := createKey(t)
+	sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
+
+	accKp, aExpPub := createKey(t)
+	accClaim := jwt.NewAccountClaims(aExpPub)
+	accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{
+		DiskStorage: -1, Consumer: -1, Streams: 1}
+	accClaim.Limits.JetStreamTieredLimits["R3"] = jwt.JetStreamLimits{
+		DiskStorage: 0, Consumer: -1, Streams: 1}
+	accJwt := encodeClaim(t, accClaim, aExpPub)
+	accCreds := newUser(t, accKp)
+
+	template := tmpl + fmt.Sprintf(opFrag, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt)
+
+	c := createJetStreamCluster(t, template, "CLOUD", _EMPTY_, 3, 22020, true)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer(), nats.UserCredentials(accCreds))
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+	})
+	require_NoError(t, err)
+
+	toSend, msg := 100, bytes.Repeat([]byte("Z"), 1024)
+	for i := 0; i < toSend; i++ {
+		_, err := js.PublishAsync("foo", msg)
+		require_NoError(t, err)
+	}
+	select {
+	case <-js.PublishAsyncComplete():
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Did not receive completion signal")
+	}
+
+	// Scale up should fail here since no R3 storage.
+	_, err = js.UpdateStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_Error(t, err, errors.New("insufficient storage resources"))
+}
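As a usage note (not part of the commit), a client built on nats.go can inspect this rejection through the JetStream API error type. A short sketch, assuming the server surfaces the failure as a *nats.APIError carrying a descriptive message:

package main

import (
	"errors"
	"fmt"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		fmt.Println("connect:", err)
		return
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		fmt.Println("jetstream:", err)
		return
	}

	// Attempt the same R1 -> R3 scale-up the test performs.
	_, err = js.UpdateStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	var apiErr *nats.APIError
	if errors.As(err, &apiErr) {
		// With this fix, the scale-up is rejected up front instead of
		// leaving the stream in a partially scaled state.
		fmt.Println("scale-up rejected:", apiErr.Description)
	}
}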
