Skip to content

Commit

Permalink
update: deprecate namespace and switch to multi-tenancy
Browse files Browse the repository at this point in the history
  • Loading branch information
kainonly committed Oct 22, 2023
1 parent fb0341f commit 0fa8ae6
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 62 deletions.
23 changes: 11 additions & 12 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,21 @@ func (x *App) Run() (err error) {
return
}
if err = x.Set(key, option); err != nil {
x.Log.Error("KeyValuePutOp Set:fail",
x.Log.Error("schedule fail",
zap.String("key", key),
zap.Any("option", option),
zap.Error(err),
)
return
}
x.Log.Debug("KeyValuePutOp SetCron:ok",
x.Log.Debug("schedule ok",
zap.String("key", key),
zap.Any("option", option),
)
break
case "KeyValueDeleteOp":
x.Remove(key)
x.Log.Debug("KeyValueDeleteOp Remove:ok",
x.Log.Debug("schedule removed",
zap.String("key", key),
)
break
Expand All @@ -93,7 +93,7 @@ func (x *App) Run() (err error) {
if err = x.Ping(); err != nil {
return
}
x.Log.Info("Service started!")
x.Log.Info("service started!")
return
}

Expand All @@ -119,7 +119,7 @@ func (x *App) Set(key string, option common.ScheduleOption) (err error) {

func (x *App) SetJob(key string, c *cron.Cron, index int, job common.ScheduleJob) (err error) {
if _, err = c.AddFunc(job.Spec, func() {
subj := fmt.Sprintf(`%s.jobs.%s`, x.V.Namespace, x.V.Node)
subj := fmt.Sprintf(`jobs.%s`, x.V.Node)
var b []byte
if b, err = msgpack.Marshal(common.Job{
Key: key,
Expand All @@ -130,14 +130,14 @@ func (x *App) SetJob(key string, c *cron.Cron, index int, job common.ScheduleJob
return
}
if err = x.Nats.Publish(subj, b); err != nil {
x.Log.Error("Publish:fail",
x.Log.Error("publish fail",
zap.String("key", key),
zap.Int("index", index),
zap.Error(err),
)
return
}
x.Log.Debug("Publish:ok",
x.Log.Debug("publish ok",
zap.String("key", key),
zap.Int("index", index),
zap.String("mode", job.Mode),
Expand All @@ -163,8 +163,8 @@ func (x *App) GetState(key string) []cron.Entry {
}

func (x *App) State() (err error) {
subj := fmt.Sprintf(`%s.schedules.%s`, x.V.Namespace, x.V.Node)
queue := fmt.Sprintf(`%s:schedules:%s`, x.V.Namespace, x.V.Node)
subj := fmt.Sprintf(`schedules.%s`, x.V.Node)
queue := fmt.Sprintf(`SCHEDULE_%s`, x.V.Node)
if _, err = x.Nats.QueueSubscribe(subj, queue, func(msg *nats.Msg) {
key := string(msg.Data)
var states []common.ScheduleState
Expand All @@ -182,15 +182,14 @@ func (x *App) State() (err error) {
}); err != nil {
return
}
x.Log.Debug("State:ok",
x.Log.Debug("state ok",
zap.String("subj", subj),
)
return
}

func (x *App) Ping() (err error) {
subj := fmt.Sprintf(`%s.schedules`, x.V.Namespace)
if _, err = x.Nats.Subscribe(subj, func(msg *nats.Msg) {
if _, err = x.Nats.Subscribe("schedules", func(msg *nats.Msg) {
if string(msg.Data) == x.V.Node {
msg.Respond([]byte("ok"))
}
Expand Down
3 changes: 2 additions & 1 deletion bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func UseJetStream(nc *nats.Conn) (nats.JetStreamContext, error) {

func UseKeyValue(values *common.Values, js nats.JetStreamContext) (nats.KeyValue, error) {
return js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: fmt.Sprintf(`%s_schedules_%s`, values.Namespace, values.Node),
Bucket: fmt.Sprintf(`schedules_%s`, values.Node),
Description: "Schedule message event publishing node",
})
}
44 changes: 9 additions & 35 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,55 +10,29 @@ import (
)

type Client struct {
Namespace string
Node string
Nats *nats.Conn
JetStream nats.JetStreamContext
KeyValue nats.KeyValue
}

type Option func(x *Client)

func SetNamespace(v string) Option {
return func(x *Client) {
x.Namespace = v
}
}

func SetNode(v string) Option {
return func(x *Client) {
x.Node = v
}
}

func SetNats(v *nats.Conn) Option {
return func(x *Client) {
x.Nats = v
}
}

func SetJetStream(v nats.JetStreamContext) Option {
return func(x *Client) {
x.JetStream = v
}
}

func New(options ...Option) (x *Client, err error) {
x = new(Client)
for _, apply := range options {
apply(x)
func New(node string, nc *nats.Conn) (x *Client, err error) {
x = &Client{Node: node, Nats: nc}
if x.JetStream, err = nc.JetStream(
nats.PublishAsyncMaxPending(256),
); err != nil {
return
}
bucket := fmt.Sprintf(`%s_schedules_%s`, x.Namespace, x.Node)
bucket := fmt.Sprintf(`schedules_%s`, x.Node)
if x.KeyValue, err = x.JetStream.KeyValue(bucket); err != nil {
return
}
return
}

func (x *Client) Ping() (result bool, err error) {
subj := fmt.Sprintf(`%s.schedules`, x.Namespace)
var msg *nats.Msg
if msg, err = x.Nats.Request(subj, []byte(x.Node), time.Second*5); err != nil {
if msg, err = x.Nats.Request("schedules", []byte(x.Node), time.Second*5); err != nil {
return
}
result = string(msg.Data) == "ok"
Expand All @@ -84,7 +58,7 @@ func (x *Client) Get(key string) (option common.ScheduleOption, err error) {
return
}
var msg *nats.Msg
subj := fmt.Sprintf(`%s.schedules.%s`, x.Namespace, x.Node)
subj := fmt.Sprintf(`schedules.%s`, x.Node)
if msg, err = x.Nats.Request(subj, []byte(key), time.Second*3); err != nil {
return
}
Expand Down
20 changes: 9 additions & 11 deletions client/client_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package client_test

import (
"context"
"fmt"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
"github.com/stretchr/testify/assert"
Expand All @@ -14,23 +14,18 @@ import (
var x *client.Client

func TestMain(m *testing.M) {
ctx := context.TODO()
nc, js, err := UseNats(ctx)
node := os.Getenv("NODE")
nc, err := UseNats(node)
if err != nil {
panic(err)
}
if x, err = client.New(
client.SetNamespace("example"),
client.SetNats(nc),
client.SetJetStream(js),
client.SetNode(os.Getenv("NODE")),
); err != nil {
if x, err = client.New(node, nc); err != nil {
panic(err)
}
os.Exit(m.Run())
}

func UseNats(ctx context.Context) (nc *nats.Conn, js nats.JetStreamContext, err error) {
func UseNats(node string) (nc *nats.Conn, err error) {
var auth nats.Option
var kp nkeys.KeyPair
if kp, err = nkeys.FromSeed([]byte(os.Getenv("NATS_NKEY"))); err != nil {
Expand All @@ -54,12 +49,15 @@ func UseNats(ctx context.Context) (nc *nats.Conn, js nats.JetStreamContext, err
); err != nil {
return
}
var js nats.JetStreamContext
if js, err = nc.JetStream(
nats.PublishAsyncMaxPending(256),
nats.Context(ctx),
); err != nil {
return
}
if _, err = js.CreateKeyValue(&nats.KeyValueConfig{Bucket: fmt.Sprintf(`schedules_%s`, node)}); err != nil {
return
}
return
}

Expand Down
5 changes: 2 additions & 3 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ type Inject struct {
}

type Values struct {
Namespace string `env:"NAMESPACE,required"`
Node string `env:"NODE,required"`
Nats struct {
Node string `env:"NODE,required"`
Nats struct {
Hosts []string `env:"HOSTS,required" envSeparator:","`
Nkey string `env:"NKEY,required"`
} `envPrefix:"NATS_"`
Expand Down

0 comments on commit 0fa8ae6

Please sign in to comment.