Skip to content

Commit

Permalink
Move processing queue to group context
Browse files Browse the repository at this point in the history
since each group can have its own processing limits,
and if we block the executor with pending tasks for one group, other group won't be able to get processed, while actually it could
  • Loading branch information
marko-bekhta committed Oct 17, 2024
1 parent 83de4ec commit d1576db
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 68 deletions.
35 changes: 27 additions & 8 deletions src/main/java/org/hibernate/infra/replicate/jira/JiraConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,24 @@ interface JiraProjectGroup {
}

interface EventProcessing {

/**
* Define how many events can be acknowledged and put on the pending queue
* before acknowledging an event results in blocking the response and waiting
* for the queue to free some space.
*/
@WithDefault("10000")
int queueSize();

/**
* Define the number of threads to use when processing queued events.
* <p>
* Note, having a lot of processing threads might not bring much benefit as
* processing may also be limited by {@link JiraConfig.EventProcessing}
*/
@WithDefault("2")
int threads();

/**
* Defines how many events can be processed within the
* {@link #timeframeInSeconds() timeframe}
Expand Down Expand Up @@ -127,20 +145,20 @@ interface Scheduled {

interface JiraProject {
/**
* Downstream project id (not a project key!).
* Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info.
* Downstream project id (not a project key!). Use
* {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info.
*/
String projectId();

/**
* Downstream project key.
* Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info.
* Downstream project key. Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get
* the info.
*/
String projectKey();

/**
* Upstream project key.
* Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info.
* Upstream project key. Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get
* the info.
*/
String originalProjectKey();

Expand All @@ -154,8 +172,9 @@ interface WebHookSecurity {
/**
* Whether to enable signature verification.
* <p>
* Jira web hooks can send a {@code x-hub-signature} header with a signature of a request body.
* This signature can be then verified using the secret used to configure the web hook.
* Jira web hooks can send a {@code x-hub-signature} header with a signature of
* a request body. This signature can be then verified using the secret used to
* configure the web hook.
*/
@WithDefault("false")
boolean enabled();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,12 @@ public String toString() {
public void close() {
projectGroupContext.close();
}

public int pendingEventsInCurrentContext() {
return projectGroupContext.pendingEventsInCurrentContext();
}

public void submitTask(Runnable runnable) {
projectGroupContext.submitTask(runnable);
}
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,42 @@
package org.hibernate.infra.replicate.jira.service.jira;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.hibernate.infra.replicate.jira.JiraConfig;

import io.quarkus.logging.Log;

public final class HandlerProjectGroupContext implements AutoCloseable {

private final ExecutorService eventHandlingExecutor;
private final Supplier<Integer> workQueueSize;
private final ScheduledExecutorService rateLimiterExecutor = Executors.newScheduledThreadPool(1);
private final Semaphore rateLimiter;
private final JiraConfig.JiraProjectGroup projectGroup;

public HandlerProjectGroupContext(JiraConfig.JiraProjectGroup projectGroup) {
this.projectGroup = projectGroup;

final int permits = projectGroup.processing().eventsPerTimeframe();
JiraConfig.EventProcessing processing = projectGroup.processing();

final int permits = processing.eventsPerTimeframe();
this.rateLimiter = new Semaphore(permits);
rateLimiterExecutor.scheduleAtFixedRate(() -> {
rateLimiter.drainPermits();
rateLimiter.release(permits);
}, projectGroup.processing().timeframeInSeconds(), projectGroup.processing().timeframeInSeconds(),
TimeUnit.SECONDS);
}, processing.timeframeInSeconds(), processing.timeframeInSeconds(), TimeUnit.SECONDS);

LinkedBlockingDeque<Runnable> workQueue = new LinkedBlockingDeque<>(processing.queueSize());
workQueueSize = workQueue::size;
eventHandlingExecutor = new ThreadPoolExecutor(processing.threads(), processing.threads(), 0L,
TimeUnit.MILLISECONDS, workQueue);
}

public void startProcessingEvent() throws InterruptedException {
Expand All @@ -33,12 +47,30 @@ public JiraConfig.JiraProjectGroup projectGroup() {
return projectGroup;
}

public int pendingEventsInCurrentContext() {
return workQueueSize.get();
}

public void submitTask(Runnable task) {
eventHandlingExecutor.submit(task);
}

@Override
public void close() {
// when requesting to close the context we aren't expecting to process any other
// events hence there's no point in continuing "releasing" more "permits":
if (!rateLimiterExecutor.isShutdown()) {
rateLimiterExecutor.shutdownNow();
}
if (!eventHandlingExecutor.isShutdown()) {
try {
eventHandlingExecutor.shutdown();
if (!eventHandlingExecutor.awaitTermination(2, TimeUnit.MINUTES)) {
Log.warnf("Not all events were processed before the shutdown");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,9 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import org.hibernate.infra.replicate.jira.JiraConfig;
import org.hibernate.infra.replicate.jira.ProcessingConfig;
import org.hibernate.infra.replicate.jira.service.jira.client.JiraRestClient;
import org.hibernate.infra.replicate.jira.service.jira.client.JiraRestClientBuilder;
import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraWebHookEvent;
Expand Down Expand Up @@ -47,19 +41,12 @@
public class JiraService {

private final ReportingConfig reportingConfig;
private final ExecutorService executor;
private final Supplier<Integer> workQueueSize;
private final Map<String, HandlerProjectContext> contextPerProject;
private final JiraConfig jiraConfig;
private final Scheduler scheduler;

@Inject
public JiraService(ProcessingConfig processingConfig, JiraConfig jiraConfig, ReportingConfig reportingConfig,
Scheduler scheduler) {
LinkedBlockingDeque<Runnable> workQueue = new LinkedBlockingDeque<>(processingConfig.queueSize());
workQueueSize = workQueue::size;
executor = new ThreadPoolExecutor(processingConfig.threads(), processingConfig.threads(), 0L,
TimeUnit.MILLISECONDS, workQueue);
public JiraService(JiraConfig jiraConfig, ReportingConfig reportingConfig, Scheduler scheduler) {

Map<String, HandlerProjectContext> contextMap = new HashMap<>();
for (var entry : jiraConfig.projectGroup().entrySet()) {
Expand Down Expand Up @@ -158,7 +145,7 @@ public void registerManagementRoutes(@Observes ManagementInterface mi) {
throw new IllegalArgumentException("Unknown project '%s'".formatted(project));
}

executor.submit(() -> {
context.submitTask(() -> {
for (String issueKey : issueKeys) {
triggerSyncEvent(context.sourceJiraClient().getIssue(issueKey), context);
}
Expand All @@ -176,7 +163,7 @@ public void registerManagementRoutes(@Observes ManagementInterface mi) {
throw new IllegalArgumentException("Unknown project '%s'".formatted(project));
}

executor.submit(() -> syncByQuery(query, context));
context.submitTask(() -> syncByQuery(query, context));
rc.end();
});
mi.router().post("/sync/comments/list").consumes(MediaType.APPLICATION_JSON).blockingHandler(rc -> {
Expand Down Expand Up @@ -223,7 +210,7 @@ public void acknowledge(String project, JiraWebHookEvent event) {
}

for (Runnable handler : eventType.handlers(reportingConfig, event, context)) {
executor.submit(handler);
context.submitTask(handler);
}
}, () -> Log.infof("Event type %s is not supported and cannot be handled.", event.webhookEvent));
}
Expand Down Expand Up @@ -254,20 +241,12 @@ public void syncLastUpdated(String projectGroup) {

@PreDestroy
public void finishProcessingAndShutdown() {
try {
executor.shutdown();
if (!executor.awaitTermination(2, TimeUnit.MINUTES)) {
Log.infof("Not all events were processed before the shutdown");
for (HandlerProjectContext context : contextPerProject.values()) {
try {
context.close();
} catch (Exception e) {
Log.errorf(e, "Error closing context %s: %s", context, e.getMessage());
}
for (HandlerProjectContext context : contextPerProject.values()) {
try {
context.close();
} catch (Exception e) {
Log.errorf(e, "Error closing context %s: %s", context, e.getMessage());
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

Expand All @@ -285,7 +264,8 @@ private void syncByQuery(String query, HandlerProjectContext context) {

private void triggerSyncEvent(JiraIssue jiraIssue, HandlerProjectContext context) {
Log.infof("Adding sync events for a jira issue: %s; Already queued events: %s", jiraIssue.key,
workQueueSize.get());
context.pendingEventsInCurrentContext());

JiraWebHookIssue issue = new JiraWebHookIssue();
issue.id = jiraIssue.id;
issue.key = jiraIssue.key;
Expand Down
5 changes: 2 additions & 3 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ jira.project-group."hibernate".destination.login-kind=BEARER_TOKEN
jira.project-group."hibernate".destination.api-user.email=${JIRA_API_USER_REDHAT}
jira.project-group."hibernate".destination.api-user.token=${JIRA_API_TOKEN_REDHAT}
jira.project-group."hibernate".can-set-reporter=true
# Processing queue configuration:
jira.project-group."hibernate".processing.queue-size=${PROCESSING_QUEUE_SIZE:10000}
#
# Management endpoints:
quarkus.management.enabled=true
Expand All @@ -29,9 +31,6 @@ quarkus.security.users.embedded.enabled=true
quarkus.security.users.embedded.plain-text=true
quarkus.security.users.embedded.users."management-user"=${MANAGEMENT_USER_PASSWORD}
#
# Processing queue configuration:
processing.events.queue-size=${PROCESSING_QUEUE_SIZE:1000}
#
# Scheduler:
# >> By default, the scheduler is not started unless a @Scheduled business method is found.
# >> You may need to force the start of the scheduler for "pure" programmatic scheduling via quarkus.scheduler.start-mode=forced
Expand Down

0 comments on commit d1576db

Please sign in to comment.