diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 035c7e9d6a2..348299accb0 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -969,7 +969,7 @@ func TestJetStreamAddStreamOverlappingSubjects(t *testing.T) { // Test that any overlapping subjects will fail. expectErr(acc.addStream(&StreamConfig{Name: "foo"})) expectErr(acc.addStream(&StreamConfig{Name: "a", Subjects: []string{"baz", "bar"}})) - expectErr(acc.addStream(&StreamConfig{Name: "b", Subjects: []string{">"}})) + expectErr(acc.addStream(&StreamConfig{Name: "b", Subjects: []string{">"}, NoAck: true})) expectErr(acc.addStream(&StreamConfig{Name: "c", Subjects: []string{"baz.33"}})) expectErr(acc.addStream(&StreamConfig{Name: "d", Subjects: []string{"*.33"}})) expectErr(acc.addStream(&StreamConfig{Name: "e", Subjects: []string{"*.>"}})) @@ -984,7 +984,7 @@ func TestJetStreamAddStreamOverlapWithJSAPISubjects(t *testing.T) { expectErr := func(_ *stream, err error) { t.Helper() - if err == nil || !strings.Contains(err.Error(), "subjects overlap") { + if err == nil || !strings.Contains(err.Error(), "subjects that overlap with jetstream api") { t.Fatalf("Expected error but got none") } } @@ -23621,25 +23621,40 @@ func TestJetStreamAuditStreams(t *testing.T) { nc, js := jsClientConnect(t, s) defer nc.Close() + jsOverlap := errors.New("subjects that overlap with jetstream api require no-ack to be true") + sysOverlap := errors.New("subjects that overlap with system api require no-ack to be true") + _, err := js.AddStream(&nats.StreamConfig{ Name: "TEST", Subjects: []string{"$JS.>"}, }) - require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api"))) + require_Error(t, err, NewJSStreamInvalidConfigError(jsOverlap)) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"$JS.API.>"}, + }) + require_Error(t, err, NewJSStreamInvalidConfigError(jsOverlap)) _, err = js.AddStream(&nats.StreamConfig{ Name: "TEST", Subjects: []string{"$JSC.>"}, }) - require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api"))) + require_Error(t, err, NewJSStreamInvalidConfigError(jsOverlap)) _, err = js.AddStream(&nats.StreamConfig{ Name: "TEST", Subjects: []string{"$SYS.>"}, }) - require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with system api"))) + require_Error(t, err, NewJSStreamInvalidConfigError(sysOverlap)) - // These should be ok if no pub ack. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{">"}, + }) + require_Error(t, err, NewJSStreamInvalidConfigError(errors.New("capturing all subjects requires no-ack to be true"))) + + // These should all be ok if no pub ack. _, err = js.AddStream(&nats.StreamConfig{ Name: "TEST1", Subjects: []string{"$JS.>"}, @@ -23660,4 +23675,61 @@ func TestJetStreamAuditStreams(t *testing.T) { NoAck: true, }) require_NoError(t, err) + + // Since prior behavior did allow $JS.EVENT to be captured without no-ack, these might break + // on a server upgrade so make sure they still work ok without --no-ack. + + // To avoid overlap error. + err = js.DeleteStream("TEST1") + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST4", + Subjects: []string{"$JS.EVENT.>"}, + }) + require_NoError(t, err) + + // Also allow $SYS.ACCOUNT to be captured without no-ack, these also might break + // on a server upgrade so make sure they still work ok without --no-ack. + + // To avoid overlap error. + err = js.DeleteStream("TEST3") + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST5", + Subjects: []string{"$SYS.ACCOUNT.>"}, + }) + require_NoError(t, err) + + // We will test handling of ">" on a cluster here. + // Specific test for capturing everything which will require both no-ack and replicas of 1. + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{">"}, + }) + require_Error(t, err, NewJSStreamInvalidConfigError(errors.New("capturing all subjects requires no-ack to be true"))) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{">"}, + Replicas: 3, + NoAck: true, + }) + require_Error(t, err, NewJSStreamInvalidConfigError(errors.New("capturing all subjects requires replicas of 1"))) + + // Ths should work ok. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{">"}, + Replicas: 1, + NoAck: true, + }) + require_NoError(t, err) } diff --git a/server/stream.go b/server/stream.go index 5fcb0dac34f..690c87495cc 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1492,7 +1492,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi } // Check for literal duplication of subject interest in config - // and no overlap with any JS API subject space + // and no overlap with any JS or SYS API subject space. dset := make(map[string]struct{}, len(cfg.Subjects)) for _, subj := range cfg.Subjects { // Make sure the subject is valid. Check this first. @@ -1502,13 +1502,28 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi if _, ok := dset[subj]; ok { return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("duplicate subjects detected")) } + // Check for trying to capture everything. + if subj == fwcs { + if !cfg.NoAck { + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("capturing all subjects requires no-ack to be true")) + } + // Capturing everything also will require R1. + if cfg.Replicas != 1 { + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("capturing all subjects requires replicas of 1")) + } + } // Also check to make sure we do not overlap with our $JS API subjects. if !cfg.NoAck && (subjectIsSubsetMatch(subj, "$JS.>") || subjectIsSubsetMatch(subj, "$JSC.>")) { - return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api")) + // We allow an exception for $JS.EVENT.> since these could have been created in the past. + if !subjectIsSubsetMatch(subj, "$JS.EVENT.>") { + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects that overlap with jetstream api require no-ack to be true")) + } } // And the $SYS subjects. if !cfg.NoAck && subjectIsSubsetMatch(subj, "$SYS.>") { - return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with system api")) + if !subjectIsSubsetMatch(subj, "$SYS.ACCOUNT.>") { + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects that overlap with system api require no-ack to be true")) + } } // Mark for duplicate check. dset[subj] = struct{}{}