Skip to content

Commit

Permalink
events/nats: Publish() to accept rollup bool
Browse files Browse the repository at this point in the history
This bool value will enable the caller to purge any previous messages on
the subject. This is required for subjects where only a single message
should be present on the JS at any given moment. A new message publish
should overwrite the previous - if any.
  • Loading branch information
joelrebel committed Jul 2, 2024
1 parent 2995999 commit 11e76e1
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 6 deletions.
4 changes: 3 additions & 1 deletion events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ type Stream interface {
Open() error

// Publish publishes the message to the message broker.
Publish(ctx context.Context, subject string, msg []byte) error
//
// rollupSubject when set to true will cause any previous messages with the same subject to be overwritten by this new msg.
Publish(ctx context.Context, subject string, msg []byte, rollupSubject bool) error

// Subscribe subscribes to one or more subjects on the stream returning a message channel for subscribers to read from.
Subscribe(ctx context.Context) (MsgCh, error)
Expand Down
15 changes: 11 additions & 4 deletions events/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,13 @@ func (n *NatsJetstream) consumerConfigIsEqual(consumerInfo *nats.ConsumerInfo) b
}
}

// Publish publishes an event onto the NATS Jetstream. The caller is responsible for message
// addressing and data serialization. NOTE: The subject passed here will be prepended with any
// configured PublisherSubjectPrefix.
func (n *NatsJetstream) Publish(ctx context.Context, subjectSuffix string, data []byte) error {
// Publish publishes an event onto the NATS Jetstream.
// The caller is responsible for message addressing and data serialization.
//
// rollupSubject when set to true will cause any previous messages with the same subject to be overwritten by this new msg.
//
// NOTE: The subject passed here will be prepended with the configured PublisherSubjectPrefix.
func (n *NatsJetstream) Publish(ctx context.Context, subjectSuffix string, data []byte, rollupSubject bool) error {
if n.jsctx == nil {
return errors.Wrap(ErrNatsJetstreamAddConsumer, "Jetstream context is not setup")
}
Expand All @@ -295,6 +298,10 @@ func (n *NatsJetstream) Publish(ctx context.Context, subjectSuffix string, data
// inject otel trace context
injectOtelTraceContext(ctx, msg)

// https://docs.nats.io/nats-concepts/jetstream/streams#allowrollup
if rollupSubject {
msg.Header.Add("Nats-Rollup", "sub")
}
_, err := n.jsctx.PublishMsg(msg, options...)
return err
}
Expand Down
50 changes: 49 additions & 1 deletion events/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestPublishAndSubscribe(t *testing.T) {
require.NoError(t, err)

payload := []byte("test data")
require.NoError(t, njs.Publish(context.TODO(), "test", payload))
require.NoError(t, njs.Publish(context.TODO(), "test", payload, false))

msgs, err := njs.PullMsg(context.TODO(), 1)
require.NoError(t, err)
Expand All @@ -95,6 +95,54 @@ func TestPublishAndSubscribe(t *testing.T) {
require.ErrorIs(t, err, nats.ErrTimeout)
}

func TestPublishAndSubscribe_WithRollup(t *testing.T) {
jsSrv := natsTest.StartJetStreamServer(t)
defer natsTest.ShutdownJetStream(t, jsSrv)

jsConn, _ := natsTest.JetStreamContext(t, jsSrv)
njs := NewJetstreamFromConn(jsConn)
defer njs.Close()

njs.parameters = &NatsOptions{
AppName: "TestPublishAndSubscribe",
Stream: &NatsStreamOptions{
Name: "test_stream",
Subjects: []string{
"pre.test",
},
Retention: "workQueue",
},
Consumer: &NatsConsumerOptions{
Name: "test_consumer",
Pull: true,
SubscribeSubjects: []string{
"pre.test",
},
FilterSubject: "pre.test",
},
PublisherSubjectPrefix: "pre",
}
require.NoError(t, njs.addStream())
require.NoError(t, njs.addConsumer())

_, err := njs.Subscribe(context.TODO())
require.NoError(t, err)

payload := []byte("test data")
require.NoError(t, njs.Publish(context.TODO(), "test", payload, true))
payload2 := []byte("rollup")
require.NoError(t, njs.Publish(context.TODO(), "test", payload2, true))

msgs, err := njs.PullMsg(context.TODO(), 1)
require.NoError(t, err)
require.Equal(t, 1, len(msgs))
require.Equal(t, payload2, msgs[0].Data())

msgs, err = njs.PullMsg(context.TODO(), 1)
require.Error(t, err)
require.ErrorIs(t, err, nats.ErrTimeout)
}

func Test_addConsumer(t *testing.T) {
jsSrv := natsTest.StartJetStreamServer(t)
defer natsTest.ShutdownJetStream(t, jsSrv)
Expand Down

0 comments on commit 11e76e1

Please sign in to comment.