Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat (Core): Migrate import contentlets action to job processor #30432

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
ce92a41
#24498 Add RetryPolicyProcessor for handling retry policies
jgambarios Oct 18, 2024
d40a0db
#24498 Refactor file retrieval to JobUtil class
jgambarios Oct 18, 2024
0e9d92d
Merge remote-tracking branch 'origin/main' into issue-29498-migrate-I…
jgambarios Oct 22, 2024
17b28a8
#29498 Add progress callback and finalize ImportContentletProcessor
jgambarios Oct 22, 2024
36936c9
Merge remote-tracking branch 'origin/main' into issue-29498-migrate-I…
jgambarios Oct 22, 2024
0183221
Merge remote-tracking branch 'origin/main' into issue-29498-migrate-I…
jgambarios Oct 23, 2024
e999e20
#29498 Rename ImportContentletProcessor to ImportContentletsProcessor
jgambarios Oct 23, 2024
ff5a516
#29498 Create NoRetryPolicy and DefaultRetryStrategy annotations
jgambarios Oct 23, 2024
dc929f5
#29498 Add job watcher filtering and predicates in RealTimeJobMonitor
jgambarios Oct 23, 2024
ccf4819
Merge remote-tracking branch 'origin/main' into issue-29498-migrate-I…
jgambarios Oct 23, 2024
5518ef3
#29498 Sonarlint feedback.
jgambarios Oct 23, 2024
b9019cd
#29498 Add private constructors and default constructor for CDI
jgambarios Oct 23, 2024
2f605a7
Merge remote-tracking branch 'origin/main' into issue-29498-migrate-I…
jgambarios Oct 23, 2024
a757bb0
#29498 Fixing tests
jgambarios Oct 23, 2024
7aa91f9
#29498 Fixing tests
jgambarios Oct 24, 2024
6460115
Merge remote-tracking branch 'origin/main' into issue-29498-migrate-I…
jgambarios Oct 24, 2024
542414d
#29498 Fix type casting issue in `getFields` method.
jgambarios Oct 24, 2024
834d256
#29498 Refactor job management API and implement endpoint
jgambarios Oct 25, 2024
9e43009
#29498 Rename FailJob to FailSuccessJob and improve job handling
jgambarios Oct 26, 2024
f9373a0
Merge remote-tracking branch 'origin/main' into issue-29498-migrate-I…
jgambarios Oct 26, 2024
054f97d
#29498 Add integration test and refactor request generation
jgambarios Oct 29, 2024
d07d375
Merge remote-tracking branch 'origin/main' into issue-29498-migrate-I…
jgambarios Oct 29, 2024
fb7a415
Merge remote-tracking branch 'origin/main' into issue-29498-migrate-I…
jgambarios Oct 29, 2024
218c0bc
Solving conflicts
jgambarios Oct 29, 2024
0d13629
#29498 Enable Weld JUnit5 in integration test
jgambarios Oct 29, 2024
6dd17d7
Merge remote-tracking branch 'origin/main' into issue-29498-migrate-I…
jgambarios Oct 29, 2024
d5e7f95
#29498 Applying code review feedback
jgambarios Oct 29, 2024
e601cea
#29498 Applying code review feedback
jgambarios Oct 29, 2024
32a6ca9
Merge remote-tracking branch 'origin/main' into issue-29498-migrate-I…
jgambarios Oct 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ public interface JobQueueManagerAPI {

/**
* Retrieves the job processors for all registered queues.
*
* @return A map of queue names to job processors
*/
Map<String,Class<? extends JobProcessor>> getQueueNames();
Map<String, Class<? extends JobProcessor>> getQueueNames();

/**
* Creates a new job in the specified queue.
Expand All @@ -86,6 +87,18 @@ String createJob(String queueName, Map<String, Object> parameters)
*/
Job getJob(String jobId) throws DotDataException;

/**
* Retrieves a list of active jobs for a specific queue.
*
* @param queueName The name of the queue
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of active jobs and pagination information.
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize)
throws JobQueueDataException;

/**
* Retrieves a list of jobs.
*
Expand All @@ -97,21 +110,42 @@ String createJob(String queueName, Map<String, Object> parameters)
JobPaginatedResult getJobs(int page, int pageSize) throws DotDataException;

/**
* Retrieves a list of active jobs for a specific queue.
* @param queueName The name of the queue
* @param page The page number
* @param pageSize The number of jobs per page
* Retrieves a list of active jobs, meaning jobs that are currently being processed.
*
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of active jobs and pagination information.
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) throws JobQueueDataException;
JobPaginatedResult getActiveJobs(int page, int pageSize) throws JobQueueDataException;

/**
* Retrieves a list of completed jobs for a specific queue within a date range.
* Retrieves a list of completed jobs
*
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of completed jobs and pagination information.
* @throws JobQueueDataException
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getCompletedJobs(int page, int pageSize) throws JobQueueDataException;

/**
* Retrieves a list of canceled jobs
*
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of canceled jobs and pagination information.
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getCanceledJobs(int page, int pageSize) throws JobQueueDataException;

/**
* Retrieves a list of failed jobs
*
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of failed jobs and pagination information.
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getFailedJobs(int page, int pageSize) throws JobQueueDataException;

Expand Down Expand Up @@ -141,6 +175,7 @@ String createJob(String queueName, Map<String, Object> parameters)

/**
* Retrieves the retry strategy for a specific queue.
*
* @param jobId The ID of the job
* @return The processor instance, or an empty optional if not found
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
import com.dotcms.jobs.business.error.CircuitBreaker;
import com.dotcms.jobs.business.error.ErrorDetail;
import com.dotcms.jobs.business.error.JobProcessorNotFoundException;
import com.dotcms.jobs.business.error.RetryPolicyProcessor;
import com.dotcms.jobs.business.error.RetryStrategy;
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.job.JobPaginatedResult;
import com.dotcms.jobs.business.job.JobResult;
import com.dotcms.jobs.business.job.JobState;
import com.dotcms.jobs.business.processor.Cancellable;
import com.dotcms.jobs.business.processor.DefaultProgressTracker;
import com.dotcms.jobs.business.processor.DefaultRetryStrategy;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.ProgressTracker;
import com.dotcms.jobs.business.queue.JobQueue;
Expand Down Expand Up @@ -107,6 +109,7 @@ public class JobQueueManagerAPIImpl implements JobQueueManagerAPI {
private ExecutorService executorService;
private final Map<String, RetryStrategy> retryStrategies;
private final RetryStrategy defaultRetryStrategy;
private final RetryPolicyProcessor retryPolicyProcessor;

private final ScheduledExecutorService pollJobUpdatesScheduler;
private LocalDateTime lastPollJobUpdateTime = LocalDateTime.now();
Expand Down Expand Up @@ -142,10 +145,11 @@ public class JobQueueManagerAPIImpl implements JobQueueManagerAPI {
public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue,
JobQueueConfig jobQueueConfig,
CircuitBreaker circuitBreaker,
RetryStrategy defaultRetryStrategy,
@DefaultRetryStrategy RetryStrategy defaultRetryStrategy,
RealTimeJobMonitor realTimeJobMonitor,
EventProducer eventProducer,
JobProcessorFactory jobProcessorFactory) {
JobProcessorFactory jobProcessorFactory,
RetryPolicyProcessor retryPolicyProcessor) {

this.jobQueue = jobQueue;
this.threadPoolSize = jobQueueConfig.getThreadPoolSize();
Expand All @@ -154,6 +158,8 @@ public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue,
this.retryStrategies = new ConcurrentHashMap<>();
this.defaultRetryStrategy = defaultRetryStrategy;
this.circuitBreaker = circuitBreaker;
this.jobProcessorFactory = jobProcessorFactory;
this.retryPolicyProcessor = retryPolicyProcessor;

this.pollJobUpdatesScheduler = Executors.newSingleThreadScheduledExecutor();
pollJobUpdatesScheduler.scheduleAtFixedRate(
Expand All @@ -164,7 +170,6 @@ public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue,
// Events
this.realTimeJobMonitor = realTimeJobMonitor;
this.eventProducer = eventProducer;
this.jobProcessorFactory = jobProcessorFactory;
}

@Override
Expand Down Expand Up @@ -244,6 +249,12 @@ public void registerProcessor(final String queueName, final Class<? extends JobP
jobProcessor.getName(), queueName));
}
processors.put(queueName, processor);

// Process the retry policy for the processor
RetryStrategy retryStrategy = retryPolicyProcessor.processRetryPolicy(processor);
if (retryStrategy != null) {
setRetryStrategy(queueName, retryStrategy);
}
}

@Override
Expand Down Expand Up @@ -290,6 +301,17 @@ public Job getJob(final String jobId) throws DotDataException {
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize)
throws JobQueueDataException {
try {
return jobQueue.getActiveJobs(queueName, page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching active jobs", e);
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getJobs(final int page, final int pageSize) throws DotDataException {
Expand All @@ -302,23 +324,45 @@ public JobPaginatedResult getJobs(final int page, final int pageSize) throws Dot

@CloseDBIfOpened
@Override
public JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize)
public JobPaginatedResult getActiveJobs(int page, int pageSize)
throws JobQueueDataException {
try {
return jobQueue.getActiveJobs(queueName, page, pageSize);
return jobQueue.getActiveJobs(page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching active jobs", e);
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getCompletedJobs(int page, int pageSize)
throws JobQueueDataException {
try {
return jobQueue.getCompletedJobs(page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching completed jobs", e);
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getCanceledJobs(int page, int pageSize)
throws JobQueueDataException {
try {
return jobQueue.getCanceledJobs(page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching canceled jobs", e);
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getFailedJobs(int page, int pageSize)
throws JobQueueDataException {
try {
return jobQueue.getFailedJobs(page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching active jobs", e);
throw new JobQueueDataException("Error fetching failed jobs", e);
}
}

Expand Down Expand Up @@ -683,15 +727,14 @@ private void processJob(final Job job) throws DotDataException {
"Error processing job " + runningJob.id() + ": " + e.getMessage(), e
);
handleJobFailure(
runningJob, processor, e, e.getMessage(), "Job execution"
runningJob, processor, e, "Job execution"
);
}
} else {

Logger.error(this, "No processor found for queue: " + job.queueName());
handleJobFailure(job, null, new JobProcessorNotFoundException(job.queueName()),
"No processor found for queue", "Processor selection"
);
"Processor selection");
}
}

Expand Down Expand Up @@ -815,6 +858,29 @@ private void handleJobCancellation(final Job job, final JobProcessor processor)
);
}

/**
* Handles the failure of a job
*
* @param job The job that failed.
* @param processor The processor that handled the job.
* @param exception The exception that caused the failure.
* @param processingStage The stage of processing where the failure occurred.
*/
@WrapInTransaction
private void handleJobFailure(final Job job, final JobProcessor processor,
final Exception exception, final String processingStage) throws DotDataException {

var jobFailureException = exception;
if (exception.getCause() != null) {
jobFailureException = (Exception) exception.getCause();
}

handleJobFailure(
job, processor, jobFailureException, jobFailureException.getMessage(),
processingStage
);
}

/**
* Handles the failure of a job
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.dotcms.jobs.business.api.events;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.immutables.value.Value;

/**
* Class to hold a watcher and its filter predicate.
*/
@Value.Style(typeImmutable = "*", typeAbstract = "Abstract*")
@Value.Immutable
@JsonSerialize(as = JobWatcher.class)
@JsonDeserialize(as = JobWatcher.class)
public interface AbstractJobWatcher {

/**
* Returns a Consumer that performs an operation on a Job instance.
*
* @return a Consumer of Job that defines what to do with a Job instance.
*/
Consumer<com.dotcms.jobs.business.job.Job> watcher();

/**
* Returns a predicate that can be used to filter jobs based on custom criteria.
*
* @return a Predicate object to filter Job instances
*/
Predicate<com.dotcms.jobs.business.job.Job> filter();

}
Loading