Skip to content

Commit

Permalink
#29498 Refactor job management API and implement endpoint
Browse files Browse the repository at this point in the history
Refactor and enhance job management functionality including type-safe parameters handling, retrieval of active, completed, canceled, and failed jobs using consolidated queries. Added new endpoint for job creation with parameters and updated related tests.
  • Loading branch information
jgambarios committed Oct 25, 2024
1 parent 542414d commit 834d256
Show file tree
Hide file tree
Showing 9 changed files with 537 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ public interface JobQueueManagerAPI {

/**
* Retrieves the job processors for all registered queues.
*
* @return A map of queue names to job processors
*/
Map<String,Class<? extends JobProcessor>> getQueueNames();
Map<String, Class<? extends JobProcessor>> getQueueNames();

/**
* Creates a new job in the specified queue.
Expand All @@ -86,6 +87,18 @@ String createJob(String queueName, Map<String, Object> parameters)
*/
Job getJob(String jobId) throws DotDataException;

/**
* Retrieves a list of active jobs for a specific queue.
*
* @param queueName The name of the queue
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of active jobs and pagination information.
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize)
throws JobQueueDataException;

/**
* Retrieves a list of jobs.
*
Expand All @@ -97,21 +110,42 @@ String createJob(String queueName, Map<String, Object> parameters)
JobPaginatedResult getJobs(int page, int pageSize) throws DotDataException;

/**
* Retrieves a list of active jobs for a specific queue.
* @param queueName The name of the queue
* @param page The page number
* @param pageSize The number of jobs per page
* Retrieves a list of active jobs, meaning jobs that are currently being processed.
*
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of active jobs and pagination information.
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) throws JobQueueDataException;
JobPaginatedResult getActiveJobs(int page, int pageSize) throws JobQueueDataException;

/**
* Retrieves a list of completed jobs for a specific queue within a date range.
* Retrieves a list of completed jobs
*
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of completed jobs and pagination information.
* @throws JobQueueDataException
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getCompletedJobs(int page, int pageSize) throws JobQueueDataException;

/**
* Retrieves a list of canceled jobs
*
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of canceled jobs and pagination information.
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getCanceledJobs(int page, int pageSize) throws JobQueueDataException;

/**
* Retrieves a list of failed jobs
*
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of failed jobs and pagination information.
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getFailedJobs(int page, int pageSize) throws JobQueueDataException;

Expand Down Expand Up @@ -141,6 +175,7 @@ String createJob(String queueName, Map<String, Object> parameters)

/**
* Retrieves the retry strategy for a specific queue.
*
* @param jobId The ID of the job
* @return The processor instance, or an empty optional if not found
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,17 @@ public Job getJob(final String jobId) throws DotDataException {
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize)
throws JobQueueDataException {
try {
return jobQueue.getActiveJobs(queueName, page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching active jobs", e);
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getJobs(final int page, final int pageSize) throws DotDataException {
Expand All @@ -312,23 +323,45 @@ public JobPaginatedResult getJobs(final int page, final int pageSize) throws Dot

@CloseDBIfOpened
@Override
public JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize)
public JobPaginatedResult getActiveJobs(int page, int pageSize)
throws JobQueueDataException {
try {
return jobQueue.getActiveJobs(queueName, page, pageSize);
return jobQueue.getActiveJobs(page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching active jobs", e);
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getCompletedJobs(int page, int pageSize)
throws JobQueueDataException {
try {
return jobQueue.getCompletedJobs(page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching completed jobs", e);
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getCanceledJobs(int page, int pageSize)
throws JobQueueDataException {
try {
return jobQueue.getCanceledJobs(page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching canceled jobs", e);
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getFailedJobs(int page, int pageSize)
throws JobQueueDataException {
try {
return jobQueue.getFailedJobs(page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching active jobs", e);
throw new JobQueueDataException("Error fetching failed jobs", e);
}
}

Expand Down
30 changes: 30 additions & 0 deletions dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,36 @@ JobPaginatedResult getCompletedJobs(String queueName, LocalDateTime startDate,
*/
JobPaginatedResult getJobs(int page, int pageSize) throws JobQueueDataException;

/**
* Retrieves a list of active jobs, meaning jobs that are currently being processed.
*
* @param page The page number (for pagination).
* @param pageSize The number of items per page.
* @return A result object containing the list of active jobs and pagination information.
* @throws JobQueueDataException if there's a data storage error while fetching the jobs
*/
JobPaginatedResult getActiveJobs(int page, int pageSize) throws JobQueueDataException;

/**
* Retrieves a list of completed jobs.
*
* @param page The page number (for pagination).
* @param pageSize The number of items per page.
* @return A result object containing the list of completed jobs and pagination information.
* @throws JobQueueDataException if there's a data storage error while fetching the jobs
*/
JobPaginatedResult getCompletedJobs(int page, int pageSize) throws JobQueueDataException;

/**
* Retrieves a list of canceled jobs.
*
* @param page The page number (for pagination).
* @param pageSize The number of items per page.
* @return A result object containing the list of canceled jobs and pagination information.
* @throws JobQueueDataException if there's a data storage error while fetching the jobs
*/
JobPaginatedResult getCanceledJobs(int page, int pageSize) throws JobQueueDataException;

/**
* Retrieves a list of failed jobs.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class PostgresJobQueue implements JobQueue {
+ "ORDER BY priority DESC, created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED) "
+ "RETURNING *";

private static final String GET_ACTIVE_JOBS_QUERY =
private static final String GET_ACTIVE_JOBS_QUERY_FOR_QUEUE =
"WITH total AS (SELECT COUNT(*) AS total_count " +
" FROM job WHERE queue_name = ? AND state IN (?, ?) " +
"), " +
Expand All @@ -85,7 +85,7 @@ public class PostgresJobQueue implements JobQueue {
") " +
"SELECT p.*, t.total_count FROM total t LEFT JOIN paginated_data p ON true";

private static final String GET_COMPLETED_JOBS_QUERY =
private static final String GET_COMPLETED_JOBS_QUERY_FOR_QUEUE =
"WITH total AS (SELECT COUNT(*) AS total_count " +
" FROM job WHERE queue_name = ? AND state = ? AND completed_at BETWEEN ? AND ? " +
"), " +
Expand All @@ -95,15 +95,15 @@ public class PostgresJobQueue implements JobQueue {
") " +
"SELECT p.*, t.total_count FROM total t LEFT JOIN paginated_data p ON true";

private static final String GET_FAILED_JOBS_QUERY =
private static final String GET_JOBS_QUERY_BY_STATE =
"WITH total AS (" +
" SELECT COUNT(*) AS total_count FROM job " +
" WHERE state = ? " +
" WHERE state IN $??$ " +
"), " +
"paginated_data AS (" +
" SELECT * " +
" FROM job WHERE state = ? " +
" ORDER BY updated_at DESC " +
" FROM job WHERE state IN $??$ " +
" ORDER BY $ORDER_BY$ DESC " +
" LIMIT ? OFFSET ? " +
") " +
"SELECT p.*, t.total_count " +
Expand Down Expand Up @@ -152,6 +152,9 @@ public class PostgresJobQueue implements JobQueue {

private static final String COLUMN_TOTAL_COUNT = "total_count";

private static final String REPLACE_TOKEN_PARAMETERS = "$??$";
private static final String REPLACE_TOKEN_ORDER_BY = "$ORDER_BY$";

/**
* Jackson mapper configuration and lazy initialized instance.
*/
Expand Down Expand Up @@ -246,7 +249,7 @@ public JobPaginatedResult getActiveJobs(final String queueName, final int page,
try {

DotConnect dc = new DotConnect();
dc.setSQL(GET_ACTIVE_JOBS_QUERY);
dc.setSQL(GET_ACTIVE_JOBS_QUERY_FOR_QUEUE);
dc.addParam(queueName);
dc.addParam(JobState.PENDING.name());
dc.addParam(JobState.RUNNING.name());
Expand All @@ -258,8 +261,10 @@ public JobPaginatedResult getActiveJobs(final String queueName, final int page,

return jobPaginatedResult(page, pageSize, dc);
} catch (DotDataException e) {
Logger.error(this, "Database error while fetching active jobs", e);
throw new JobQueueDataException("Database error while fetching active jobs", e);
Logger.error(this,
"Database error while fetching active jobs by queue", e);
throw new JobQueueDataException(
"Database error while fetching active jobs by queue", e);
}
}

Expand All @@ -271,7 +276,7 @@ public JobPaginatedResult getCompletedJobs(final String queueName,

try {
DotConnect dc = new DotConnect();
dc.setSQL(GET_COMPLETED_JOBS_QUERY);
dc.setSQL(GET_COMPLETED_JOBS_QUERY_FOR_QUEUE);
dc.addParam(queueName);
dc.addParam(JobState.COMPLETED.name());
dc.addParam(Timestamp.valueOf(startDate));
Expand All @@ -285,8 +290,10 @@ public JobPaginatedResult getCompletedJobs(final String queueName,

return jobPaginatedResult(page, pageSize, dc);
} catch (DotDataException e) {
Logger.error(this, "Database error while fetching completed jobs", e);
throw new JobQueueDataException("Database error while fetching completed jobs", e);
Logger.error(this,
"Database error while fetching completed jobs by queue", e);
throw new JobQueueDataException(
"Database error while fetching completed jobs by queue", e);
}
}

Expand All @@ -307,13 +314,92 @@ public JobPaginatedResult getJobs(final int page, final int pageSize)
}
}

@Override
public JobPaginatedResult getActiveJobs(final int page, final int pageSize)
throws JobQueueDataException {

try {

var query = GET_JOBS_QUERY_BY_STATE
.replace(REPLACE_TOKEN_PARAMETERS, "(?, ?)")
.replace(REPLACE_TOKEN_ORDER_BY, "created_at");

DotConnect dc = new DotConnect();
dc.setSQL(query);
dc.addParam(JobState.PENDING.name());
dc.addParam(JobState.RUNNING.name());
dc.addParam(JobState.PENDING.name()); // Repeated for paginated_data CTE
dc.addParam(JobState.RUNNING.name());
dc.addParam(pageSize);
dc.addParam((page - 1) * pageSize);

return jobPaginatedResult(page, pageSize, dc);
} catch (DotDataException e) {
Logger.error(this, "Database error while fetching active jobs", e);
throw new JobQueueDataException("Database error while fetching active jobs", e);
}
}

@Override
public JobPaginatedResult getCompletedJobs(final int page, final int pageSize)
throws JobQueueDataException {

try {

var query = GET_JOBS_QUERY_BY_STATE
.replace(REPLACE_TOKEN_PARAMETERS, "(?)")
.replace(REPLACE_TOKEN_ORDER_BY, "completed_at");

DotConnect dc = new DotConnect();
dc.setSQL(query);
dc.addParam(JobState.COMPLETED.name());
dc.addParam(JobState.COMPLETED.name()); // Repeated for paginated_data CTE
dc.addParam(pageSize);
dc.addParam((page - 1) * pageSize);

return jobPaginatedResult(page, pageSize, dc);
} catch (DotDataException e) {
Logger.error(this, "Database error while fetching completed jobs", e);
throw new JobQueueDataException("Database error while fetching completed jobs", e);
}
}

@Override
public JobPaginatedResult getCanceledJobs(final int page, final int pageSize)
throws JobQueueDataException {

try {

var query = GET_JOBS_QUERY_BY_STATE
.replace(REPLACE_TOKEN_PARAMETERS, "(?)")
.replace(REPLACE_TOKEN_ORDER_BY, "completed_at");

DotConnect dc = new DotConnect();
dc.setSQL(query);
dc.addParam(JobState.CANCELED.name());
dc.addParam(JobState.CANCELED.name()); // Repeated for paginated_data CTE
dc.addParam(pageSize);
dc.addParam((page - 1) * pageSize);

return jobPaginatedResult(page, pageSize, dc);
} catch (DotDataException e) {
Logger.error(this, "Database error while fetching cancelled jobs", e);
throw new JobQueueDataException("Database error while fetching cancelled jobs", e);
}
}

@Override
public JobPaginatedResult getFailedJobs(final int page, final int pageSize)
throws JobQueueDataException {

try {

var query = GET_JOBS_QUERY_BY_STATE
.replace(REPLACE_TOKEN_PARAMETERS, "(?)")
.replace(REPLACE_TOKEN_ORDER_BY, "updated_at");

DotConnect dc = new DotConnect();
dc.setSQL(GET_FAILED_JOBS_QUERY);
dc.setSQL(query);
dc.addParam(JobState.FAILED.name());
dc.addParam(JobState.FAILED.name()); // Repeated for paginated_data CTE
dc.addParam(pageSize);
Expand Down
Loading

0 comments on commit 834d256

Please sign in to comment.