Skip to content

Commit

Permalink
Allow for audit exception for $JS.EVENT.> and $SYS.ACCOUNT.> whic…
Browse files Browse the repository at this point in the history
…h may already exist. (#5556)

Signed-off-by: Derek Collison <[email protected]>

---------

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison authored Jun 18, 2024
1 parent 3b1b38e commit fb839a3
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 9 deletions.
84 changes: 78 additions & 6 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,7 @@ func TestJetStreamAddStreamOverlappingSubjects(t *testing.T) {
// Test that any overlapping subjects will fail.
expectErr(acc.addStream(&StreamConfig{Name: "foo"}))
expectErr(acc.addStream(&StreamConfig{Name: "a", Subjects: []string{"baz", "bar"}}))
expectErr(acc.addStream(&StreamConfig{Name: "b", Subjects: []string{">"}}))
expectErr(acc.addStream(&StreamConfig{Name: "b", Subjects: []string{">"}, NoAck: true}))
expectErr(acc.addStream(&StreamConfig{Name: "c", Subjects: []string{"baz.33"}}))
expectErr(acc.addStream(&StreamConfig{Name: "d", Subjects: []string{"*.33"}}))
expectErr(acc.addStream(&StreamConfig{Name: "e", Subjects: []string{"*.>"}}))
Expand All @@ -984,7 +984,7 @@ func TestJetStreamAddStreamOverlapWithJSAPISubjects(t *testing.T) {

expectErr := func(_ *stream, err error) {
t.Helper()
if err == nil || !strings.Contains(err.Error(), "subjects overlap") {
if err == nil || !strings.Contains(err.Error(), "subjects that overlap with jetstream api") {
t.Fatalf("Expected error but got none")
}
}
Expand Down Expand Up @@ -23621,25 +23621,40 @@ func TestJetStreamAuditStreams(t *testing.T) {
nc, js := jsClientConnect(t, s)
defer nc.Close()

jsOverlap := errors.New("subjects that overlap with jetstream api require no-ack to be true")
sysOverlap := errors.New("subjects that overlap with system api require no-ack to be true")

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"$JS.>"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api")))
require_Error(t, err, NewJSStreamInvalidConfigError(jsOverlap))

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"$JS.API.>"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(jsOverlap))

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"$JSC.>"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api")))
require_Error(t, err, NewJSStreamInvalidConfigError(jsOverlap))

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"$SYS.>"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with system api")))
require_Error(t, err, NewJSStreamInvalidConfigError(sysOverlap))

// These should be ok if no pub ack.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{">"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(errors.New("capturing all subjects requires no-ack to be true")))

// These should all be ok if no pub ack.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST1",
Subjects: []string{"$JS.>"},
Expand All @@ -23660,4 +23675,61 @@ func TestJetStreamAuditStreams(t *testing.T) {
NoAck: true,
})
require_NoError(t, err)

// Since prior behavior did allow $JS.EVENT to be captured without no-ack, these might break
// on a server upgrade so make sure they still work ok without --no-ack.

// To avoid overlap error.
err = js.DeleteStream("TEST1")
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST4",
Subjects: []string{"$JS.EVENT.>"},
})
require_NoError(t, err)

// Also allow $SYS.ACCOUNT to be captured without no-ack, these also might break
// on a server upgrade so make sure they still work ok without --no-ack.

// To avoid overlap error.
err = js.DeleteStream("TEST3")
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST5",
Subjects: []string{"$SYS.ACCOUNT.>"},
})
require_NoError(t, err)

// We will test handling of ">" on a cluster here.
// Specific test for capturing everything which will require both no-ack and replicas of 1.
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{">"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(errors.New("capturing all subjects requires no-ack to be true")))

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{">"},
Replicas: 3,
NoAck: true,
})
require_Error(t, err, NewJSStreamInvalidConfigError(errors.New("capturing all subjects requires replicas of 1")))

// Ths should work ok.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{">"},
Replicas: 1,
NoAck: true,
})
require_NoError(t, err)
}
21 changes: 18 additions & 3 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1492,7 +1492,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
}

// Check for literal duplication of subject interest in config
// and no overlap with any JS API subject space
// and no overlap with any JS or SYS API subject space.
dset := make(map[string]struct{}, len(cfg.Subjects))
for _, subj := range cfg.Subjects {
// Make sure the subject is valid. Check this first.
Expand All @@ -1502,13 +1502,28 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
if _, ok := dset[subj]; ok {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("duplicate subjects detected"))
}
// Check for trying to capture everything.
if subj == fwcs {
if !cfg.NoAck {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("capturing all subjects requires no-ack to be true"))
}
// Capturing everything also will require R1.
if cfg.Replicas != 1 {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("capturing all subjects requires replicas of 1"))
}
}
// Also check to make sure we do not overlap with our $JS API subjects.
if !cfg.NoAck && (subjectIsSubsetMatch(subj, "$JS.>") || subjectIsSubsetMatch(subj, "$JSC.>")) {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api"))
// We allow an exception for $JS.EVENT.> since these could have been created in the past.
if !subjectIsSubsetMatch(subj, "$JS.EVENT.>") {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects that overlap with jetstream api require no-ack to be true"))
}
}
// And the $SYS subjects.
if !cfg.NoAck && subjectIsSubsetMatch(subj, "$SYS.>") {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with system api"))
if !subjectIsSubsetMatch(subj, "$SYS.ACCOUNT.>") {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects that overlap with system api require no-ack to be true"))
}
}
// Mark for duplicate check.
dset[subj] = struct{}{}
Expand Down

0 comments on commit fb839a3

Please sign in to comment.