Skip to content

Commit

Permalink
update: [Imessage] Improve emqx http hook, and publish payload
Browse files Browse the repository at this point in the history
  • Loading branch information
kainonly committed Oct 20, 2023
1 parent ad98084 commit a5320e9
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 42 deletions.
4 changes: 2 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,10 @@ func (x *API) Routes(h *server.Hertz) (err error) {
_imessages := h.Group("imessages", m...)
{
_imessages.GET("nodes", x.Imessages.GetNodes)
_imessages.PUT(":id/rule", x.Imessages.CreateRule)
_imessages.PUT(":id/rule", x.Imessages.UpdateRule)
_imessages.DELETE(":id/rule", x.Imessages.DeleteRule)
_imessages.GET(":id/metrics", x.Imessages.GetMetrics)
_imessages.PUT(":id/metrics", x.Imessages.CreateMetrics)
_imessages.PUT(":id/metrics", x.Imessages.UpdateMetrics)
_imessages.DELETE(":id/metrics", x.Imessages.DeleteMetrics)
_imessages.POST("publish", x.Imessages.Publish)
}
Expand Down
16 changes: 8 additions & 8 deletions api/imessages/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@ func (x *Controller) GetNodes(ctx context.Context, c *app.RequestContext) {
c.JSON(200, r)
}

type CreateRuleDto struct {
type UpdateRuleDto struct {
Id string `path:"id" vd:"mongodb"`
}

func (x *Controller) CreateRule(ctx context.Context, c *app.RequestContext) {
var dto CreateRuleDto
func (x *Controller) UpdateRule(ctx context.Context, c *app.RequestContext) {
var dto UpdateRuleDto
if err := c.BindAndValidate(&dto); err != nil {
c.Error(err)
return
}

id, _ := primitive.ObjectIDFromHex(dto.Id)
r, err := x.ImessagesServices.CreateRule(ctx, id)
r, err := x.ImessagesServices.UpdateRule(ctx, id)
if err != nil {
c.Error(err)
return
Expand Down Expand Up @@ -82,19 +82,19 @@ func (x *Controller) GetMetrics(ctx context.Context, c *app.RequestContext) {
c.JSON(200, r)
}

type CreateMetricsDto struct {
type UpdateMetricsDto struct {
Id string `path:"id" vd:"mongodb"`
}

func (x *Controller) CreateMetrics(ctx context.Context, c *app.RequestContext) {
var dto CreateMetricsDto
func (x *Controller) UpdateMetrics(ctx context.Context, c *app.RequestContext) {
var dto UpdateMetricsDto
if err := c.BindAndValidate(&dto); err != nil {
c.Error(err)
return
}

id, _ := primitive.ObjectIDFromHex(dto.Id)
r, err := x.ImessagesServices.CreateMetrics(ctx, id)
r, err := x.ImessagesServices.UpdateMetrics(ctx, id)
if err != nil {
c.Error(err)
return
Expand Down
56 changes: 24 additions & 32 deletions api/imessages/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,33 @@ func (x *Service) GetNodes(ctx context.Context) (r interface{}, err error) {
return
}

func (x *Service) CreateRule(ctx context.Context, id primitive.ObjectID) (r M, err error) {
func (x *Service) UpdateRule(ctx context.Context, id primitive.ObjectID) (r M, err error) {
var data model.Imessage
if err = x.Db.Collection("imessages").
FindOne(ctx, bson.M{"_id": id}).
Decode(&data); err != nil {
return
}

if data.Rule != "" {
if _, err = x.R(ctx).
SetBody(M{
"name": data.Topic,
"sql": fmt.Sprintf(`SELECT * FROM "%s/#"`, data.Topic),
"actions": []interface{}{
"webhook:logset",
},
"enable": true,
"description": data.Description,
}).
SetSuccessResult(&r).
SetErrorResult(&r).
Put(fmt.Sprintf(`rules/%s`, data.Rule)); err != nil {
return
}
return
}

var e string
if _, err = x.R(ctx).
SetBody(M{
Expand Down Expand Up @@ -75,33 +94,6 @@ func (x *Service) CreateRule(ctx context.Context, id primitive.ObjectID) (r M, e
return
}

func (x *Service) UpdateRule(ctx context.Context, id primitive.ObjectID) (r M, err error) {
var data model.Imessage
if err = x.Db.Collection("imessages").
FindOne(ctx, bson.M{"_id": id}).
Decode(&data); err != nil {
return
}

if _, err = x.R(ctx).
SetBody(M{
"name": data.Topic,
"sql": fmt.Sprintf(`SELECT * FROM "%s/#"`, data.Topic),
"actions": []interface{}{
"webhook:logset",
},
"enable": true,
"description": data.Description,
}).
SetSuccessResult(&r).
SetErrorResult(&r).
Put(fmt.Sprintf(`rules/%s`, data.Rule)); err != nil {
return
}

return
}

func (x *Service) DeleteRule(ctx context.Context, id primitive.ObjectID) (err error) {
var data model.Imessage
if err = x.Db.Collection("imessages").
Expand Down Expand Up @@ -140,7 +132,7 @@ func (x *Service) GetMetrics(ctx context.Context, id primitive.ObjectID) (rs []i
return
}

func (x *Service) CreateMetrics(ctx context.Context, id primitive.ObjectID) (rs []interface{}, err error) {
func (x *Service) UpdateMetrics(ctx context.Context, id primitive.ObjectID) (rs []interface{}, err error) {
var data model.Imessage
if err = x.Db.Collection("imessages").
FindOne(ctx, bson.M{"_id": id}).
Expand Down Expand Up @@ -214,10 +206,10 @@ func (x *Service) Event() (err error) {
switch dto.Action {
case "create":
id, _ := primitive.ObjectIDFromHex(dto.Result.(M)["InsertedID"].(string))
if _, err = x.CreateRule(ctx, id); err != nil {
if _, err = x.UpdateRule(ctx, id); err != nil {
hlog.Error(err)
}
if _, err = x.CreateMetrics(ctx, id); err != nil {
if _, err = x.UpdateMetrics(ctx, id); err != nil {
hlog.Error(err)
}
break
Expand All @@ -229,7 +221,7 @@ func (x *Service) Event() (err error) {
if _, err = x.DeleteMetrics(ctx, id); err != nil {
hlog.Error(err)
}
if _, err = x.CreateMetrics(ctx, id); err != nil {
if _, err = x.UpdateMetrics(ctx, id); err != nil {
hlog.Error(err)
}
break
Expand Down

0 comments on commit a5320e9

Please sign in to comment.