Skip to content

Commit

Permalink
Added protection for trying to capture all subjects ">" in a stream.
Browse files Browse the repository at this point in the history
We will allow if no-ack and R1.

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison committed Jun 17, 2024
1 parent a4a3b9a commit 471284c
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 1 deletion.
38 changes: 37 additions & 1 deletion server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23648,7 +23648,13 @@ func TestJetStreamAuditStreams(t *testing.T) {
})
require_Error(t, err, NewJSStreamInvalidConfigError(sysOverlap))

// These should be ok if no pub ack.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{">"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(errors.New("capturing all subjects requires no-ack to be true")))

// These should all be ok if no pub ack.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST1",
Subjects: []string{"$JS.>"},
Expand Down Expand Up @@ -23683,4 +23689,34 @@ func TestJetStreamAuditStreams(t *testing.T) {
NoAck: true,
})
require_NoError(t, err)

// Specific test for capturing everything which will require both no-ack and replicas of 1.
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{">"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(errors.New("capturing all subjects requires no-ack to be true")))

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{">"},
Replicas: 3,
NoAck: true,
})
require_Error(t, err, NewJSStreamInvalidConfigError(errors.New("capturing all subjects requires replicas of 1")))

// Ths should work ok.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{">"},
Replicas: 1,
NoAck: true,
})
require_NoError(t, err)
}
10 changes: 10 additions & 0 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1502,6 +1502,16 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
if _, ok := dset[subj]; ok {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("duplicate subjects detected"))
}
// Check for trying to capture everything.
if subj == fwcs {
if !cfg.NoAck {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("capturing all subjects requires no-ack to be true"))
}
// Capturing everything also will require R1.
if cfg.Replicas != 1 {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("capturing all subjects requires replicas of 1"))
}
}
// Also check to make sure we do not overlap with our $JS API subjects.
if !cfg.NoAck && (subjectIsSubsetMatch(subj, "$JS.>") || subjectIsSubsetMatch(subj, "$JSC.>")) {
// We allow an exception for $JS.EVENT.> since these could have been created in the past.
Expand Down

0 comments on commit 471284c

Please sign in to comment.