From 11e76e1d5a143823913351a9d24f817d92dcd4ee Mon Sep 17 00:00:00 2001 From: Joel Rebello Date: Mon, 24 Jun 2024 18:03:31 +0200 Subject: [PATCH] events/nats: Publish() to accept rollup bool This bool value will enable the caller to purge any previous messages on the subject. This is required for subjects where only a single message should be present on the JS at any given moment. A new message publish should overwrite the previous - if any. --- events/events.go | 4 +++- events/nats.go | 15 ++++++++++---- events/nats_test.go | 50 ++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 63 insertions(+), 6 deletions(-) diff --git a/events/events.go b/events/events.go index c42e459..d89b44c 100644 --- a/events/events.go +++ b/events/events.go @@ -39,7 +39,9 @@ type Stream interface { Open() error // Publish publishes the message to the message broker. - Publish(ctx context.Context, subject string, msg []byte) error + // + // rollupSubject when set to true will cause any previous messages with the same subject to be overwritten by this new msg. + Publish(ctx context.Context, subject string, msg []byte, rollupSubject bool) error // Subscribe subscribes to one or more subjects on the stream returning a message channel for subscribers to read from. Subscribe(ctx context.Context) (MsgCh, error) diff --git a/events/nats.go b/events/nats.go index fdbfc58..1e076cf 100644 --- a/events/nats.go +++ b/events/nats.go @@ -274,10 +274,13 @@ func (n *NatsJetstream) consumerConfigIsEqual(consumerInfo *nats.ConsumerInfo) b } } -// Publish publishes an event onto the NATS Jetstream. The caller is responsible for message -// addressing and data serialization. NOTE: The subject passed here will be prepended with any -// configured PublisherSubjectPrefix. -func (n *NatsJetstream) Publish(ctx context.Context, subjectSuffix string, data []byte) error { +// Publish publishes an event onto the NATS Jetstream. +// The caller is responsible for message addressing and data serialization. +// +// rollupSubject when set to true will cause any previous messages with the same subject to be overwritten by this new msg. +// +// NOTE: The subject passed here will be prepended with the configured PublisherSubjectPrefix. +func (n *NatsJetstream) Publish(ctx context.Context, subjectSuffix string, data []byte, rollupSubject bool) error { if n.jsctx == nil { return errors.Wrap(ErrNatsJetstreamAddConsumer, "Jetstream context is not setup") } @@ -295,6 +298,10 @@ func (n *NatsJetstream) Publish(ctx context.Context, subjectSuffix string, data // inject otel trace context injectOtelTraceContext(ctx, msg) + // https://docs.nats.io/nats-concepts/jetstream/streams#allowrollup + if rollupSubject { + msg.Header.Add("Nats-Rollup", "sub") + } _, err := n.jsctx.PublishMsg(msg, options...) return err } diff --git a/events/nats_test.go b/events/nats_test.go index c06f286..b984049 100644 --- a/events/nats_test.go +++ b/events/nats_test.go @@ -83,7 +83,7 @@ func TestPublishAndSubscribe(t *testing.T) { require.NoError(t, err) payload := []byte("test data") - require.NoError(t, njs.Publish(context.TODO(), "test", payload)) + require.NoError(t, njs.Publish(context.TODO(), "test", payload, false)) msgs, err := njs.PullMsg(context.TODO(), 1) require.NoError(t, err) @@ -95,6 +95,54 @@ func TestPublishAndSubscribe(t *testing.T) { require.ErrorIs(t, err, nats.ErrTimeout) } +func TestPublishAndSubscribe_WithRollup(t *testing.T) { + jsSrv := natsTest.StartJetStreamServer(t) + defer natsTest.ShutdownJetStream(t, jsSrv) + + jsConn, _ := natsTest.JetStreamContext(t, jsSrv) + njs := NewJetstreamFromConn(jsConn) + defer njs.Close() + + njs.parameters = &NatsOptions{ + AppName: "TestPublishAndSubscribe", + Stream: &NatsStreamOptions{ + Name: "test_stream", + Subjects: []string{ + "pre.test", + }, + Retention: "workQueue", + }, + Consumer: &NatsConsumerOptions{ + Name: "test_consumer", + Pull: true, + SubscribeSubjects: []string{ + "pre.test", + }, + FilterSubject: "pre.test", + }, + PublisherSubjectPrefix: "pre", + } + require.NoError(t, njs.addStream()) + require.NoError(t, njs.addConsumer()) + + _, err := njs.Subscribe(context.TODO()) + require.NoError(t, err) + + payload := []byte("test data") + require.NoError(t, njs.Publish(context.TODO(), "test", payload, true)) + payload2 := []byte("rollup") + require.NoError(t, njs.Publish(context.TODO(), "test", payload2, true)) + + msgs, err := njs.PullMsg(context.TODO(), 1) + require.NoError(t, err) + require.Equal(t, 1, len(msgs)) + require.Equal(t, payload2, msgs[0].Data()) + + msgs, err = njs.PullMsg(context.TODO(), 1) + require.Error(t, err) + require.ErrorIs(t, err, nats.ErrTimeout) +} + func Test_addConsumer(t *testing.T) { jsSrv := natsTest.StartJetStreamServer(t) defer natsTest.ShutdownJetStream(t, jsSrv)