From ab945f10d9a47ec2fb06af164d00e3e2f78f5845 Mon Sep 17 00:00:00 2001 From: Kevin Burke Date: Mon, 22 Feb 2021 22:29:53 -0800 Subject: [PATCH] server: implement GET /v2/job-types Also start working on passing around a *newmodels.Queries instead of hardcoding `*newmodels.DB` everywhere. Add a new `responses` package that can help us get rid of `auto_id` in the database response, which is something that we don't want to expose. Updates #8. --- db/queries/jobs.sql | 7 +-- dbtohttp/dbtohttp.go | 24 +++++++++ models/queued_jobs/queued_jobs.go | 2 + newmodels/setup.go | 12 +++-- responses/responses.go | 11 ++++ server/enqueue_job_test.go | 12 ++++- server/regex_handler.go | 4 ++ server/server.go | 83 ++++++++++++++++++------------- server/v2.go | 31 ++++++++++++ services/enqueue.go | 28 +++++++++++ test/server/server_test.go | 7 +-- 11 files changed, 175 insertions(+), 46 deletions(-) create mode 100644 dbtohttp/dbtohttp.go create mode 100644 responses/responses.go create mode 100644 server/v2.go create mode 100644 services/enqueue.go diff --git a/db/queries/jobs.sql b/db/queries/jobs.sql index 5674efe..26e8961 100644 --- a/db/queries/jobs.sql +++ b/db/queries/jobs.sql @@ -8,12 +8,13 @@ VALUES ($1, $2, $3, $4) RETURNING *; -- name: GetJob :one -SELECT * -FROM jobs +SELECT * FROM jobs WHERE name = $1; -- name: GetAllJobs :many -SELECT * FROM jobs WHERE name != 'meta.shutdown'; +SELECT * FROM jobs +WHERE name != 'meta.shutdown' +ORDER BY auto_id DESC; -- name: DeleteAllJobs :execrows DELETE FROM jobs; diff --git a/dbtohttp/dbtohttp.go b/dbtohttp/dbtohttp.go new file mode 100644 index 0000000..abd8d8b --- /dev/null +++ b/dbtohttp/dbtohttp.go @@ -0,0 +1,24 @@ +package dbtohttp + +import ( + "github.com/kevinburke/rickover/newmodels" + "github.com/kevinburke/rickover/responses" +) + +func Job(j newmodels.Job) responses.Job { + return responses.Job{ + Name: j.Name, + DeliveryStrategy: string(j.DeliveryStrategy), + Attempts: j.Attempts, + Concurrency: j.Concurrency, + CreatedAt: j.CreatedAt, + } +} + +func Jobs(js []newmodels.Job) []responses.Job { + resp := make([]responses.Job, len(js)) + for i := range js { + resp[i] = Job(js[i]) + } + return resp +} diff --git a/models/queued_jobs/queued_jobs.go b/models/queued_jobs/queued_jobs.go index 54b8741..9154d68 100644 --- a/models/queued_jobs/queued_jobs.go +++ b/models/queued_jobs/queued_jobs.go @@ -39,6 +39,8 @@ var StuckJobLimit = 100 // Enqueue creates a new queued job with the given ID and fields. A // sql.ErrNoRows will be returned if the `name` does not exist in the jobs // table. Otherwise the QueuedJob will be returned. +// +// Deprecated: use services.Enqueue instead. func Enqueue(params newmodels.EnqueueJobParams) (*newmodels.QueuedJob, error) { qj, err := newmodels.DB.EnqueueJob(context.TODO(), params) if err != nil { diff --git a/newmodels/setup.go b/newmodels/setup.go index e006be8..11d6662 100644 --- a/newmodels/setup.go +++ b/newmodels/setup.go @@ -7,14 +7,18 @@ import ( "github.com/kevinburke/rickover/models/db" ) -var DB *Queries +// DefaultServer relies on this being initialized (and not overwritten) +var DB = new(Queries) func Setup(ctx context.Context) error { if !db.Connected() { return errors.New("newmodels: no database connection, bailing") } - var err error - DB, err = Prepare(ctx, db.Conn) - return err + qs, err := Prepare(ctx, db.Conn) + if err != nil { + return err + } + *DB = *qs + return nil } diff --git a/responses/responses.go b/responses/responses.go new file mode 100644 index 0000000..c2d6afc --- /dev/null +++ b/responses/responses.go @@ -0,0 +1,11 @@ +package responses + +import "time" + +type Job struct { + Name string `json:"name"` + DeliveryStrategy string `json:"delivery_strategy"` + Attempts int16 `json:"attempts"` + Concurrency int16 `json:"concurrency"` + CreatedAt time.Time `json:"created_at"` +} diff --git a/server/enqueue_job_test.go b/server/enqueue_job_test.go index 60b3fde..efee90a 100644 --- a/server/enqueue_job_test.go +++ b/server/enqueue_job_test.go @@ -62,6 +62,8 @@ func Test401UnknownPassword(t *testing.T) { func Test400NoBody(t *testing.T) { t.Parallel() + test.SetUp(t) + defer test.TearDown(t) w := httptest.NewRecorder() ejr := &EnqueueJobRequest{ Data: empty, @@ -84,6 +86,8 @@ func Test400NoBody(t *testing.T) { func Test400EmptyBody(t *testing.T) { t.Parallel() + test.SetUp(t) + defer test.TearDown(t) w := httptest.NewRecorder() var v interface{} err := json.Unmarshal([]byte("{}"), &v) @@ -92,7 +96,7 @@ func Test400EmptyBody(t *testing.T) { json.NewEncoder(b).Encode(v) ssa, server := newSSAServer() ssa.AddUser("test", "password") - req, _ := http.NewRequest("PUT", "/v1/jobs/echo/job_f17373a6-2cd7-4010-afba-eebc6dc6f9ab", b) + req := httptest.NewRequest("PUT", "/v1/jobs/echo/job_f17373a6-2cd7-4010-afba-eebc6dc6f9ab", b) req.SetBasicAuth("test", "password") server.ServeHTTP(w, req) test.AssertEquals(t, w.Code, http.StatusBadRequest) @@ -106,6 +110,8 @@ func Test400EmptyBody(t *testing.T) { func Test400InvalidUUID(t *testing.T) { t.Parallel() + test.SetUp(t) + defer test.TearDown(t) w := httptest.NewRecorder() ejr := &EnqueueJobRequest{ Data: empty, @@ -125,6 +131,8 @@ func Test400InvalidUUID(t *testing.T) { func Test400WrongPrefix(t *testing.T) { t.Parallel() + test.SetUp(t) + defer test.TearDown(t) w := httptest.NewRecorder() ejr := &EnqueueJobRequest{ Data: empty, @@ -139,6 +147,8 @@ func Test400WrongPrefix(t *testing.T) { func Test413TooLargeJSON(t *testing.T) { t.Parallel() + test.SetUp(t) + defer test.TearDown(t) w := httptest.NewRecorder() // 4 bytes per record - the value and the quotes around it. var bigarr [100 * 256]string diff --git a/server/regex_handler.go b/server/regex_handler.go index d7f942d..84db130 100644 --- a/server/regex_handler.go +++ b/server/regex_handler.go @@ -46,6 +46,10 @@ func (h *RegexpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { for _, route := range h.routes { if route.pattern.MatchString(r.URL.Path) { upperMethod := strings.ToUpper(r.Method) + if route.methods == nil && upperMethod != "OPTIONS" { + route.handler.ServeHTTP(w, r) + return + } for _, method := range route.methods { if strings.ToUpper(method) == upperMethod { route.handler.ServeHTTP(w, r) diff --git a/server/server.go b/server/server.go index 36dc694..0b8ba70 100644 --- a/server/server.go +++ b/server/server.go @@ -23,12 +23,14 @@ import ( "github.com/kevinburke/rest" "github.com/kevinburke/rest/resterror" "github.com/kevinburke/rickover/config" + "github.com/kevinburke/rickover/dbtohttp" "github.com/kevinburke/rickover/metrics" "github.com/kevinburke/rickover/models/archived_jobs" "github.com/kevinburke/rickover/models/db" "github.com/kevinburke/rickover/models/jobs" "github.com/kevinburke/rickover/models/queued_jobs" "github.com/kevinburke/rickover/newmodels" + "github.com/kevinburke/rickover/services" "github.com/kevinburke/rickover/setup" "github.com/lib/pq" ) @@ -93,12 +95,12 @@ func New(ctx context.Context, cfg Config) (http.Handler, error) { if err := setup.DB(ctx, cfg.Connector, cfg.NumConns); err != nil { return nil, err } - s := Get(cfg) - return s, nil + return Get(cfg), nil } // Get returns a http.Handler with all routes initialized using the given -// Authorizer. +// Authorizer. Get assumes that setup.DB has been called; to call both use +// New. func Get(c Config) http.Handler { a := c.Auth if a == nil { @@ -106,15 +108,21 @@ func Get(c Config) http.Handler { } h := new(RegexpHandler) + db := newmodels.DB useMetaShutdown := !c.DisableMetaShutdown - h.Handler(jobsRoute, []string{"POST"}, authHandler(createJob(useMetaShutdown), a)) - h.Handler(getJobRoute, []string{"GET"}, authHandler(handleJobRoute(useMetaShutdown), a)) - h.Handler(getJobTypeRoute, []string{"GET"}, authHandler(getJobType(), a)) + + // v1 + h.Handler(jobsRoute, []string{"POST"}, authHandler(createJob(db, useMetaShutdown), a)) + h.Handler(getJobRoute, []string{"GET"}, authHandler(handleJobRoute(db, useMetaShutdown), a)) + h.Handler(getJobTypeRoute, []string{"GET"}, authHandler(getJobType(db), a)) h.Handler(replayRoute, []string{"POST"}, authHandler(replayHandler(), a)) - h.Handler(jobIdRoute, []string{"GET", "POST", "PUT"}, authHandler(handleJobRoute(useMetaShutdown), a)) - h.Handler(archivedJobsRoute, []string{"GET"}, authHandler(listArchivedJobs(), a)) + h.Handler(jobIdRoute, []string{"GET", "POST", "PUT"}, authHandler(handleJobRoute(db, useMetaShutdown), a)) + h.Handler(archivedJobsRoute, []string{"GET"}, authHandler(listArchivedJobs(db), a)) + + // v2 + h.Handler(regexp.MustCompile(`^/v2/(.+)$`), nil, authHandler(V2(db), a)) h.Handler(regexp.MustCompile("^/debug/pprof$"), []string{"GET"}, authHandler(http.HandlerFunc(pprof.Index), a)) h.Handler(regexp.MustCompile("^/debug/pprof/cmdline$"), []string{"GET"}, authHandler(http.HandlerFunc(pprof.Cmdline), a)) @@ -145,7 +153,7 @@ func serverHeaderHandler(h http.Handler) http.Handler { } else { w.Header().Set("Content-Type", "application/json; charset=utf-8") } - w.Header().Set("Server", fmt.Sprintf("rickover/%s", config.Version)) + w.Header().Set("Server", "rickover/"+config.Version) h.ServeHTTP(w, r) }) } @@ -235,10 +243,10 @@ type CreateJobRequest struct { // GET /v1/jobs/:jobName // // Get a job type by name. Returns a models.Job or an error -func getJobType() http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +func getJobType(db *newmodels.Queries) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { jobName := getJobTypeRoute.FindStringSubmatch(r.URL.Path)[1] - job, err := jobs.Get(r.Context(), jobName) + job, err := db.GetJob(r.Context(), jobName) if err != nil { if err == sql.ErrNoRows { notFound(w, new404(r)) @@ -247,8 +255,8 @@ func getJobType() http.Handler { rest.ServerError(w, r, err) return } - respond(Logger, w, r, job) - }) + respond(Logger, w, r, dbtohttp.Job(job)) + } } var Logger = handlers.Logger @@ -257,8 +265,8 @@ var Logger = handlers.Logger // // createJob returns a http.HandlerFunc that responds to job creation requests // using the given authorizer interface. -func createJob(useMetaShutdown bool) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +func createJob(db *newmodels.Queries, useMetaShutdown bool) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { if r.Body == nil { rest.BadRequest(w, r, createEmptyErr("name", r.URL.Path)) return @@ -326,7 +334,9 @@ func createJob(useMetaShutdown bool) http.Handler { Attempts: jr.Attempts, } start := time.Now() - job, err := jobs.Create(newmodels.CreateJobParams{ + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + job, err := db.CreateJob(ctx, newmodels.CreateJobParams{ Name: jobData.Name, DeliveryStrategy: jobData.DeliveryStrategy, Concurrency: jobData.Concurrency, @@ -354,12 +364,12 @@ func createJob(useMetaShutdown bool) http.Handler { // a shutdown job, which will instruct the dequeuer to shut down. ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() - if err := createMetaShutdownJob(ctx); err != nil { + if err := createMetaShutdownJob(ctx, db); err != nil { return } id := types.GenerateUUID("job_") runAfter := time.Now().Add(-1 * 5 * time.Second) - queued_jobs.Enqueue(newmodels.EnqueueJobParams{ + services.Enqueue(ctx, db, newmodels.EnqueueJobParams{ ID: id, Name: "meta.shutdown", RunAfter: runAfter, ExpiresAt: types.NullTime{ Valid: true, @@ -369,9 +379,9 @@ func createJob(useMetaShutdown bool) http.Handler { }) }() } - created(Logger, w, r, job) + created(Logger, w, r, dbtohttp.Job(job)) metrics.Increment("type.create.success") - }) + } } // An EnqueueJobRequest is sent in the body of a request to PUT @@ -388,15 +398,15 @@ type EnqueueJobRequest struct { } // GET/POST/PUT disambiguator for /v1/jobs/:name/:id -func handleJobRoute(useMetaShutdown bool) http.HandlerFunc { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +func handleJobRoute(db *newmodels.Queries, useMetaShutdown bool) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { if r.Method == "POST" { j := jobStatusUpdater{} j.ServeHTTP(w, r) return } if r.Method == "PUT" { - j := jobEnqueuer{useMetaShutdown: useMetaShutdown} + j := jobEnqueuer{db: db, useMetaShutdown: useMetaShutdown} j.ServeHTTP(w, r) return } @@ -406,7 +416,7 @@ func handleJobRoute(useMetaShutdown bool) http.HandlerFunc { return } rest.NotAllowed(w, r) - }) + } } type jobStatusGetter struct{} @@ -472,11 +482,11 @@ func (j *jobStatusGetter) ServeHTTP(w http.ResponseWriter, r *http.Request) { metrics.Increment("job.get.archived.success") } -func createMetaShutdownJob(ctx context.Context) error { +func createMetaShutdownJob(ctx context.Context, db *newmodels.Queries) error { // This job type might not exist, we need to create it first to avoid // a foreign key error. Just try to create the job every time - we // shouldn't be hitting this endpoint that often - _, err := newmodels.DB.CreateJob(ctx, newmodels.CreateJobParams{ + _, err := db.CreateJob(ctx, newmodels.CreateJobParams{ Name: "meta.shutdown", DeliveryStrategy: newmodels.DeliveryStrategyAtMostOnce, Attempts: 1, @@ -493,6 +503,7 @@ func createMetaShutdownJob(ctx context.Context) error { // jobEnqueuer satisfies the Handler interface. type jobEnqueuer struct { + db *newmodels.Queries useMetaShutdown bool } @@ -553,9 +564,11 @@ func (j *jobEnqueuer) ServeHTTP(w http.ResponseWriter, r *http.Request) { } name := jobIdRoute.FindStringSubmatch(r.URL.Path)[1] if name == "meta.shutdown" { - createMetaShutdownJob(r.Context()) + createMetaShutdownJob(r.Context(), j.db) } - queuedJob, err := queued_jobs.Enqueue(newmodels.EnqueueJobParams{ + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + queuedJob, err := services.Enqueue(ctx, j.db, newmodels.EnqueueJobParams{ ID: id, Name: name, RunAfter: ejr.RunAfter.Time, ExpiresAt: ejr.ExpiresAt, Data: ejr.Data, }) @@ -584,7 +597,7 @@ func (j *jobEnqueuer) ServeHTTP(w http.ResponseWriter, r *http.Request) { } case *pq.Error: if terr.Code == "23505" { // unique violation - queuedJob, err = queued_jobs.Get(r.Context(), id) + queuedJob, err = j.db.GetQueuedJob(r.Context(), id) if err != nil { rest.ServerError(w, r, err) return @@ -611,7 +624,7 @@ func (j *jobEnqueuer) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // GET /v1/archived-jobs -func listArchivedJobs() http.Handler { +func listArchivedJobs(db *newmodels.Queries) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() typ := query.Get("name") @@ -645,19 +658,19 @@ func listArchivedJobs() http.Handler { var err error switch { case typ == "" && status == "": - ajs, err = newmodels.DB.ListArchivedJobs(r.Context(), int32(limit)) + ajs, err = db.ListArchivedJobs(r.Context(), int32(limit)) case status == "" && typ != "": - ajs, err = newmodels.DB.ListArchivedJobsByName(r.Context(), newmodels.ListArchivedJobsByNameParams{ + ajs, err = db.ListArchivedJobsByName(r.Context(), newmodels.ListArchivedJobsByNameParams{ Limit: int32(limit), Name: typ, }) case status != "" && typ == "": - ajs, err = newmodels.DB.ListArchivedJobsByStatus(r.Context(), newmodels.ListArchivedJobsByStatusParams{ + ajs, err = db.ListArchivedJobsByStatus(r.Context(), newmodels.ListArchivedJobsByStatusParams{ Limit: int32(limit), Status: newmodels.ArchivedJobStatus(status), }) case status != "" && typ != "": - ajs, err = newmodels.DB.ListArchivedJobsByNameStatus(r.Context(), newmodels.ListArchivedJobsByNameStatusParams{ + ajs, err = db.ListArchivedJobsByNameStatus(r.Context(), newmodels.ListArchivedJobsByNameStatusParams{ Limit: int32(limit), Name: typ, Status: newmodels.ArchivedJobStatus(status), diff --git a/server/v2.go b/server/v2.go new file mode 100644 index 0000000..e8c4b86 --- /dev/null +++ b/server/v2.go @@ -0,0 +1,31 @@ +package server + +import ( + "net/http" + "regexp" + + "github.com/kevinburke/rest" + "github.com/kevinburke/rickover/dbtohttp" + "github.com/kevinburke/rickover/newmodels" +) + +func V2(db *newmodels.Queries) http.Handler { + h := new(RegexpHandler) + // auth is handled external to this function + h.Handler(regexp.MustCompile(`^/v2/job-types$`), []string{"GET"}, v2GetJobTypes(db)) + return h +} + +// GET /v2/job-types +// +// List all job types. +func v2GetJobTypes(db *newmodels.Queries) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + jobs, err := db.GetAllJobs(r.Context()) + if err != nil { + rest.ServerError(w, r, err) + return + } + respond(Logger, w, r, dbtohttp.Jobs(jobs)) + } +} diff --git a/services/enqueue.go b/services/enqueue.go new file mode 100644 index 0000000..7ae125b --- /dev/null +++ b/services/enqueue.go @@ -0,0 +1,28 @@ +package services + +import ( + "context" + "database/sql" + "fmt" + + "github.com/kevinburke/rickover/models/queued_jobs" + "github.com/kevinburke/rickover/newmodels" +) + +// Enqueue creates a new queued job with the given ID and fields. A +// sql.ErrNoRows will be returned if the `name` does not exist in the jobs +// table. Otherwise the QueuedJob will be returned. +func Enqueue(ctx context.Context, db *newmodels.Queries, params newmodels.EnqueueJobParams) (newmodels.QueuedJob, error) { + qj, err := db.EnqueueJob(ctx, params) + if err != nil { + if err == sql.ErrNoRows { + e := &queued_jobs.UnknownOrArchivedError{ + Err: fmt.Sprintf("Job type %s does not exist or the job with that id has already been archived", params.Name), + } + return newmodels.QueuedJob{}, e + } + return newmodels.QueuedJob{}, err + } + qj.ID.Prefix = queued_jobs.Prefix + return qj, err +} diff --git a/test/server/server_test.go b/test/server/server_test.go index f4c95af..c44e14c 100644 --- a/test/server/server_test.go +++ b/test/server/server_test.go @@ -209,8 +209,9 @@ func TestReplayQueuedJobFails(t *testing.T) { } func Test202SuccessfulEnqueue(t *testing.T) { - defer test.TearDown(t) - _ = factory.CreateJob(t, factory.SampleJob) + test.SetUp(t) + t.Cleanup(func() { test.TearDown(t) }) + factory.CreateJob(t, factory.SampleJob) expiry := time.Now().UTC().Add(5 * time.Minute) w := httptest.NewRecorder() @@ -221,7 +222,7 @@ func Test202SuccessfulEnqueue(t *testing.T) { b := new(bytes.Buffer) json.NewEncoder(b).Encode(ejr) - req, _ := http.NewRequest("PUT", "/v1/jobs/echo/job_6740b44e-13b9-475d-af06-979627e0e0d6", b) + req := httptest.NewRequest("PUT", "/v1/jobs/echo/job_6740b44e-13b9-475d-af06-979627e0e0d6", b) req.SetBasicAuth("test", testPassword) server.DefaultServer.ServeHTTP(w, req) test.AssertEquals(t, w.Code, http.StatusAccepted)