From b5a99eba7f8e98f4ef30f43f517e8bf62fbd47d0 Mon Sep 17 00:00:00 2001 From: Denis Gukov Date: Sun, 27 Aug 2023 18:02:51 +0200 Subject: [PATCH] Runners (#1439) * refactor(runners): extract jobs to separate entity --- api-docs-runners.yml | 54 ++++++ api/runners/handler.go | 52 ++++++ cli/cmd/runner.go | 25 +++ db/Runner.go | 15 ++ db/Store.go | 16 ++ db/bolt/runner.go | 35 ++++ db/sql/runner.go | 35 ++++ services/runners/JobPool.go | 158 ++++++++++++++++++ .../schedules/{pool.go => SchedulePool.go} | 0 .../{pool_test.go => SchedulePool_test.go} | 0 services/tasks/AnsibleJob.go | 23 +++ services/tasks/RunnerPool.go | 12 ++ services/tasks/{pool.go => TaskPool.go} | 15 ++ services/tasks/{runner.go => TaskRunner.go} | 15 +- .../{runner_test.go => TaskRunner_test.go} | 0 util/config.go | 2 + 16 files changed, 447 insertions(+), 10 deletions(-) create mode 100644 api-docs-runners.yml create mode 100644 api/runners/handler.go create mode 100644 cli/cmd/runner.go create mode 100644 db/Runner.go create mode 100644 db/bolt/runner.go create mode 100644 db/sql/runner.go create mode 100644 services/runners/JobPool.go rename services/schedules/{pool.go => SchedulePool.go} (100%) rename services/schedules/{pool_test.go => SchedulePool_test.go} (100%) create mode 100644 services/tasks/AnsibleJob.go create mode 100644 services/tasks/RunnerPool.go rename services/tasks/{pool.go => TaskPool.go} (96%) rename services/tasks/{runner.go => TaskRunner.go} (98%) rename services/tasks/{runner_test.go => TaskRunner_test.go} (100%) diff --git a/api-docs-runners.yml b/api-docs-runners.yml new file mode 100644 index 000000000..1f834c43b --- /dev/null +++ b/api-docs-runners.yml @@ -0,0 +1,54 @@ +swagger: '3.0' +info: + title: SEMAPHORE + description: Semaphore Runner API + version: "2.2.0" + +host: localhost:3000 + +consumes: + - application/json +produces: + - application/json + - text/plain; charset=utf-8 + +tags: + - name: authentication + description: Authentication, Logout & API Tokens + - name: project + description: Everything related to a project + - name: user + description: User-related API + +schemes: + - http + - https + +basePath: /api/runners + +definitions: + +paths: + /register: + post: + requestBody: + content: + application/json: + schema: + type: object + required: + - registrationToken + properties: + registrationToken: { type: string } + responses: + 200: + description: API Token + + /unregister: + post: + + /status: + put: + + /jobs: + get: \ No newline at end of file diff --git a/api/runners/handler.go b/api/runners/handler.go new file mode 100644 index 000000000..eb94f1bd9 --- /dev/null +++ b/api/runners/handler.go @@ -0,0 +1,52 @@ +package runners + +import ( + "github.com/ansible-semaphore/semaphore/api/helpers" + "github.com/ansible-semaphore/semaphore/db" + "github.com/ansible-semaphore/semaphore/util" + "github.com/gorilla/mux" + "net/http" + "strings" +) + +func RunnerRoute() *mux.Router { + r := mux.NewRouter() + + webPath := "/" + if util.WebHostURL != nil { + webPath = util.WebHostURL.Path + if !strings.HasSuffix(webPath, "/") { + webPath += "/" + } + } + + pingRouter := r.Path(webPath + "api/runners/register").Subrouter() + + pingRouter.Methods("POST", "HEAD").HandlerFunc(registerRunner) + + return r +} + +func registerRunner(w http.ResponseWriter, r *http.Request) { + var register struct { + RegistrationToken string `json:"registration_token" binding:"required"` + } + + if !helpers.Bind(w, r, ®ister) { + return + } + + if register.RegistrationToken != util.Config.RegistrationToken { + return + } + + runner, err := helpers.Store(r).CreateRunner(db.Runner{ + State: db.RunnerActive, + }) + + if err != nil { + return + } + + helpers.WriteJSON(w, http.StatusOK, runner) +} diff --git a/cli/cmd/runner.go b/cli/cmd/runner.go new file mode 100644 index 000000000..229c6cb7e --- /dev/null +++ b/cli/cmd/runner.go @@ -0,0 +1,25 @@ +package cmd + +import ( + "github.com/ansible-semaphore/semaphore/services/runners" + "github.com/spf13/cobra" +) + +func init() { + rootCmd.AddCommand(runnerCmd) +} + +func runRunner() { + + taskPool := runners.JobPool{} + + go taskPool.Run() +} + +var runnerCmd = &cobra.Command{ + Use: "runner", + Short: "Run in runner mode", + Run: func(cmd *cobra.Command, args []string) { + runRunner() + }, +} diff --git a/db/Runner.go b/db/Runner.go new file mode 100644 index 000000000..4057c49bb --- /dev/null +++ b/db/Runner.go @@ -0,0 +1,15 @@ +package db + +type RunnerState string + +const ( + RunnerOffline RunnerState = "offline" + RunnerActive RunnerState = "active" +) + +type Runner struct { + ID int `db:"id" json:"-"` + Token string `db:"token" json:"token"` + ProjectID *int `db:"project_id" json:"project_id"` + State RunnerState `db:"state" json:"state"` +} diff --git a/db/Store.go b/db/Store.go index a43728ab2..f383ffcf0 100644 --- a/db/Store.go +++ b/db/Store.go @@ -199,6 +199,15 @@ type Store interface { CreateView(view View) (View, error) DeleteView(projectID int, viewID int) error SetViewPositions(projectID int, viewPositions map[int]int) error + + GetRunner(projectID int, runnerID int) (Runner, error) + GetRunners(projectID int) ([]Runner, error) + DeleteRunner(projectID int, runnerID int) error + GetGlobalRunner(runnerID int) (Runner, error) + GetGlobalRunners() ([]Runner, error) + DeleteGlobalRunner(runnerID int) error + UpdateRunner(runner Runner) error + CreateRunner(runner Runner) (Runner, error) } var AccessKeyProps = ObjectProps{ @@ -304,6 +313,13 @@ var ViewProps = ObjectProps{ DefaultSortingColumn: "position", } +var RunnerProps = ObjectProps{ + TableName: "runner", + Type: reflect.TypeOf(Runner{}), + PrimaryColumnName: "id", + IsGlobal: true, +} + func (p ObjectProps) GetReferringFieldsFrom(t reflect.Type) (fields []string, err error) { n := t.NumField() for i := 0; i < n; i++ { diff --git a/db/bolt/runner.go b/db/bolt/runner.go new file mode 100644 index 000000000..bb91ffbbe --- /dev/null +++ b/db/bolt/runner.go @@ -0,0 +1,35 @@ +package bolt + +import "github.com/ansible-semaphore/semaphore/db" + +func (d *BoltDb) GetRunner(projectID int, runnerID int) (runner db.Runner, err error) { + return +} + +func (d *BoltDb) GetRunners(projectID int) (runners []db.Runner, err error) { + return +} + +func (d *BoltDb) DeleteRunner(projectID int, runnerID int) (err error) { + return +} + +func (d *BoltDb) GetGlobalRunner(runnerID int) (runner db.Runner, err error) { + return +} + +func (d *BoltDb) GetGlobalRunners() (runners []db.Runner, err error) { + return +} + +func (d *BoltDb) DeleteGlobalRunner(runnerID int) (err error) { + return +} + +func (d *BoltDb) UpdateRunner(runner db.Runner) (err error) { + return +} + +func (d *BoltDb) CreateRunner(runner db.Runner) (newRunner db.Runner, err error) { + return +} diff --git a/db/sql/runner.go b/db/sql/runner.go new file mode 100644 index 000000000..49dcbe27a --- /dev/null +++ b/db/sql/runner.go @@ -0,0 +1,35 @@ +package sql + +import "github.com/ansible-semaphore/semaphore/db" + +func (d *SqlDb) GetRunner(projectID int, runnerID int) (runner db.Runner, err error) { + return +} + +func (d *SqlDb) GetRunners(projectID int) (runners []db.Runner, err error) { + return +} + +func (d *SqlDb) DeleteRunner(projectID int, runnerID int) (err error) { + return +} + +func (d *SqlDb) GetGlobalRunner(runnerID int) (runner db.Runner, err error) { + return +} + +func (d *SqlDb) GetGlobalRunners() (runners []db.Runner, err error) { + return +} + +func (d *SqlDb) DeleteGlobalRunner(runnerID int) (err error) { + return +} + +func (d *SqlDb) UpdateRunner(runner db.Runner) (err error) { + return +} + +func (d *SqlDb) CreateRunner(runner db.Runner) (newRunner db.Runner, err error) { + return +} diff --git a/services/runners/JobPool.go b/services/runners/JobPool.go new file mode 100644 index 000000000..72400c91a --- /dev/null +++ b/services/runners/JobPool.go @@ -0,0 +1,158 @@ +// +// Runner's job pool. NOT SERVER!!! +// Runner gets jobs from the server and put them to this pool. +// + +package runners + +import ( + "encoding/json" + "fmt" + log "github.com/Sirupsen/logrus" + "github.com/ansible-semaphore/semaphore/db" + "github.com/ansible-semaphore/semaphore/services/tasks" + "io/ioutil" + "net/http" + "strconv" + "time" +) + +type logRecord struct { + job *job + output string + time time.Time +} + +type resourceLock struct { + lock bool + holder *job +} + +// job presents current job on semaphore server. +type job struct { + + // job presents remote or local job information + job *tasks.LocalAnsibleJob + Status db.TaskStatus + kind jobType + args []string + environmentVars []string + id int +} + +type jobType int + +type Response struct { + Message string `json:"message"` + Status int `json:"status"` +} + +const ( + playbook jobType = iota + galaxy +) + +func (j *job) run() { + var err error + switch j.kind { + case playbook: + err = j.job.RunPlaybook(j.args, &j.environmentVars, nil) + case galaxy: + err = j.job.RunGalaxy(j.args) + default: + panic("Unknown job type") + } + + if err != nil { + // TODO: some logging + } +} + +type JobPool struct { + // logger channel used to putting log records to database. + logger chan logRecord + + // register channel used to put tasks to queue. + register chan *job + + resourceLocker chan *resourceLock + + logRecords []logRecord + + queue []*job +} + +func (p *JobPool) Run() { + ticker := time.NewTicker(5 * time.Second) + + defer func() { + ticker.Stop() + }() + + for { + select { + case record := <-p.logger: // new log message which should be put to database + p.logRecords = append(p.logRecords, record) + + case job := <-p.register: // new task created by API or schedule + p.queue = append(p.queue, job) + + case <-ticker.C: // timer 5 seconds: get task from queue and run it + if len(p.queue) == 0 { + break + } + + t := p.queue[0] + if t.Status == db.TaskFailStatus { + //delete failed TaskRunner from queue + p.queue = p.queue[1:] + log.Info("Task " + strconv.Itoa(t.id) + " removed from queue") + break + } + + log.Info("Set resource locker with TaskRunner " + strconv.Itoa(t.id)) + p.resourceLocker <- &resourceLock{lock: true, holder: t} + + go t.run() + p.queue = p.queue[1:] + log.Info("Task " + strconv.Itoa(t.id) + " removed from queue") + } + } +} + +// checkNewJobs tries to find runner to queued jobs +func (p *JobPool) checkNewJobs() { + client := &http.Client{} + url := "https://example.com" + req, err := http.NewRequest("GET", url, nil) + if err != nil { + fmt.Println("Error creating request:", err) + return + } + + resp, err := client.Do(req) + if err != nil { + fmt.Println("Error making request:", err) + return + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body:", err) + return + } + + var response Response + err = json.Unmarshal(body, &response) + if err != nil { + fmt.Println("Error parsing JSON:", err) + return + } + + taskRunner := job{ + job: &tasks.LocalAnsibleJob{}, + } + + p.register <- &taskRunner +} diff --git a/services/schedules/pool.go b/services/schedules/SchedulePool.go similarity index 100% rename from services/schedules/pool.go rename to services/schedules/SchedulePool.go diff --git a/services/schedules/pool_test.go b/services/schedules/SchedulePool_test.go similarity index 100% rename from services/schedules/pool_test.go rename to services/schedules/SchedulePool_test.go diff --git a/services/tasks/AnsibleJob.go b/services/tasks/AnsibleJob.go new file mode 100644 index 000000000..6683743de --- /dev/null +++ b/services/tasks/AnsibleJob.go @@ -0,0 +1,23 @@ +package tasks + +import ( + "github.com/ansible-semaphore/semaphore/lib" + "os" +) + +type AnsibleJob interface { + RunGalaxy(args []string) error + RunPlaybook(args []string, environmentVars *[]string, cb func(*os.Process)) error +} + +type LocalAnsibleJob struct { + playbook *lib.AnsiblePlaybook +} + +func (j *LocalAnsibleJob) RunGalaxy(args []string) error { + return j.playbook.RunGalaxy(args) +} + +func (j *LocalAnsibleJob) RunPlaybook(args []string, environmentVars *[]string, cb func(*os.Process)) error { + return j.playbook.RunPlaybook(args, environmentVars, cb) +} diff --git a/services/tasks/RunnerPool.go b/services/tasks/RunnerPool.go new file mode 100644 index 000000000..45cb10acd --- /dev/null +++ b/services/tasks/RunnerPool.go @@ -0,0 +1,12 @@ +package tasks + +import "github.com/ansible-semaphore/semaphore/lib" + +// RunnerPool is a collection of the registered runners. +type RunnerPool struct { +} + +func (p *RunnerPool) CreateJob(playbook *lib.AnsiblePlaybook) (AnsibleJob, error) { + + return &LocalAnsibleJob{playbook: playbook}, nil +} diff --git a/services/tasks/pool.go b/services/tasks/TaskPool.go similarity index 96% rename from services/tasks/pool.go rename to services/tasks/TaskPool.go index 9672507a1..c6910fe38 100644 --- a/services/tasks/pool.go +++ b/services/tasks/TaskPool.go @@ -2,6 +2,7 @@ package tasks import ( "github.com/ansible-semaphore/semaphore/db" + "github.com/ansible-semaphore/semaphore/lib" "regexp" "strconv" "strings" @@ -40,6 +41,8 @@ type TaskPool struct { store db.Store resourceLocker chan *resourceLock + + runners RunnerPool } func (p *TaskPool) GetTask(id int) (task *TaskRunner) { @@ -331,6 +334,18 @@ func (p *TaskPool) AddTask(taskObj db.Task, userID *int, projectID int) (newTask return } + job, err := p.runners.CreateJob(&lib.AnsiblePlaybook{ + Logger: &taskRunner, + TemplateID: taskRunner.template.ID, + Repository: taskRunner.repository, + }) + + if err != nil { + return + } + + taskRunner.job = job + p.register <- &taskRunner objType := db.EventTask diff --git a/services/tasks/runner.go b/services/tasks/TaskRunner.go similarity index 98% rename from services/tasks/runner.go rename to services/tasks/TaskRunner.go index ab8dd9758..132be8c5d 100644 --- a/services/tasks/runner.go +++ b/services/tasks/TaskRunner.go @@ -33,6 +33,9 @@ type TaskRunner struct { prepared bool process *os.Process pool *TaskPool + + // job executes Ansible and returns stdout to Semaphore logs + job AnsibleJob } func getMD5Hash(filepath string) (string, error) { @@ -573,11 +576,7 @@ func (t *TaskRunner) installRequirements() error { } func (t *TaskRunner) runGalaxy(args []string) error { - return lib.AnsiblePlaybook{ - Logger: t, - TemplateID: t.template.ID, - Repository: t.repository, - }.RunGalaxy(args) + return t.job.RunGalaxy(args) } func (t *TaskRunner) runPlaybook() (err error) { @@ -591,11 +590,7 @@ func (t *TaskRunner) runPlaybook() (err error) { return } - return lib.AnsiblePlaybook{ - Logger: t, - TemplateID: t.template.ID, - Repository: t.repository, - }.RunPlaybook(args, &environmentVariables, func(p *os.Process) { t.process = p }) + return t.job.RunPlaybook(args, &environmentVariables, func(p *os.Process) { t.process = p }) } func (t *TaskRunner) getEnvironmentENV() (arr []string, err error) { diff --git a/services/tasks/runner_test.go b/services/tasks/TaskRunner_test.go similarity index 100% rename from services/tasks/runner_test.go rename to services/tasks/TaskRunner_test.go diff --git a/util/config.go b/util/config.go index 2634445d1..9660b6e52 100644 --- a/util/config.go +++ b/util/config.go @@ -150,6 +150,8 @@ type ConfigType struct { // task concurrency MaxParallelTasks int `json:"max_parallel_tasks"` + RegistrationToken string `json:"registration_token"` + // feature switches PasswordLoginDisable bool `json:"password_login_disable"` NonAdminCanCreateProject bool `json:"non_admin_can_create_project"`