Skip to content

Commit

Permalink
Make small changes related to worker queue
Browse files Browse the repository at this point in the history
  • Loading branch information
matthew-white committed Dec 13, 2023
1 parent 5e1c7c1 commit eb59599
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
2 changes: 1 addition & 1 deletion lib/bin/run-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ const server = service.listen(config.get('default.server.port'), () => {


////////////////////////////////////////////////////////////////////////////////
// START WORKER
// START WORKERS

const { workerQueue } = require('../worker/worker');
workerQueue(container).loops(4);
Expand Down
10 changes: 6 additions & 4 deletions lib/worker/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const { head } = require('ramda');
const { sql } = require('slonik');
const { timebound, runSequentially } = require('../util/promise');
const defaultJobMap = require('./jobs').jobs;
const { noop } = require('../util/util');

// TODO: domain catch/restart? << investigated and seems unlikely.

Expand Down Expand Up @@ -40,8 +41,8 @@ const workerQueue = (container, jobMap = defaultJobMap) => {

// given an event, attempts to run the appropriate jobs for the event,
// returning `true` immediately if there is a job to run and `false` if not.
// if there is a job, the function will call the `done` callback once all jobs
// have been run, or once there has been an error. the function works hard on
// if there is a job, runJobs() will call the `done` callback once all jobs
// have been run, or once there has been an error. runJobs() works hard on
// error handling, and will attempt to unclaim the event if a failure occurs.
const runJobs = (event, done) => {
if (event == null) return false;
Expand All @@ -58,7 +59,8 @@ const workerQueue = (container, jobMap = defaultJobMap) => {
.then(() => { process.stdout.write(`[${(new Date()).toISOString()}] finish processing event ${logname}\n`); })
.catch((err) => {
report(err);
return run(sql`update audits set claimed=null, failures=${event.failures + 1}, "lastFailure"=clock_timestamp() where id=${event.id}`);
return run(sql`update audits set claimed=null, failures=${event.failures + 1}, "lastFailure"=clock_timestamp() where id=${event.id}`)
.catch(noop);
})
.finally(done);

Expand All @@ -81,7 +83,7 @@ update audits set claimed=clock_timestamp() from q where audits.id=q.id returnin
.then(head);

// main loop. kicks off a check and attempts to process the result of the check.
// if there was something to do, takes a break while that happens; the runner will
// if there was something to do, takes a break while that happens; runJobs() will
// call back into the scheduler when it's done.
// if there was nothing to do, immediately schedules a subsequent check at a capped
// exponential backoff rate.
Expand Down

0 comments on commit eb59599

Please sign in to comment.