Skip to content

Commit

Permalink
update: support queue tenant management
Browse files Browse the repository at this point in the history
  • Loading branch information
kainonly committed Oct 27, 2023
1 parent 895b1b6 commit c6760e9
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 73 deletions.
1 change: 0 additions & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ func (x *API) Routes(h *server.Hertz) (err error) {
_queues := h.Group("queues", m...)
{
_queues.POST("sync", x.Queues.Sync)
_queues.POST("destroy", x.Queues.Destroy)
_queues.GET(":id/info", x.Queues.Info)
_queues.POST("publish", x.Queues.Publish)
}
Expand Down
9 changes: 9 additions & 0 deletions api/projects/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ type Service struct {
ClustersX *clusters.Service
}

func (x *Service) Get(ctx context.Context, id primitive.ObjectID) (data model.Project, err error) {
if err = x.Db.Collection("projects").
FindOne(ctx, bson.M{"_id": id}).
Decode(&data); err != nil {
return
}
return
}

func (x *Service) GetTenants(ctx context.Context, id primitive.ObjectID) (result M, err error) {
var project model.Project
if err = x.Db.Collection("projects").
Expand Down
24 changes: 3 additions & 21 deletions api/queues/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,6 @@ func (x *Controller) Sync(ctx context.Context, c *app.RequestContext) {
c.Status(204)
}

type DestroyDto struct {
Ids []primitive.ObjectID `json:"ids" vd:"gt=0,dive,mongodb"`
}

func (x *Controller) Destroy(ctx context.Context, c *app.RequestContext) {
var dto DestroyDto
if err := c.BindAndValidate(&dto); err != nil {
c.Error(err)
return
}

if err := x.QueuesX.Destroy(ctx, dto.Ids); err != nil {
c.Error(err)
return
}

c.Status(204)
}

type StateDto struct {
Id string `path:"id" vd:"mongodb"`
}
Expand All @@ -70,8 +51,9 @@ func (x *Controller) Info(ctx context.Context, c *app.RequestContext) {
}

type PublishDto struct {
Subject string `json:"subject" vd:"required"`
Payload M `json:"payload" vd:"gt=0"`
Project primitive.ObjectID `json:"project" vd:"required"`
Subject string `json:"subject" vd:"required"`
Payload M `json:"payload" vd:"gt=0"`
}

func (x *Controller) Publish(ctx context.Context, c *app.RequestContext) {
Expand Down
178 changes: 131 additions & 47 deletions api/queues/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,104 +2,165 @@ package queues

import (
"context"
"fmt"
"github.com/bytedance/sonic"
"github.com/cloudwego/hertz/pkg/common/hlog"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
"github.com/weplanx/go/rest"
"github.com/weplanx/server/api/projects"
"github.com/weplanx/server/common"
"github.com/weplanx/server/model"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"strings"
"sync"
"time"
)

type Service struct {
*common.Inject
ProjectsX *projects.Service
}

func (x *Service) Sync(ctx context.Context, id primitive.ObjectID) (err error) {
var data model.Queue
if err = x.Db.Collection("queues").FindOne(ctx, bson.M{
"_id": id,
}).Decode(&data); err != nil {
var clients = sync.Map{}

func (x *Service) GetClient(projectId primitive.ObjectID) (client *nats.Conn, err error) {
if i, ok := clients.Load(projectId.Hex()); ok {
client = i.(*nats.Conn)
return
}
if _, err = x.JetStream.StreamInfo(data.Name); err != nil {
if err != nats.ErrStreamNotFound {
return
}
var project model.Project
if project, err = x.ProjectsX.Get(context.TODO(), projectId); err != nil {
return
}
cfg := &nats.StreamConfig{
Name: data.Name,
Subjects: data.Subjects,
MaxMsgs: data.MaxMsgs,
MaxBytes: data.MaxBytes,
MaxAge: data.MaxAge,
Retention: nats.WorkQueuePolicy,
var seed []byte
if seed, err = x.Cipher.Decode(project.Nats.Seed); err != nil {
return
}
if data.Description != "" {
cfg.Description = data.Description
var kp nkeys.KeyPair
if kp, err = nkeys.FromSeed(seed); err != nil {
return
}
if err == nats.ErrStreamNotFound {
if _, err = x.JetStream.AddStream(cfg, nats.Context(ctx)); err != nil {
return
}
} else {
if _, err = x.JetStream.UpdateStream(cfg, nats.Context(ctx)); err != nil {
return
}
defer kp.Wipe()
var pub string
if pub, err = kp.PublicKey(); err != nil {
return
}
if !nkeys.IsValidPublicUserKey(pub) {
return nil, fmt.Errorf("nkey fail")
}
if client, err = nats.Connect(
strings.Join(x.V.Nats.Hosts, ","),
nats.MaxReconnects(-1),
nats.Nkey(pub, func(nonce []byte) ([]byte, error) {
sig, _ := kp.Sign(nonce)
return sig, nil
}),
); err != nil {
return
}
clients.Store(projectId.Hex(), client)
return
}

func (x *Service) Destroy(ctx context.Context, ids []primitive.ObjectID) (err error) {
var cursor *mongo.Cursor
if cursor, err = x.Db.Collection("queues").Find(ctx, bson.M{
"_id": bson.M{"$in": ids},
}); err != nil {
func (x *Service) GetJetStream(ctx context.Context, projectId primitive.ObjectID) (js nats.JetStreamContext, err error) {
var nc *nats.Conn
if nc, err = x.GetClient(projectId); err != nil {
return
}
if js, err = nc.JetStream(
nats.PublishAsyncMaxPending(256),
nats.Context(ctx),
); err != nil {
return
}
for cursor.Next(ctx) {
var data model.Queue
if err = cursor.Decode(&data); err != nil {
return
}

func (x *Service) Sync(ctx context.Context, id primitive.ObjectID) (err error) {
var queue model.Queue
if err = x.Db.Collection("queues").
FindOne(ctx, bson.M{"_id": id}).
Decode(&queue); err != nil {
return
}

var js nats.JetStreamContext
if js, err = x.GetJetStream(ctx, queue.Project); err != nil {
return
}

if _, err = js.StreamInfo(queue.ID.Hex()); err != nil {
if err != nats.ErrStreamNotFound {
return
}

if _, err = x.JetStream.StreamInfo(data.Name); err != nil {
if err != nats.ErrStreamNotFound {
return
} else {
return nil
}
}
cfg := &nats.StreamConfig{
Name: queue.ID.Hex(),
Description: queue.Name,
Subjects: queue.Subjects,
MaxMsgs: queue.MaxMsgs,
MaxBytes: queue.MaxBytes,
MaxAge: queue.MaxAge,
Retention: nats.WorkQueuePolicy,
}
if err == nats.ErrStreamNotFound {
if _, err = js.AddStream(cfg, nats.Context(ctx)); err != nil {
return
}
if err = x.JetStream.DeleteStream(data.Name); err != nil {
} else {
if _, err = js.UpdateStream(cfg, nats.Context(ctx)); err != nil {
return
}
}
return
}

func (x *Service) Info(ctx context.Context, id primitive.ObjectID) (r *nats.StreamInfo, err error) {
var data model.Queue
var queue model.Queue
if err = x.Db.Collection("queues").
FindOne(ctx, bson.M{"_id": id}).
Decode(&data); err != nil {
Decode(&queue); err != nil {
return
}
if r, err = x.JetStream.StreamInfo(data.Name, nats.Context(ctx)); err != nil {

var js nats.JetStreamContext
if js, err = x.GetJetStream(ctx, queue.Project); err != nil {
return
}

if r, err = js.StreamInfo(queue.ID.Hex(), nats.Context(ctx)); err != nil {
return
}
r.Cluster = nil
return
}

func (x *Service) Publish(ctx context.Context, dto PublishDto) (r interface{}, err error) {
var js nats.JetStreamContext
if js, err = x.GetJetStream(ctx, dto.Project); err != nil {
return
}
var payload []byte
if payload, err = sonic.Marshal(dto.Payload); err != nil {
return
}
if r, err = x.JetStream.Publish(dto.Subject, payload, nats.Context(ctx)); err != nil {
if r, err = js.Publish(dto.Subject, payload, nats.Context(ctx)); err != nil {
return
}
return
}

func (x *Service) Destroy(ctx context.Context, js nats.JetStreamContext, name string) (err error) {
if _, err = js.StreamInfo(name, nats.Context(ctx)); err != nil {
if err != nats.ErrStreamNotFound {
return
} else {
return nil
}
}
if err = js.DeleteStream(name, nats.Context(ctx)); err != nil {
return
}
return
Expand Down Expand Up @@ -127,6 +188,29 @@ func (x *Service) Event() (err error) {
hlog.Error(err)
}
break
case rest.ActionDelete:
projectId, _ := primitive.ObjectIDFromHex(dto.Data.(M)["project"].(string))
var js nats.JetStreamContext
if js, err = x.GetJetStream(ctx, projectId); err != nil {
hlog.Error(err)
}
if err = x.Destroy(ctx, js, dto.Data.(M)["_id"].(string)); err != nil {
hlog.Error(err)
}
break
case rest.ActionBulkDelete:
data := dto.Data.([]interface{})
projectId, _ := primitive.ObjectIDFromHex(data[0].(M)["project"].(string))
var js nats.JetStreamContext
if js, err = x.GetJetStream(ctx, projectId); err != nil {
hlog.Error(err)
}
for _, v := range data {
if err = x.Destroy(ctx, js, v.(M)["_id"].(string)); err != nil {
hlog.Error(err)
}
}
break
}
}); err != nil {
return
Expand Down
3 changes: 2 additions & 1 deletion bootstrap/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,6 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/weplanx/collector v1.11.1 h1:9QwQ5HVTBrrpyBbLiAVy2l9lm2oNCGfdeu2GS73jHB4=
github.com/weplanx/collector v1.11.1/go.mod h1:fvLrl4sMAPs0GbpAS5BtIhlcZQU3R1sgYHP3ZxxTYzk=
github.com/weplanx/go v0.8.1 h1:maD3HfjCJ8sMzYmzPq3m5+m8A6hHCttdhpIx9zhnmFc=
github.com/weplanx/go v0.8.1/go.mod h1:nDEIyJLyq4HmAqoBqJZK5QZjTLzlJm8iUSJKAzko4x0=
github.com/weplanx/go v0.8.3 h1:trLs++QkoQIzr1t0Ei4dk/ensZ1s34uVRXHEmBUT6vQ=
github.com/weplanx/go v0.8.3/go.mod h1:nDEIyJLyq4HmAqoBqJZK5QZjTLzlJm8iUSJKAzko4x0=
github.com/weplanx/schedule v1.5.1 h1:4SI4rVWM1/FeVN8ymMvGDvoGru+qXL6aScC+N1W3Di8=
Expand Down
7 changes: 7 additions & 0 deletions model/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Endpoint struct {
Name string `bson:"name" json:"name"`
Kind string `bson:"kind" json:"kind"`
Schedule *EndpointSchedule `bson:"schedule" json:"schedule"`
Emqx *EndpointEmqx `bson:"emqx" json:"emqx"`
CreateTime time.Time `bson:"create_time" json:"create_time"`
UpdateTime time.Time `bson:"update_time" json:"update_time"`
}
Expand All @@ -22,6 +23,12 @@ type EndpointSchedule struct {
Node string `bson:"node" json:"node"`
}

type EndpointEmqx struct {
Host string `bson:"host" json:"host"`
ApiKey string `bson:"api_key" json:"api_key"`
SecretKey string `bson:"secret_key" json:"secret_key"`
}

func SetEndpoints(ctx context.Context, db *mongo.Database) (err error) {
var ns []string
if ns, err = db.ListCollectionNames(ctx, bson.M{"name": "endpoints"}); err != nil {
Expand Down
10 changes: 9 additions & 1 deletion model/endpoint.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,15 @@
"bsonType": "string"
},
"schedule": {
"bsonType": "object"
"bsonType": "object",
"required": [
"node"
],
"properties": {
"node": {
"bsonType": "string"
}
}
},
"create_time": {
"bsonType": "date"
Expand Down

0 comments on commit c6760e9

Please sign in to comment.