Skip to content

Commit

Permalink
server: implement GET /v2/job-types
Browse files Browse the repository at this point in the history
Also start working on passing around a *newmodels.Queries instead of
hardcoding `*newmodels.DB` everywhere.

Add a new `responses` package that can help us get rid of `auto_id`
in the database response, which is something that we don't want to
expose.

Updates #8.
  • Loading branch information
kevinburke committed Feb 23, 2021
1 parent d876623 commit ab945f1
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 46 deletions.
7 changes: 4 additions & 3 deletions db/queries/jobs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ VALUES ($1, $2, $3, $4)
RETURNING *;

-- name: GetJob :one
SELECT *
FROM jobs
SELECT * FROM jobs
WHERE name = $1;

-- name: GetAllJobs :many
SELECT * FROM jobs WHERE name != 'meta.shutdown';
SELECT * FROM jobs
WHERE name != 'meta.shutdown'
ORDER BY auto_id DESC;

-- name: DeleteAllJobs :execrows
DELETE FROM jobs;
24 changes: 24 additions & 0 deletions dbtohttp/dbtohttp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package dbtohttp

import (
"github.com/kevinburke/rickover/newmodels"
"github.com/kevinburke/rickover/responses"
)

func Job(j newmodels.Job) responses.Job {
return responses.Job{
Name: j.Name,
DeliveryStrategy: string(j.DeliveryStrategy),
Attempts: j.Attempts,
Concurrency: j.Concurrency,
CreatedAt: j.CreatedAt,
}
}

func Jobs(js []newmodels.Job) []responses.Job {
resp := make([]responses.Job, len(js))
for i := range js {
resp[i] = Job(js[i])
}
return resp
}
2 changes: 2 additions & 0 deletions models/queued_jobs/queued_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ var StuckJobLimit = 100
// Enqueue creates a new queued job with the given ID and fields. A
// sql.ErrNoRows will be returned if the `name` does not exist in the jobs
// table. Otherwise the QueuedJob will be returned.
//
// Deprecated: use services.Enqueue instead.
func Enqueue(params newmodels.EnqueueJobParams) (*newmodels.QueuedJob, error) {
qj, err := newmodels.DB.EnqueueJob(context.TODO(), params)
if err != nil {
Expand Down
12 changes: 8 additions & 4 deletions newmodels/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@ import (
"github.com/kevinburke/rickover/models/db"
)

var DB *Queries
// DefaultServer relies on this being initialized (and not overwritten)
var DB = new(Queries)

func Setup(ctx context.Context) error {
if !db.Connected() {
return errors.New("newmodels: no database connection, bailing")
}

var err error
DB, err = Prepare(ctx, db.Conn)
return err
qs, err := Prepare(ctx, db.Conn)
if err != nil {
return err
}
*DB = *qs
return nil
}
11 changes: 11 additions & 0 deletions responses/responses.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package responses

import "time"

type Job struct {
Name string `json:"name"`
DeliveryStrategy string `json:"delivery_strategy"`
Attempts int16 `json:"attempts"`
Concurrency int16 `json:"concurrency"`
CreatedAt time.Time `json:"created_at"`
}
12 changes: 11 additions & 1 deletion server/enqueue_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func Test401UnknownPassword(t *testing.T) {

func Test400NoBody(t *testing.T) {
t.Parallel()
test.SetUp(t)
defer test.TearDown(t)
w := httptest.NewRecorder()
ejr := &EnqueueJobRequest{
Data: empty,
Expand All @@ -84,6 +86,8 @@ func Test400NoBody(t *testing.T) {

func Test400EmptyBody(t *testing.T) {
t.Parallel()
test.SetUp(t)
defer test.TearDown(t)
w := httptest.NewRecorder()
var v interface{}
err := json.Unmarshal([]byte("{}"), &v)
Expand All @@ -92,7 +96,7 @@ func Test400EmptyBody(t *testing.T) {
json.NewEncoder(b).Encode(v)
ssa, server := newSSAServer()
ssa.AddUser("test", "password")
req, _ := http.NewRequest("PUT", "/v1/jobs/echo/job_f17373a6-2cd7-4010-afba-eebc6dc6f9ab", b)
req := httptest.NewRequest("PUT", "/v1/jobs/echo/job_f17373a6-2cd7-4010-afba-eebc6dc6f9ab", b)
req.SetBasicAuth("test", "password")
server.ServeHTTP(w, req)
test.AssertEquals(t, w.Code, http.StatusBadRequest)
Expand All @@ -106,6 +110,8 @@ func Test400EmptyBody(t *testing.T) {

func Test400InvalidUUID(t *testing.T) {
t.Parallel()
test.SetUp(t)
defer test.TearDown(t)
w := httptest.NewRecorder()
ejr := &EnqueueJobRequest{
Data: empty,
Expand All @@ -125,6 +131,8 @@ func Test400InvalidUUID(t *testing.T) {

func Test400WrongPrefix(t *testing.T) {
t.Parallel()
test.SetUp(t)
defer test.TearDown(t)
w := httptest.NewRecorder()
ejr := &EnqueueJobRequest{
Data: empty,
Expand All @@ -139,6 +147,8 @@ func Test400WrongPrefix(t *testing.T) {

func Test413TooLargeJSON(t *testing.T) {
t.Parallel()
test.SetUp(t)
defer test.TearDown(t)
w := httptest.NewRecorder()
// 4 bytes per record - the value and the quotes around it.
var bigarr [100 * 256]string
Expand Down
4 changes: 4 additions & 0 deletions server/regex_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (h *RegexpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
for _, route := range h.routes {
if route.pattern.MatchString(r.URL.Path) {
upperMethod := strings.ToUpper(r.Method)
if route.methods == nil && upperMethod != "OPTIONS" {
route.handler.ServeHTTP(w, r)
return
}
for _, method := range route.methods {
if strings.ToUpper(method) == upperMethod {
route.handler.ServeHTTP(w, r)
Expand Down
83 changes: 48 additions & 35 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
"github.com/kevinburke/rest"
"github.com/kevinburke/rest/resterror"
"github.com/kevinburke/rickover/config"
"github.com/kevinburke/rickover/dbtohttp"
"github.com/kevinburke/rickover/metrics"
"github.com/kevinburke/rickover/models/archived_jobs"
"github.com/kevinburke/rickover/models/db"
"github.com/kevinburke/rickover/models/jobs"
"github.com/kevinburke/rickover/models/queued_jobs"
"github.com/kevinburke/rickover/newmodels"
"github.com/kevinburke/rickover/services"
"github.com/kevinburke/rickover/setup"
"github.com/lib/pq"
)
Expand Down Expand Up @@ -93,28 +95,34 @@ func New(ctx context.Context, cfg Config) (http.Handler, error) {
if err := setup.DB(ctx, cfg.Connector, cfg.NumConns); err != nil {
return nil, err
}
s := Get(cfg)
return s, nil
return Get(cfg), nil
}

// Get returns a http.Handler with all routes initialized using the given
// Authorizer.
// Authorizer. Get assumes that setup.DB has been called; to call both use
// New.
func Get(c Config) http.Handler {
a := c.Auth
if a == nil {
panic("server: cannot call Get() with nil Authorizer")
}
h := new(RegexpHandler)

db := newmodels.DB
useMetaShutdown := !c.DisableMetaShutdown
h.Handler(jobsRoute, []string{"POST"}, authHandler(createJob(useMetaShutdown), a))
h.Handler(getJobRoute, []string{"GET"}, authHandler(handleJobRoute(useMetaShutdown), a))
h.Handler(getJobTypeRoute, []string{"GET"}, authHandler(getJobType(), a))

// v1
h.Handler(jobsRoute, []string{"POST"}, authHandler(createJob(db, useMetaShutdown), a))
h.Handler(getJobRoute, []string{"GET"}, authHandler(handleJobRoute(db, useMetaShutdown), a))
h.Handler(getJobTypeRoute, []string{"GET"}, authHandler(getJobType(db), a))

h.Handler(replayRoute, []string{"POST"}, authHandler(replayHandler(), a))

h.Handler(jobIdRoute, []string{"GET", "POST", "PUT"}, authHandler(handleJobRoute(useMetaShutdown), a))
h.Handler(archivedJobsRoute, []string{"GET"}, authHandler(listArchivedJobs(), a))
h.Handler(jobIdRoute, []string{"GET", "POST", "PUT"}, authHandler(handleJobRoute(db, useMetaShutdown), a))
h.Handler(archivedJobsRoute, []string{"GET"}, authHandler(listArchivedJobs(db), a))

// v2
h.Handler(regexp.MustCompile(`^/v2/(.+)$`), nil, authHandler(V2(db), a))

h.Handler(regexp.MustCompile("^/debug/pprof$"), []string{"GET"}, authHandler(http.HandlerFunc(pprof.Index), a))
h.Handler(regexp.MustCompile("^/debug/pprof/cmdline$"), []string{"GET"}, authHandler(http.HandlerFunc(pprof.Cmdline), a))
Expand Down Expand Up @@ -145,7 +153,7 @@ func serverHeaderHandler(h http.Handler) http.Handler {
} else {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
}
w.Header().Set("Server", fmt.Sprintf("rickover/%s", config.Version))
w.Header().Set("Server", "rickover/"+config.Version)
h.ServeHTTP(w, r)
})
}
Expand Down Expand Up @@ -235,10 +243,10 @@ type CreateJobRequest struct {
// GET /v1/jobs/:jobName
//
// Get a job type by name. Returns a models.Job or an error
func getJobType() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
func getJobType(db *newmodels.Queries) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
jobName := getJobTypeRoute.FindStringSubmatch(r.URL.Path)[1]
job, err := jobs.Get(r.Context(), jobName)
job, err := db.GetJob(r.Context(), jobName)
if err != nil {
if err == sql.ErrNoRows {
notFound(w, new404(r))
Expand All @@ -247,8 +255,8 @@ func getJobType() http.Handler {
rest.ServerError(w, r, err)
return
}
respond(Logger, w, r, job)
})
respond(Logger, w, r, dbtohttp.Job(job))
}
}

var Logger = handlers.Logger
Expand All @@ -257,8 +265,8 @@ var Logger = handlers.Logger
//
// createJob returns a http.HandlerFunc that responds to job creation requests
// using the given authorizer interface.
func createJob(useMetaShutdown bool) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
func createJob(db *newmodels.Queries, useMetaShutdown bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Body == nil {
rest.BadRequest(w, r, createEmptyErr("name", r.URL.Path))
return
Expand Down Expand Up @@ -326,7 +334,9 @@ func createJob(useMetaShutdown bool) http.Handler {
Attempts: jr.Attempts,
}
start := time.Now()
job, err := jobs.Create(newmodels.CreateJobParams{
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
job, err := db.CreateJob(ctx, newmodels.CreateJobParams{
Name: jobData.Name,
DeliveryStrategy: jobData.DeliveryStrategy,
Concurrency: jobData.Concurrency,
Expand Down Expand Up @@ -354,12 +364,12 @@ func createJob(useMetaShutdown bool) http.Handler {
// a shutdown job, which will instruct the dequeuer to shut down.
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
if err := createMetaShutdownJob(ctx); err != nil {
if err := createMetaShutdownJob(ctx, db); err != nil {
return
}
id := types.GenerateUUID("job_")
runAfter := time.Now().Add(-1 * 5 * time.Second)
queued_jobs.Enqueue(newmodels.EnqueueJobParams{
services.Enqueue(ctx, db, newmodels.EnqueueJobParams{
ID: id, Name: "meta.shutdown", RunAfter: runAfter,
ExpiresAt: types.NullTime{
Valid: true,
Expand All @@ -369,9 +379,9 @@ func createJob(useMetaShutdown bool) http.Handler {
})
}()
}
created(Logger, w, r, job)
created(Logger, w, r, dbtohttp.Job(job))
metrics.Increment("type.create.success")
})
}
}

// An EnqueueJobRequest is sent in the body of a request to PUT
Expand All @@ -388,15 +398,15 @@ type EnqueueJobRequest struct {
}

// GET/POST/PUT disambiguator for /v1/jobs/:name/:id
func handleJobRoute(useMetaShutdown bool) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
func handleJobRoute(db *newmodels.Queries, useMetaShutdown bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method == "POST" {
j := jobStatusUpdater{}
j.ServeHTTP(w, r)
return
}
if r.Method == "PUT" {
j := jobEnqueuer{useMetaShutdown: useMetaShutdown}
j := jobEnqueuer{db: db, useMetaShutdown: useMetaShutdown}
j.ServeHTTP(w, r)
return
}
Expand All @@ -406,7 +416,7 @@ func handleJobRoute(useMetaShutdown bool) http.HandlerFunc {
return
}
rest.NotAllowed(w, r)
})
}
}

type jobStatusGetter struct{}
Expand Down Expand Up @@ -472,11 +482,11 @@ func (j *jobStatusGetter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
metrics.Increment("job.get.archived.success")
}

func createMetaShutdownJob(ctx context.Context) error {
func createMetaShutdownJob(ctx context.Context, db *newmodels.Queries) error {
// This job type might not exist, we need to create it first to avoid
// a foreign key error. Just try to create the job every time - we
// shouldn't be hitting this endpoint that often
_, err := newmodels.DB.CreateJob(ctx, newmodels.CreateJobParams{
_, err := db.CreateJob(ctx, newmodels.CreateJobParams{
Name: "meta.shutdown",
DeliveryStrategy: newmodels.DeliveryStrategyAtMostOnce,
Attempts: 1,
Expand All @@ -493,6 +503,7 @@ func createMetaShutdownJob(ctx context.Context) error {

// jobEnqueuer satisfies the Handler interface.
type jobEnqueuer struct {
db *newmodels.Queries
useMetaShutdown bool
}

Expand Down Expand Up @@ -553,9 +564,11 @@ func (j *jobEnqueuer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
name := jobIdRoute.FindStringSubmatch(r.URL.Path)[1]
if name == "meta.shutdown" {
createMetaShutdownJob(r.Context())
createMetaShutdownJob(r.Context(), j.db)
}
queuedJob, err := queued_jobs.Enqueue(newmodels.EnqueueJobParams{
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
queuedJob, err := services.Enqueue(ctx, j.db, newmodels.EnqueueJobParams{
ID: id, Name: name, RunAfter: ejr.RunAfter.Time,
ExpiresAt: ejr.ExpiresAt, Data: ejr.Data,
})
Expand Down Expand Up @@ -584,7 +597,7 @@ func (j *jobEnqueuer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
case *pq.Error:
if terr.Code == "23505" { // unique violation
queuedJob, err = queued_jobs.Get(r.Context(), id)
queuedJob, err = j.db.GetQueuedJob(r.Context(), id)
if err != nil {
rest.ServerError(w, r, err)
return
Expand All @@ -611,7 +624,7 @@ func (j *jobEnqueuer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// GET /v1/archived-jobs
func listArchivedJobs() http.Handler {
func listArchivedJobs(db *newmodels.Queries) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
typ := query.Get("name")
Expand Down Expand Up @@ -645,19 +658,19 @@ func listArchivedJobs() http.Handler {
var err error
switch {
case typ == "" && status == "":
ajs, err = newmodels.DB.ListArchivedJobs(r.Context(), int32(limit))
ajs, err = db.ListArchivedJobs(r.Context(), int32(limit))
case status == "" && typ != "":
ajs, err = newmodels.DB.ListArchivedJobsByName(r.Context(), newmodels.ListArchivedJobsByNameParams{
ajs, err = db.ListArchivedJobsByName(r.Context(), newmodels.ListArchivedJobsByNameParams{
Limit: int32(limit),
Name: typ,
})
case status != "" && typ == "":
ajs, err = newmodels.DB.ListArchivedJobsByStatus(r.Context(), newmodels.ListArchivedJobsByStatusParams{
ajs, err = db.ListArchivedJobsByStatus(r.Context(), newmodels.ListArchivedJobsByStatusParams{
Limit: int32(limit),
Status: newmodels.ArchivedJobStatus(status),
})
case status != "" && typ != "":
ajs, err = newmodels.DB.ListArchivedJobsByNameStatus(r.Context(), newmodels.ListArchivedJobsByNameStatusParams{
ajs, err = db.ListArchivedJobsByNameStatus(r.Context(), newmodels.ListArchivedJobsByNameStatusParams{
Limit: int32(limit),
Name: typ,
Status: newmodels.ArchivedJobStatus(status),
Expand Down
Loading

0 comments on commit ab945f1

Please sign in to comment.