From c6760e9678ca18cf038311972181c3f94b6dfa88 Mon Sep 17 00:00:00 2001 From: kain Date: Fri, 27 Oct 2023 20:24:00 +0800 Subject: [PATCH] update: support queue tenant management --- api/api.go | 1 - api/projects/service.go | 9 ++ api/queues/controller.go | 24 +----- api/queues/service.go | 178 ++++++++++++++++++++++++++++----------- bootstrap/wire_gen.go | 3 +- go.sum | 2 - model/endpoint.go | 7 ++ model/endpoint.json | 10 ++- 8 files changed, 161 insertions(+), 73 deletions(-) diff --git a/api/api.go b/api/api.go index 4b6e2831..ebb14a1e 100644 --- a/api/api.go +++ b/api/api.go @@ -181,7 +181,6 @@ func (x *API) Routes(h *server.Hertz) (err error) { _queues := h.Group("queues", m...) { _queues.POST("sync", x.Queues.Sync) - _queues.POST("destroy", x.Queues.Destroy) _queues.GET(":id/info", x.Queues.Info) _queues.POST("publish", x.Queues.Publish) } diff --git a/api/projects/service.go b/api/projects/service.go index ffaf5d83..0f9e6b5b 100644 --- a/api/projects/service.go +++ b/api/projects/service.go @@ -21,6 +21,15 @@ type Service struct { ClustersX *clusters.Service } +func (x *Service) Get(ctx context.Context, id primitive.ObjectID) (data model.Project, err error) { + if err = x.Db.Collection("projects"). + FindOne(ctx, bson.M{"_id": id}). + Decode(&data); err != nil { + return + } + return +} + func (x *Service) GetTenants(ctx context.Context, id primitive.ObjectID) (result M, err error) { var project model.Project if err = x.Db.Collection("projects"). diff --git a/api/queues/controller.go b/api/queues/controller.go index d21512ec..d44dd3b9 100644 --- a/api/queues/controller.go +++ b/api/queues/controller.go @@ -29,25 +29,6 @@ func (x *Controller) Sync(ctx context.Context, c *app.RequestContext) { c.Status(204) } -type DestroyDto struct { - Ids []primitive.ObjectID `json:"ids" vd:"gt=0,dive,mongodb"` -} - -func (x *Controller) Destroy(ctx context.Context, c *app.RequestContext) { - var dto DestroyDto - if err := c.BindAndValidate(&dto); err != nil { - c.Error(err) - return - } - - if err := x.QueuesX.Destroy(ctx, dto.Ids); err != nil { - c.Error(err) - return - } - - c.Status(204) -} - type StateDto struct { Id string `path:"id" vd:"mongodb"` } @@ -70,8 +51,9 @@ func (x *Controller) Info(ctx context.Context, c *app.RequestContext) { } type PublishDto struct { - Subject string `json:"subject" vd:"required"` - Payload M `json:"payload" vd:"gt=0"` + Project primitive.ObjectID `json:"project" vd:"required"` + Subject string `json:"subject" vd:"required"` + Payload M `json:"payload" vd:"gt=0"` } func (x *Controller) Publish(ctx context.Context, c *app.RequestContext) { diff --git a/api/queues/service.go b/api/queues/service.go index d1965c2e..f1953449 100644 --- a/api/queues/service.go +++ b/api/queues/service.go @@ -2,78 +2,115 @@ package queues import ( "context" + "fmt" "github.com/bytedance/sonic" "github.com/cloudwego/hertz/pkg/common/hlog" "github.com/nats-io/nats.go" + "github.com/nats-io/nkeys" "github.com/weplanx/go/rest" + "github.com/weplanx/server/api/projects" "github.com/weplanx/server/common" "github.com/weplanx/server/model" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" - "go.mongodb.org/mongo-driver/mongo" + "strings" + "sync" "time" ) type Service struct { *common.Inject + ProjectsX *projects.Service } -func (x *Service) Sync(ctx context.Context, id primitive.ObjectID) (err error) { - var data model.Queue - if err = x.Db.Collection("queues").FindOne(ctx, bson.M{ - "_id": id, - }).Decode(&data); err != nil { +var clients = sync.Map{} + +func (x *Service) GetClient(projectId primitive.ObjectID) (client *nats.Conn, err error) { + if i, ok := clients.Load(projectId.Hex()); ok { + client = i.(*nats.Conn) return } - if _, err = x.JetStream.StreamInfo(data.Name); err != nil { - if err != nats.ErrStreamNotFound { - return - } + var project model.Project + if project, err = x.ProjectsX.Get(context.TODO(), projectId); err != nil { + return } - cfg := &nats.StreamConfig{ - Name: data.Name, - Subjects: data.Subjects, - MaxMsgs: data.MaxMsgs, - MaxBytes: data.MaxBytes, - MaxAge: data.MaxAge, - Retention: nats.WorkQueuePolicy, + var seed []byte + if seed, err = x.Cipher.Decode(project.Nats.Seed); err != nil { + return } - if data.Description != "" { - cfg.Description = data.Description + var kp nkeys.KeyPair + if kp, err = nkeys.FromSeed(seed); err != nil { + return } - if err == nats.ErrStreamNotFound { - if _, err = x.JetStream.AddStream(cfg, nats.Context(ctx)); err != nil { - return - } - } else { - if _, err = x.JetStream.UpdateStream(cfg, nats.Context(ctx)); err != nil { - return - } + defer kp.Wipe() + var pub string + if pub, err = kp.PublicKey(); err != nil { + return + } + if !nkeys.IsValidPublicUserKey(pub) { + return nil, fmt.Errorf("nkey fail") } + if client, err = nats.Connect( + strings.Join(x.V.Nats.Hosts, ","), + nats.MaxReconnects(-1), + nats.Nkey(pub, func(nonce []byte) ([]byte, error) { + sig, _ := kp.Sign(nonce) + return sig, nil + }), + ); err != nil { + return + } + clients.Store(projectId.Hex(), client) return } -func (x *Service) Destroy(ctx context.Context, ids []primitive.ObjectID) (err error) { - var cursor *mongo.Cursor - if cursor, err = x.Db.Collection("queues").Find(ctx, bson.M{ - "_id": bson.M{"$in": ids}, - }); err != nil { +func (x *Service) GetJetStream(ctx context.Context, projectId primitive.ObjectID) (js nats.JetStreamContext, err error) { + var nc *nats.Conn + if nc, err = x.GetClient(projectId); err != nil { + return + } + if js, err = nc.JetStream( + nats.PublishAsyncMaxPending(256), + nats.Context(ctx), + ); err != nil { return } - for cursor.Next(ctx) { - var data model.Queue - if err = cursor.Decode(&data); err != nil { + return +} + +func (x *Service) Sync(ctx context.Context, id primitive.ObjectID) (err error) { + var queue model.Queue + if err = x.Db.Collection("queues"). + FindOne(ctx, bson.M{"_id": id}). + Decode(&queue); err != nil { + return + } + + var js nats.JetStreamContext + if js, err = x.GetJetStream(ctx, queue.Project); err != nil { + return + } + + if _, err = js.StreamInfo(queue.ID.Hex()); err != nil { + if err != nats.ErrStreamNotFound { return } - - if _, err = x.JetStream.StreamInfo(data.Name); err != nil { - if err != nats.ErrStreamNotFound { - return - } else { - return nil - } + } + cfg := &nats.StreamConfig{ + Name: queue.ID.Hex(), + Description: queue.Name, + Subjects: queue.Subjects, + MaxMsgs: queue.MaxMsgs, + MaxBytes: queue.MaxBytes, + MaxAge: queue.MaxAge, + Retention: nats.WorkQueuePolicy, + } + if err == nats.ErrStreamNotFound { + if _, err = js.AddStream(cfg, nats.Context(ctx)); err != nil { + return } - if err = x.JetStream.DeleteStream(data.Name); err != nil { + } else { + if _, err = js.UpdateStream(cfg, nats.Context(ctx)); err != nil { return } } @@ -81,13 +118,19 @@ func (x *Service) Destroy(ctx context.Context, ids []primitive.ObjectID) (err er } func (x *Service) Info(ctx context.Context, id primitive.ObjectID) (r *nats.StreamInfo, err error) { - var data model.Queue + var queue model.Queue if err = x.Db.Collection("queues"). FindOne(ctx, bson.M{"_id": id}). - Decode(&data); err != nil { + Decode(&queue); err != nil { return } - if r, err = x.JetStream.StreamInfo(data.Name, nats.Context(ctx)); err != nil { + + var js nats.JetStreamContext + if js, err = x.GetJetStream(ctx, queue.Project); err != nil { + return + } + + if r, err = js.StreamInfo(queue.ID.Hex(), nats.Context(ctx)); err != nil { return } r.Cluster = nil @@ -95,11 +138,29 @@ func (x *Service) Info(ctx context.Context, id primitive.ObjectID) (r *nats.Stre } func (x *Service) Publish(ctx context.Context, dto PublishDto) (r interface{}, err error) { + var js nats.JetStreamContext + if js, err = x.GetJetStream(ctx, dto.Project); err != nil { + return + } var payload []byte if payload, err = sonic.Marshal(dto.Payload); err != nil { return } - if r, err = x.JetStream.Publish(dto.Subject, payload, nats.Context(ctx)); err != nil { + if r, err = js.Publish(dto.Subject, payload, nats.Context(ctx)); err != nil { + return + } + return +} + +func (x *Service) Destroy(ctx context.Context, js nats.JetStreamContext, name string) (err error) { + if _, err = js.StreamInfo(name, nats.Context(ctx)); err != nil { + if err != nats.ErrStreamNotFound { + return + } else { + return nil + } + } + if err = js.DeleteStream(name, nats.Context(ctx)); err != nil { return } return @@ -127,6 +188,29 @@ func (x *Service) Event() (err error) { hlog.Error(err) } break + case rest.ActionDelete: + projectId, _ := primitive.ObjectIDFromHex(dto.Data.(M)["project"].(string)) + var js nats.JetStreamContext + if js, err = x.GetJetStream(ctx, projectId); err != nil { + hlog.Error(err) + } + if err = x.Destroy(ctx, js, dto.Data.(M)["_id"].(string)); err != nil { + hlog.Error(err) + } + break + case rest.ActionBulkDelete: + data := dto.Data.([]interface{}) + projectId, _ := primitive.ObjectIDFromHex(data[0].(M)["project"].(string)) + var js nats.JetStreamContext + if js, err = x.GetJetStream(ctx, projectId); err != nil { + hlog.Error(err) + } + for _, v := range data { + if err = x.Destroy(ctx, js, v.(M)["_id"].(string)); err != nil { + hlog.Error(err) + } + } + break } }); err != nil { return diff --git a/bootstrap/wire_gen.go b/bootstrap/wire_gen.go index ac789370..6339abb8 100644 --- a/bootstrap/wire_gen.go +++ b/bootstrap/wire_gen.go @@ -153,7 +153,8 @@ func NewAPI(values2 *common.Values) (*api.API, error) { WorkflowsX: workflowsService, } queuesService := &queues.Service{ - Inject: inject, + Inject: inject, + ProjectsX: projectsService, } queuesController := &queues.Controller{ QueuesX: queuesService, diff --git a/go.sum b/go.sum index b50abbda..93c05b55 100644 --- a/go.sum +++ b/go.sum @@ -381,8 +381,6 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/weplanx/collector v1.11.1 h1:9QwQ5HVTBrrpyBbLiAVy2l9lm2oNCGfdeu2GS73jHB4= github.com/weplanx/collector v1.11.1/go.mod h1:fvLrl4sMAPs0GbpAS5BtIhlcZQU3R1sgYHP3ZxxTYzk= -github.com/weplanx/go v0.8.1 h1:maD3HfjCJ8sMzYmzPq3m5+m8A6hHCttdhpIx9zhnmFc= -github.com/weplanx/go v0.8.1/go.mod h1:nDEIyJLyq4HmAqoBqJZK5QZjTLzlJm8iUSJKAzko4x0= github.com/weplanx/go v0.8.3 h1:trLs++QkoQIzr1t0Ei4dk/ensZ1s34uVRXHEmBUT6vQ= github.com/weplanx/go v0.8.3/go.mod h1:nDEIyJLyq4HmAqoBqJZK5QZjTLzlJm8iUSJKAzko4x0= github.com/weplanx/schedule v1.5.1 h1:4SI4rVWM1/FeVN8ymMvGDvoGru+qXL6aScC+N1W3Di8= diff --git a/model/endpoint.go b/model/endpoint.go index a5ed1309..47bc924f 100644 --- a/model/endpoint.go +++ b/model/endpoint.go @@ -14,6 +14,7 @@ type Endpoint struct { Name string `bson:"name" json:"name"` Kind string `bson:"kind" json:"kind"` Schedule *EndpointSchedule `bson:"schedule" json:"schedule"` + Emqx *EndpointEmqx `bson:"emqx" json:"emqx"` CreateTime time.Time `bson:"create_time" json:"create_time"` UpdateTime time.Time `bson:"update_time" json:"update_time"` } @@ -22,6 +23,12 @@ type EndpointSchedule struct { Node string `bson:"node" json:"node"` } +type EndpointEmqx struct { + Host string `bson:"host" json:"host"` + ApiKey string `bson:"api_key" json:"api_key"` + SecretKey string `bson:"secret_key" json:"secret_key"` +} + func SetEndpoints(ctx context.Context, db *mongo.Database) (err error) { var ns []string if ns, err = db.ListCollectionNames(ctx, bson.M{"name": "endpoints"}); err != nil { diff --git a/model/endpoint.json b/model/endpoint.json index 205e9962..0f9ee311 100644 --- a/model/endpoint.json +++ b/model/endpoint.json @@ -20,7 +20,15 @@ "bsonType": "string" }, "schedule": { - "bsonType": "object" + "bsonType": "object", + "required": [ + "node" + ], + "properties": { + "node": { + "bsonType": "string" + } + } }, "create_time": { "bsonType": "date"