From d1576db510060e0a80a7a13044c5e0d8d7535ee9 Mon Sep 17 00:00:00 2001 From: marko-bekhta Date: Thu, 17 Oct 2024 15:53:23 +0200 Subject: [PATCH] Move processing queue to group context since each group can have its own processing limits, and if we block the executor with pending tasks for one group, other group won't be able to get processed, while actually it could --- .../infra/replicate/jira/JiraConfig.java | 35 ++++++++++++---- .../replicate/jira/ProcessingConfig.java | 23 ---------- .../service/jira/HandlerProjectContext.java | 8 ++++ .../jira/HandlerProjectGroupContext.java | 38 +++++++++++++++-- .../jira/service/jira/JiraService.java | 42 +++++-------------- src/main/resources/application.properties | 5 +-- 6 files changed, 83 insertions(+), 68 deletions(-) delete mode 100644 src/main/java/org/hibernate/infra/replicate/jira/ProcessingConfig.java diff --git a/src/main/java/org/hibernate/infra/replicate/jira/JiraConfig.java b/src/main/java/org/hibernate/infra/replicate/jira/JiraConfig.java index bb0a1ff..de0bda3 100644 --- a/src/main/java/org/hibernate/infra/replicate/jira/JiraConfig.java +++ b/src/main/java/org/hibernate/infra/replicate/jira/JiraConfig.java @@ -94,6 +94,24 @@ interface JiraProjectGroup { } interface EventProcessing { + + /** + * Define how many events can be acknowledged and put on the pending queue + * before acknowledging an event results in blocking the response and waiting + * for the queue to free some space. + */ + @WithDefault("10000") + int queueSize(); + + /** + * Define the number of threads to use when processing queued events. + *

+ * Note, having a lot of processing threads might not bring much benefit as + * processing may also be limited by {@link JiraConfig.EventProcessing} + */ + @WithDefault("2") + int threads(); + /** * Defines how many events can be processed within the * {@link #timeframeInSeconds() timeframe} @@ -127,20 +145,20 @@ interface Scheduled { interface JiraProject { /** - * Downstream project id (not a project key!). - * Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info. + * Downstream project id (not a project key!). Use + * {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info. */ String projectId(); /** - * Downstream project key. - * Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info. + * Downstream project key. Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get + * the info. */ String projectKey(); /** - * Upstream project key. - * Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info. + * Upstream project key. Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get + * the info. */ String originalProjectKey(); @@ -154,8 +172,9 @@ interface WebHookSecurity { /** * Whether to enable signature verification. *

- * Jira web hooks can send a {@code x-hub-signature} header with a signature of a request body. - * This signature can be then verified using the secret used to configure the web hook. + * Jira web hooks can send a {@code x-hub-signature} header with a signature of + * a request body. This signature can be then verified using the secret used to + * configure the web hook. */ @WithDefault("false") boolean enabled(); diff --git a/src/main/java/org/hibernate/infra/replicate/jira/ProcessingConfig.java b/src/main/java/org/hibernate/infra/replicate/jira/ProcessingConfig.java deleted file mode 100644 index f29b7f5..0000000 --- a/src/main/java/org/hibernate/infra/replicate/jira/ProcessingConfig.java +++ /dev/null @@ -1,23 +0,0 @@ -package org.hibernate.infra.replicate.jira; - -import io.smallrye.config.ConfigMapping; -import io.smallrye.config.WithDefault; - -@ConfigMapping(prefix = "processing.events") -public interface ProcessingConfig { - /** - * Define how many events can be acknowledged and put on the pending queue before - * acknowledging an event results in blocking the response and waiting for the queue to free some space. - */ - @WithDefault("10000") - int queueSize(); - - /** - * Define the number of threads to use when processing queued events. - *

- * Note, having a lot of processing threads might not bring much benefit as processing - * may also be limited by {@link JiraConfig.EventProcessing} - */ - @WithDefault("2") - int threads(); -} diff --git a/src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectContext.java b/src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectContext.java index bb603a1..e9d1bef 100644 --- a/src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectContext.java +++ b/src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectContext.java @@ -183,4 +183,12 @@ public String toString() { public void close() { projectGroupContext.close(); } + + public int pendingEventsInCurrentContext() { + return projectGroupContext.pendingEventsInCurrentContext(); + } + + public void submitTask(Runnable runnable) { + projectGroupContext.submitTask(runnable); + } } diff --git a/src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectGroupContext.java b/src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectGroupContext.java index 3f78495..5c85030 100644 --- a/src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectGroupContext.java +++ b/src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectGroupContext.java @@ -1,14 +1,22 @@ package org.hibernate.infra.replicate.jira.service.jira; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.hibernate.infra.replicate.jira.JiraConfig; +import io.quarkus.logging.Log; + public final class HandlerProjectGroupContext implements AutoCloseable { + private final ExecutorService eventHandlingExecutor; + private final Supplier workQueueSize; private final ScheduledExecutorService rateLimiterExecutor = Executors.newScheduledThreadPool(1); private final Semaphore rateLimiter; private final JiraConfig.JiraProjectGroup projectGroup; @@ -16,13 +24,19 @@ public final class HandlerProjectGroupContext implements AutoCloseable { public HandlerProjectGroupContext(JiraConfig.JiraProjectGroup projectGroup) { this.projectGroup = projectGroup; - final int permits = projectGroup.processing().eventsPerTimeframe(); + JiraConfig.EventProcessing processing = projectGroup.processing(); + + final int permits = processing.eventsPerTimeframe(); this.rateLimiter = new Semaphore(permits); rateLimiterExecutor.scheduleAtFixedRate(() -> { rateLimiter.drainPermits(); rateLimiter.release(permits); - }, projectGroup.processing().timeframeInSeconds(), projectGroup.processing().timeframeInSeconds(), - TimeUnit.SECONDS); + }, processing.timeframeInSeconds(), processing.timeframeInSeconds(), TimeUnit.SECONDS); + + LinkedBlockingDeque workQueue = new LinkedBlockingDeque<>(processing.queueSize()); + workQueueSize = workQueue::size; + eventHandlingExecutor = new ThreadPoolExecutor(processing.threads(), processing.threads(), 0L, + TimeUnit.MILLISECONDS, workQueue); } public void startProcessingEvent() throws InterruptedException { @@ -33,6 +47,14 @@ public JiraConfig.JiraProjectGroup projectGroup() { return projectGroup; } + public int pendingEventsInCurrentContext() { + return workQueueSize.get(); + } + + public void submitTask(Runnable task) { + eventHandlingExecutor.submit(task); + } + @Override public void close() { // when requesting to close the context we aren't expecting to process any other @@ -40,5 +62,15 @@ public void close() { if (!rateLimiterExecutor.isShutdown()) { rateLimiterExecutor.shutdownNow(); } + if (!eventHandlingExecutor.isShutdown()) { + try { + eventHandlingExecutor.shutdown(); + if (!eventHandlingExecutor.awaitTermination(2, TimeUnit.MINUTES)) { + Log.warnf("Not all events were processed before the shutdown"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } } diff --git a/src/main/java/org/hibernate/infra/replicate/jira/service/jira/JiraService.java b/src/main/java/org/hibernate/infra/replicate/jira/service/jira/JiraService.java index 0c8db5b..ae15f0c 100644 --- a/src/main/java/org/hibernate/infra/replicate/jira/service/jira/JiraService.java +++ b/src/main/java/org/hibernate/infra/replicate/jira/service/jira/JiraService.java @@ -7,15 +7,9 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; import org.hibernate.infra.replicate.jira.JiraConfig; -import org.hibernate.infra.replicate.jira.ProcessingConfig; import org.hibernate.infra.replicate.jira.service.jira.client.JiraRestClient; import org.hibernate.infra.replicate.jira.service.jira.client.JiraRestClientBuilder; import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraWebHookEvent; @@ -47,19 +41,12 @@ public class JiraService { private final ReportingConfig reportingConfig; - private final ExecutorService executor; - private final Supplier workQueueSize; private final Map contextPerProject; private final JiraConfig jiraConfig; private final Scheduler scheduler; @Inject - public JiraService(ProcessingConfig processingConfig, JiraConfig jiraConfig, ReportingConfig reportingConfig, - Scheduler scheduler) { - LinkedBlockingDeque workQueue = new LinkedBlockingDeque<>(processingConfig.queueSize()); - workQueueSize = workQueue::size; - executor = new ThreadPoolExecutor(processingConfig.threads(), processingConfig.threads(), 0L, - TimeUnit.MILLISECONDS, workQueue); + public JiraService(JiraConfig jiraConfig, ReportingConfig reportingConfig, Scheduler scheduler) { Map contextMap = new HashMap<>(); for (var entry : jiraConfig.projectGroup().entrySet()) { @@ -158,7 +145,7 @@ public void registerManagementRoutes(@Observes ManagementInterface mi) { throw new IllegalArgumentException("Unknown project '%s'".formatted(project)); } - executor.submit(() -> { + context.submitTask(() -> { for (String issueKey : issueKeys) { triggerSyncEvent(context.sourceJiraClient().getIssue(issueKey), context); } @@ -176,7 +163,7 @@ public void registerManagementRoutes(@Observes ManagementInterface mi) { throw new IllegalArgumentException("Unknown project '%s'".formatted(project)); } - executor.submit(() -> syncByQuery(query, context)); + context.submitTask(() -> syncByQuery(query, context)); rc.end(); }); mi.router().post("/sync/comments/list").consumes(MediaType.APPLICATION_JSON).blockingHandler(rc -> { @@ -223,7 +210,7 @@ public void acknowledge(String project, JiraWebHookEvent event) { } for (Runnable handler : eventType.handlers(reportingConfig, event, context)) { - executor.submit(handler); + context.submitTask(handler); } }, () -> Log.infof("Event type %s is not supported and cannot be handled.", event.webhookEvent)); } @@ -254,20 +241,12 @@ public void syncLastUpdated(String projectGroup) { @PreDestroy public void finishProcessingAndShutdown() { - try { - executor.shutdown(); - if (!executor.awaitTermination(2, TimeUnit.MINUTES)) { - Log.infof("Not all events were processed before the shutdown"); + for (HandlerProjectContext context : contextPerProject.values()) { + try { + context.close(); + } catch (Exception e) { + Log.errorf(e, "Error closing context %s: %s", context, e.getMessage()); } - for (HandlerProjectContext context : contextPerProject.values()) { - try { - context.close(); - } catch (Exception e) { - Log.errorf(e, "Error closing context %s: %s", context, e.getMessage()); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); } } @@ -285,7 +264,8 @@ private void syncByQuery(String query, HandlerProjectContext context) { private void triggerSyncEvent(JiraIssue jiraIssue, HandlerProjectContext context) { Log.infof("Adding sync events for a jira issue: %s; Already queued events: %s", jiraIssue.key, - workQueueSize.get()); + context.pendingEventsInCurrentContext()); + JiraWebHookIssue issue = new JiraWebHookIssue(); issue.id = jiraIssue.id; issue.key = jiraIssue.key; diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 10e05f9..a65de24 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -15,6 +15,8 @@ jira.project-group."hibernate".destination.login-kind=BEARER_TOKEN jira.project-group."hibernate".destination.api-user.email=${JIRA_API_USER_REDHAT} jira.project-group."hibernate".destination.api-user.token=${JIRA_API_TOKEN_REDHAT} jira.project-group."hibernate".can-set-reporter=true +# Processing queue configuration: +jira.project-group."hibernate".processing.queue-size=${PROCESSING_QUEUE_SIZE:10000} # # Management endpoints: quarkus.management.enabled=true @@ -29,9 +31,6 @@ quarkus.security.users.embedded.enabled=true quarkus.security.users.embedded.plain-text=true quarkus.security.users.embedded.users."management-user"=${MANAGEMENT_USER_PASSWORD} # -# Processing queue configuration: -processing.events.queue-size=${PROCESSING_QUEUE_SIZE:1000} -# # Scheduler: # >> By default, the scheduler is not started unless a @Scheduled business method is found. # >> You may need to force the start of the scheduler for "pure" programmatic scheduling via quarkus.scheduler.start-mode=forced