Skip to content

Commit

Permalink
#29498 Add job watcher filtering and predicates in RealTimeJobMonitor
Browse files Browse the repository at this point in the history
Introduced a new abstract class `AbstractJobWatcher` and modified `RealTimeJobMonitor` to support job watcher filtering using predicates. Updated related test configurations to include the new classes for initialization. These changes enhance the monitoring functionality by allowing more precise control over job update notifications.
  • Loading branch information
jgambarios committed Oct 23, 2024
1 parent ff5a516 commit dc929f5
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.dotcms.jobs.business.api.events;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.immutables.value.Value;

/**
* Class to hold a watcher and its filter predicate.
*/
@Value.Style(typeImmutable = "*", typeAbstract = "Abstract*")
@Value.Immutable
@JsonSerialize(as = JobWatcher.class)
@JsonDeserialize(as = JobWatcher.class)
public interface AbstractJobWatcher {

/**
* Returns a Consumer that performs an operation on a Job instance.
*
* @return a Consumer of Job that defines what to do with a Job instance.
*/
Consumer<com.dotcms.jobs.business.job.Job> watcher();

/**
* Returns a predicate that can be used to filter jobs based on custom criteria.
*
* @return a Predicate object to filter Job instances
*/
Predicate<com.dotcms.jobs.business.job.Job> filter();

}
Original file line number Diff line number Diff line change
@@ -1,47 +1,132 @@
package com.dotcms.jobs.business.api.events;

import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.job.JobState;
import com.dotmarketing.util.Logger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;

/**
* Manages real-time monitoring of jobs in the system. This class handles registration of job
* watchers, updates watchers on job changes, and processes various job-related events.
* Manages real-time monitoring of jobs in the system. This class provides functionality to register
* watchers for specific jobs and receive notifications about job state changes and progress updates.
*
* <p>Thread safety is ensured through a combination of {@link ConcurrentHashMap} for storing watchers
* and synchronized {@link List}s for managing multiple watchers per job. This allows concurrent
* registration and notification of watchers without compromising data consistency.</p>
*
* <p>The monitor supports filtered watching through predicates, allowing clients to receive only
* the updates they're interested in. Common predicates are provided through the inner
* {@link Predicates} class.</p>
*
* <h2>Usage Examples:</h2>
*
* <p>Watch all job updates:</p>
* <pre>{@code
* monitor.registerWatcher(jobId, job -> System.out.println("Job updated: " + job.id()));
* }</pre>
*
* <p>Watch only completed jobs:</p>
* <pre>{@code
* monitor.registerWatcher(jobId,
* job -> handleCompletion(job),
* Predicates.isCompleted()
* );
* }</pre>
*
* <p>Watch progress changes with threshold:</p>
* <pre>{@code
* monitor.registerWatcher(jobId,
* job -> updateProgress(job),
* Predicates.progressChanged(0.1f) // Updates every 10% progress
* );
* }</pre>
*
* <p>Combine multiple conditions:</p>
* <pre>{@code
* monitor.registerWatcher(jobId,
* job -> handleUpdate(job),
* Predicates.hasState(JobState.RUNNING)
* .and(Predicates.progressChanged(0.05f))
* );
* }</pre>
*
* @see JobWatcher
* @see Predicates
*/
@ApplicationScoped
public class RealTimeJobMonitor {

private final Map<String, List<Consumer<Job>>> jobWatchers = new ConcurrentHashMap<>();
private final Map<String, List<JobWatcher>> jobWatchers = new ConcurrentHashMap<>();

/**
* Registers a watcher for a specific job.
* Registers a watcher for a specific job with optional filtering of updates. The watcher will
* be notified of job updates that match the provided filter predicate. If no filter is provided
* (null), the watcher receives all updates for the job.
*
* @param jobId The ID of the job to watch.
* @param watcher The consumer to be notified of job updates.
* <p>Multiple watchers can be registered for the same job, and each watcher can have
* its own filter predicate. Watchers are automatically removed when a job reaches a final state
* (completed, cancelled, or removed).</p>
*
* @param jobId The ID of the job to watch
* @param watcher The consumer to be notified of job updates
* @param filter Optional predicate to filter job updates (null means receive all updates)
* @throws IllegalArgumentException if jobId or watcher is null
* @see Predicates for common filter predicates
*/
public void registerWatcher(String jobId, Consumer<Job> watcher, Predicate<Job> filter) {
jobWatchers.compute(jobId, (key, existingWatchers) -> {
List<JobWatcher> watchers = Objects.requireNonNullElseGet(
existingWatchers,
() -> Collections.synchronizedList(new ArrayList<>())
);

final var jobWatcher = JobWatcher.builder()
.watcher(watcher)
.filter(filter != null ? filter : job -> true).build();

watchers.add(jobWatcher);
return watchers;
});
}

/**
* Registers a watcher for a specific job that receives all updates.
* This is a convenience method equivalent to calling {@code registerWatcher(jobId, watcher, null)}.
*
* @param jobId The ID of the job to watch
* @param watcher The consumer to be notified of job updates
* @throws IllegalArgumentException if jobId or watcher is null
*/
public void registerWatcher(String jobId, Consumer<Job> watcher) {
jobWatchers.computeIfAbsent(jobId, k -> new CopyOnWriteArrayList<>()).add(watcher);
registerWatcher(jobId, watcher, null);
}

/**
* Retrieves the set of job IDs currently being watched.
* The returned set is a snapshot and may not reflect concurrent modifications.
*
* @return A set of job IDs.
* @return An unmodifiable set of job IDs with active watchers
*/
public Set<String> getWatchedJobIds() {
return jobWatchers.keySet();
}

/**
* Updates watchers for a list of jobs.
* Each job's watchers are notified according to their filter predicates.
*
* @param updatedJobs List of jobs that have been updated.
* @param updatedJobs List of jobs that have been updated
* @throws IllegalArgumentException if updatedJobs is null
*/
public void updateWatchers(List<Job> updatedJobs) {
for (Job job : updatedJobs) {
Expand All @@ -56,9 +141,18 @@ public void updateWatchers(List<Job> updatedJobs) {
*/
private void updateWatchers(Job job) {

List<Consumer<Job>> watchers = jobWatchers.get(job.id());
List<JobWatcher> watchers = jobWatchers.get(job.id());
if (watchers != null) {
watchers.forEach(watcher -> watcher.accept(job));
watchers.forEach(jobWatcher -> {
try {
if (jobWatcher.filter().test(job)) {
jobWatcher.watcher().accept(job);
}
} catch (Exception e) {
Logger.error(this, "Error notifying job watcher for job " + job.id(), e);
watchers.remove(jobWatcher);
}
});
}
}

Expand Down Expand Up @@ -136,4 +230,81 @@ public void onJobProgressUpdated(@Observes JobProgressUpdatedEvent event) {
updateWatchers(event.getJob());
}

/**
* Common predicates for filtering job updates. These predicates can be used individually or
* combined using {@link Predicate#and(Predicate)} and {@link Predicate#or(Predicate)} to create
* more complex filtering conditions.
*/
public static class Predicates {

/**
* Creates a predicate that matches jobs with any of the specified states.
*
* @param states One or more job states to match
* @return A predicate that returns true if the job's state matches any of the specified
* states
* @throws IllegalArgumentException if states is null or empty
*/
public static Predicate<Job> hasState(JobState... states) {
return job -> Arrays.asList(states).contains(job.state());
}

/**
* Creates a predicate that matches jobs whose progress has changed by at least the
* specified threshold since the last notification.
*
* @param threshold The minimum progress change (0.0 to 1.0) required to match
* @return A predicate that tracks and matches significant progress changes
* @throws IllegalArgumentException if threshold is not between 0.0 and 1.0
*/
public static Predicate<Job> progressChanged(float threshold) {
return new Predicate<>() {
private float lastProgress = 0;

@Override
public boolean test(Job job) {
float currentProgress = job.progress();
if (Math.abs(currentProgress - lastProgress) >= threshold) {
lastProgress = currentProgress;
return true;
}
return false;
}
};
}

/**
* Creates a predicate that matches failed jobs with error details. The predicate only
* matches if the job is in FAILED state and has error details available.
*
* @return A predicate for matching failed jobs
*/
public static Predicate<Job> hasFailed() {
return job -> job.state() == JobState.FAILED
&& job.result().isPresent()
&& job.result().get().errorDetail().isPresent();
}

/**
* Creates a predicate that matches completed jobs. The predicate matches any job in the
* COMPLETED state.
*
* @return A predicate for matching completed jobs
*/
public static Predicate<Job> isCompleted() {
return job -> job.state() == JobState.COMPLETED;
}

/**
* Creates a predicate that matches canceled jobs. The predicate matches any job in the
* CANCELED state.
*
* @return A predicate for matching canceled jobs
*/
public static Predicate<Job> isCanceled() {
return job -> job.state() == JobState.CANCELED;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import com.dotcms.jobs.business.api.events.EventProducer;
import com.dotcms.jobs.business.api.events.RealTimeJobMonitor;
import com.dotcms.jobs.business.error.CircuitBreaker;
import com.dotcms.jobs.business.error.NoRetryStrategy;
import com.dotcms.jobs.business.error.RetryPolicyProcessor;
import com.dotcms.jobs.business.error.RetryStrategy;
import com.dotcms.jobs.business.error.RetryStrategyProducer;
import com.dotcms.jobs.business.queue.JobQueue;
Expand All @@ -32,7 +34,7 @@ public class TestBaseJunit5WeldInitiator {
JobQueueProducer.class, JobQueueConfigProducer.class,
RetryStrategyProducer.class, RealTimeJobMonitor.class,
EventProducer.class, JobProcessorFactory.class, JobQueueHelper.class,
JobProcessorScanner.class
JobProcessorScanner.class, RetryPolicyProcessor.class, NoRetryStrategy.class
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import com.dotcms.jobs.business.api.events.EventProducer;
import com.dotcms.jobs.business.api.events.RealTimeJobMonitor;
import com.dotcms.jobs.business.error.CircuitBreaker;
import com.dotcms.jobs.business.error.NoRetryStrategy;
import com.dotcms.jobs.business.error.RetryPolicyProcessor;
import com.dotcms.jobs.business.error.RetryStrategy;
import com.dotcms.jobs.business.error.RetryStrategyProducer;
import com.dotcms.jobs.business.queue.JobQueue;
Expand Down Expand Up @@ -76,7 +78,9 @@ public void init() throws Exception {
JobProcessorFactory.class,
EventProducer.class,
JobProcessorScanner.class,
JobQueueHelper.class)
JobQueueHelper.class,
RetryPolicyProcessor.class,
NoRetryStrategy.class)
.initialize();

System.setProperty(TestUtil.DOTCMS_INTEGRATION_TEST, TestUtil.DOTCMS_INTEGRATION_TEST);
Expand Down

0 comments on commit dc929f5

Please sign in to comment.