Skip to content

Commit

Permalink
update: [schedule] node_id change to node
Browse files Browse the repository at this point in the history
  • Loading branch information
kainonly committed Aug 24, 2023
1 parent a0c7d52 commit 5658975
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 26 deletions.
12 changes: 6 additions & 6 deletions schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@ import (

type Schedule struct {
Namespace string
Id string
Node string
Nats *nats.Conn
KeyValue nats.KeyValue
}

func (x *Workflow) NewSchedule(id string) (schedule *Schedule, err error) {
func (x *Workflow) NewSchedule(node string) (schedule *Schedule, err error) {
schedule = &Schedule{
Namespace: x.Namespace,
Id: id,
Node: node,
Nats: x.Nats,
KeyValue: nil,
}
bucket := fmt.Sprintf(`%s_schedules_%s`, x.Namespace, id)
bucket := fmt.Sprintf(`%s_schedules_%s`, x.Namespace, node)
if schedule.KeyValue, err = x.JetStream.KeyValue(bucket); err != nil {
return
}
Expand All @@ -33,7 +33,7 @@ func (x *Workflow) NewSchedule(id string) (schedule *Schedule, err error) {
func (x *Schedule) Ping() (result bool, err error) {
subj := fmt.Sprintf(`%s.schedules`, x.Namespace)
var msg *nats.Msg
if msg, err = x.Nats.Request(subj, []byte(x.Id), time.Second*5); err != nil {
if msg, err = x.Nats.Request(subj, []byte(x.Node), time.Second*5); err != nil {
return
}
result = string(msg.Data) == "ok"
Expand All @@ -59,7 +59,7 @@ func (x *Schedule) Get(key string) (option typ.ScheduleOption, err error) {
return
}
var msg *nats.Msg
subj := fmt.Sprintf(`%s.schedules.%s`, x.Namespace, x.Id)
subj := fmt.Sprintf(`%s.schedules.%s`, x.Namespace, x.Node)
if msg, err = x.Nats.Request(subj, []byte(key), time.Second*3); err != nil {
return
}
Expand Down
13 changes: 5 additions & 8 deletions schedule/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package app
import (
"errors"
"fmt"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
"github.com/robfig/cron/v3"
"github.com/vmihailenco/msgpack/v5"
Expand All @@ -16,14 +15,12 @@ import (

type App struct {
*common.Inject
Node string
M sync.Map
M sync.Map
}

func Initialize(i *common.Inject) *App {
return &App{
Inject: i,
Node: uuid.New().String(),
M: sync.Map{},
}
}
Expand Down Expand Up @@ -119,7 +116,7 @@ func (x *App) Set(key string, option typ.ScheduleOption) (err error) {

func (x *App) SetJob(key string, c *cron.Cron, index int, job typ.ScheduleJob) (err error) {
if _, err = c.AddFunc(job.Spec, func() {
subj := fmt.Sprintf(`%s.jobs.%s`, x.V.Namespace, x.V.Id)
subj := fmt.Sprintf(`%s.jobs.%s`, x.V.Namespace, x.V.Node)
var b []byte
if b, err = msgpack.Marshal(typ.Job{
Key: key,
Expand Down Expand Up @@ -163,8 +160,8 @@ func (x *App) GetState(key string) []cron.Entry {
}

func (x *App) State() (err error) {
subj := fmt.Sprintf(`%s.schedules.%s`, x.V.Namespace, x.V.Id)
queue := fmt.Sprintf(`%s:schedules:%s`, x.V.Namespace, x.V.Id)
subj := fmt.Sprintf(`%s.schedules.%s`, x.V.Namespace, x.V.Node)
queue := fmt.Sprintf(`%s:schedules:%s`, x.V.Namespace, x.V.Node)
if _, err = x.Nats.QueueSubscribe(subj, queue, func(msg *nats.Msg) {
key := string(msg.Data)
var states []typ.ScheduleState
Expand All @@ -191,7 +188,7 @@ func (x *App) State() (err error) {
func (x *App) Ping() (err error) {
subj := fmt.Sprintf(`%s.schedules`, x.V.Namespace)
if _, err = x.Nats.Subscribe(subj, func(msg *nats.Msg) {
if string(msg.Data) == x.V.Id {
if string(msg.Data) == x.V.Node {
msg.Respond([]byte("ok"))
}
}); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion schedule/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Inject struct {

type Values struct {
Namespace string `env:"NAMESPACE,required"`
Id string `env:"ID,required"`
Node string `env:"NODE,required"`
Nats struct {
Hosts []string `env:"HOSTS,required" envSeparator:","`
Nkey string `env:"NKEY,required"`
Expand Down
22 changes: 11 additions & 11 deletions typ/typ.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,26 @@ type Job struct {
}

type HttpOption struct {
Url string `msgpack:"url"`
Headers map[string]string `msgpack:"headers"`
Body map[string]interface{} `msgpack:"body"`
Url string `json:"url" msgpack:"url"`
Headers map[string]string `json:"headers" msgpack:"headers"`
Body map[string]interface{} `json:"body" msgpack:"body"`
}

type ScheduleOption struct {
Status bool `msgpack:"status"`
Jobs []ScheduleJob `msgpack:"jobs"`
Status bool `json:"status" msgpack:"status" `
Jobs []ScheduleJob `json:"jobs" msgpack:"jobs" `
}

type ScheduleJob struct {
Mode string `msgpack:"mode"`
Spec string `msgpack:"spec"`
Option interface{} `msgpack:"option"`
ScheduleState `msgpack:"state"`
Mode string `json:"mode" msgpack:"mode"`
Spec string `json:"spec" msgpack:"spec"`
Option interface{} `json:"option" msgpack:"option"`
ScheduleState `json:"schedule_state" msgpack:"state"`
}

type ScheduleState struct {
Next time.Time `msgpack:"next"`
Prev time.Time `msgpack:"prev"`
Next time.Time `json:"next" msgpack:"next"`
Prev time.Time `json:"prev" msgpack:"prev"`
}

type ScheduleStatus struct {
Expand Down

0 comments on commit 5658975

Please sign in to comment.