diff --git a/app/jobs/job.go b/app/jobs/job.go index 9ccd215..5d84807 100644 --- a/app/jobs/job.go +++ b/app/jobs/job.go @@ -10,7 +10,6 @@ import ( "os/exec" "runtime/debug" "strings" - "sync" "time" ) @@ -41,14 +40,13 @@ func init() { } type Job struct { - id int - logId int64 - name string - task *models.Task - runFunc func(time.Duration) (string, string, error, bool) - running sync.Mutex - status int - Concurrent bool + id int // 任务ID + logId int64 // 日志记录ID + name string // 任务名称 + task *models.Task // 任务对象 + runFunc func(time.Duration) (string, string, error, bool) // 执行函数 + status int // 任务状态,1表示正在执行中 + Concurrent bool // 同一个任务是否允许并行执行 } func NewJobFromTask(task *models.Task) (*Job, error) { @@ -57,7 +55,7 @@ func NewJobFromTask(task *models.Task) (*Job, error) { } job := NewCommandJob(task.Id, task.TaskName, task.Command) job.task = task - job.Concurrent = task.Concurrent == 0 + job.Concurrent = task.Concurrent == 1 return job, nil } @@ -97,19 +95,17 @@ func (j *Job) GetLogId() int64 { } func (j *Job) Run() { + if !j.Concurrent && j.status > 0 { + beego.Debug(fmt.Sprintf("任务[%d]上一次执行尚未结束,本次被忽略。", j.id)) + return + } + defer func() { if err := recover(); err != nil { beego.Error(err, "\n", string(debug.Stack())) } }() - t := time.Now() - - if j.Concurrent { - j.running.Lock() - defer j.running.Unlock() - } - if workPool != nil { workPool <- true defer func() { @@ -117,11 +113,14 @@ func (j *Job) Run() { }() } - j.status = 1 + beego.Debug(fmt.Sprintf("开始执行任务: %d", j.id)) + + j.status++ defer func() { - j.status = 0 + j.status-- }() + t := time.Now() timeout := time.Duration(time.Hour * 24) if j.task.Timeout > 0 { timeout = time.Second * time.Duration(j.task.Timeout) @@ -151,7 +150,7 @@ func (j *Job) Run() { // 更新上次执行时间 j.task.PrevTime = t.Unix() j.task.ExecuteTimes++ - j.task.Update() + j.task.Update("PrevTime", "ExecuteTimes") // 发送邮件通知 if (j.task.Notify == 1 && err != nil) || j.task.Notify == 2 {