
Enhance db-scheduler to support asynchronous task execution #304

Open · wants to merge 4 commits into base: async

Conversation

@amit-handda commented Jul 7, 2022

Thank you for maintaining db-scheduler.

What

This PR enhances db-scheduler to support asynchronous task execution. It addresses this issue.

Why

db-scheduler uses an ExecutorService backed by a thread pool to run scheduled tasks, and tasks are executed synchronously. If a task performs asynchronous work (an HTTP call, for example), its thread-pool slot stays blocked until that work completes.
This is not a problem when tasks are small or throughput is low, but it can become an issue in higher-throughput scenarios.
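For illustration only (this snippet is not part of the PR), a minimal sketch of the blocking shape described above, assuming db-scheduler's standard Tasks.oneTime helper; the task name and URL are made up:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask;
import com.github.kagkarlsson.scheduler.task.helper.Tasks;

public class BlockingTaskSketch {
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    // The handler below parks its scheduler-pool thread for the whole HTTP round-trip,
    // even though the work is pure I/O. With many such tasks, the pool is exhausted
    // while every thread is merely waiting.
    public static OneTimeTask<Void> blockingHttpTask() {
        return Tasks.oneTime("blocking-http-call", Void.class)
            .execute((taskInstance, executionContext) -> {
                try {
                    HttpRequest request = HttpRequest.newBuilder()
                        .uri(URI.create("https://example.invalid/endpoint")) // placeholder URL
                        .GET()
                        .build();
                    CLIENT.send(request, HttpResponse.BodyHandlers.ofString()); // blocking call
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
    }
}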

Current status

  • A few test cases are flaky due to the asynchronous implementation; looking into fixing them.

@kagkarlsson (Owner)

Thank you! I am currently on vacation and will not be able to look at it for a couple of weeks. It is also a critical change, so I need some time to consider it.

@x0a1b commented Aug 1, 2022

I see some significant performance improvements as well. @amit-handda, can you post some benchmarks?

@amit-handda (Author)

As expected, we have observed significant throughput gains for async tasks, as follows:

                             Current (executions/s)   Future-based (executions/s)
  1 scheduler instance
    300 threads, 10k tasks            285                       1,666
  4 scheduler instances
    20 threads, 10k tasks              80                       2,206
    100 threads, 10k tasks            359                       2,407

Task def:
(screenshot of the task definition)

@amit-handda (Author)

Hello @kagkarlsson, hope you had a nice vacation. Wondering if you are back and have had time to look at the PR? Let me know if you have any questions. Thank you!

@kagkarlsson (Owner)

Thank you! Yes, I am back, but I am struggling to find the bandwidth to review this PR, unfortunately.

I have scanned through the code and noted a couple of things:

  • I am not sure how I feel about breaking the ExecutionHandler interface. It would be better if the async variant could be introduced more gently, as an option rather than a breaking change..
  • Are we allowing more things to happen in parallel? (If so, I am not sure what the consequences of this will be.)

@amit-handda (Author)

Hello @kagkarlsson, thanks for the feedback. A question:
1/ How do you intend the ExecutionHandler interface to be?
The PR allows the ExecutionHandler implementation to return a Future so that the db-scheduler thread is unblocked (in case the implementation performs async operations such as HTTP calls).
Thanks,
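For context, a rough sketch of the handler shape being described; the interface name AsyncExecutionHandler and the method name executeAsync are illustrative, not necessarily the exact names in the PR, and the sketch assumes db-scheduler's existing CompletionHandler.OnCompleteRemove:

import java.util.concurrent.CompletableFuture;

import com.github.kagkarlsson.scheduler.task.CompletionHandler;
import com.github.kagkarlsson.scheduler.task.ExecutionContext;
import com.github.kagkarlsson.scheduler.task.TaskInstance;

// Hypothetical async handler shape (illustrative names, not the PR's exact interface).
interface AsyncExecutionHandler<T> {
    CompletableFuture<CompletionHandler<T>> executeAsync(TaskInstance<T> taskInstance,
                                                         ExecutionContext executionContext);
}

// An implementation can start non-blocking work and return immediately; the scheduler
// thread is released, and the CompletionHandler is applied once the future completes.
class NonBlockingHandler implements AsyncExecutionHandler<Void> {
    @Override
    public CompletableFuture<CompletionHandler<Void>> executeAsync(TaskInstance<Void> taskInstance,
                                                                   ExecutionContext executionContext) {
        return startAsyncWork()
            .thenApply(ignored -> new CompletionHandler.OnCompleteRemove<>());
    }

    // Placeholder for real non-blocking I/O (e.g. HttpClient.sendAsync).
    private CompletableFuture<String> startAsyncWork() {
        return CompletableFuture.completedFuture("ok");
    }
}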

@kagkarlsson (Owner)

1/ How do you intend the ExecutionHandler interface to be?

Well, that is the problem, I do not know currently. What options do we have 🤔


Review comment on the line:

    return completableFuture.whenCompleteAsync((completion, ex) -> {

@kagkarlsson (Owner)

Not 100% sure what happens here. What will the significance of whenCompleteAsync be here?

Will it happen in parallel with executor.removeCurrentlyProcessing here?

executePickedExecution(pickedExecution).whenComplete((c, ex) -> executor.removeCurrentlyProcessing(executionId));

@amit-handda (Author)

We use whenCompleteAsync to explicitly associate the action with the db-scheduler executor (otherwise it might run on the common ForkJoinPool).
I don't think it will execute in parallel with removeCurrentlyProcessing, since the *Async API only lets you specify which executor service runs the action.
Reference: https://www.linkedin.com/pulse/asynchronous-programming-java-completablefuture-aliaksandr-liakh/
Please let me know in case I am off.
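To make that concrete, a small standalone snippet (plain CompletableFuture, no db-scheduler code) showing that whenCompleteAsync(action, executor) only controls which executor runs the callback; it does not add extra parallelism:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WhenCompleteAsyncDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService callbackPool =
            Executors.newFixedThreadPool(1, r -> new Thread(r, "callback-pool"));

        CompletableFuture<String> work = CompletableFuture.supplyAsync(() -> {
            // Runs on the common ForkJoinPool by default
            System.out.println("work on " + Thread.currentThread().getName());
            return "done";
        });

        // The callback simply runs on callbackPool once 'work' completes,
        // instead of on whichever thread completed the future.
        work.whenCompleteAsync(
                (result, ex) ->
                    System.out.println("callback on " + Thread.currentThread().getName()),
                callbackPool)
            .get(); // wait so the demo prints before exiting

        callbackPool.shutdown();
    }
}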

@kagkarlsson (Owner)

As expected, we have observed significant throughput gains for async tasks, as follows:

                             Current (executions/s)   Future-based (executions/s)
  1 scheduler instance
    300 threads, 10k tasks            285                       1,666
  4 scheduler instances
    20 threads, 10k tasks              80                       2,206
    100 threads, 10k tasks            359                       2,407

I need to understand why a future-based execution will be that much faster. Feels like we are skipping a step..

@amit-handda (Author) commented Aug 29, 2022

I need to understand why a future-based execution will be that much faster. Feels like we are skipping a step..

Hi, it is due to our task definition. The task implementation does nothing other than sleep for a second. After our changes, the sleeping tasks don't block the scheduler thread pool (since the task implementation returns futures). Before our changes, the same task implementation would block the scheduler thread pool, leading to inferior performance. Let me know if you have any questions.
https://user-images.githubusercontent.com/56566242/182731346-cf142ba0-56f5-45ad-a888-69b8c4378aac.png

@amit-handda (Author)

Well, that is the problem, I do not know currently. What options do we have 🤔

IMO, returning a CompletableFuture<CompletionHandler> is the only option that lets a task execute async operations while unblocking the db-scheduler thread pool. <- This necessitates updating the interface in a non-gentle way. :)

@maxpert commented Sep 1, 2022

I think what @amit-handda is describing is more specific to reactive solutions (in this case Kotlin coroutines, which are non-blocking on threads). Having this in place will unlock a bunch of reactive use cases (including for Spring framework users), letting them write their workers in a more scalable way.

@kagkarlsson (Owner) commented Sep 1, 2022

The results you are referring to, where future-based and non-future-based tasks are compared: what polling-strategy are they using, and with what limits?

In my mind it should not be pulling in more work before completing a significant part of the last batch. But that will be affected by polling-strategy changes.

@maxpert commented Sep 2, 2022

The results you are referring to, where future-based and non-future-based tasks are compared: what polling-strategy are they using, and with what limits?

So the reactive systems I am referring to usually use mechanisms like epoll, kqueue, etc. For example, if you are using Reactor + Netty, there are various system-level event-driven libraries it can build on; you can read the details here. Combined with Project Reactor, you get async DB libraries like R2DBC, which means you can now talk to the DB in an async manner.

In my mind it should not be pulling in more work before completing a significant part of the last batch. But that will be affected by polling-strategy changes.

If the work being done is CPU-intensive, then sure, even regular threads will be blocked. But if the work is more IO-intensive, then in the traditional model you are just spinning up threads and wasting cycles waiting for it to complete. That is where these async-IO / Reactor-driven frameworks let you fully saturate your CPU. I don't think a future-based workload messes with the polling logic.

To go into a little more detail, let's assume a simple job scheduler that sends out emails. All the job does is pull an email from a worker queue and then make an HTTP call to, say, SendGrid or some other external service to send it. In the traditional thread model you won't be able to spin up more than a couple of hundred threads, so you can't process more than a couple of hundred emails in parallel at a time; the async/reactor models unblock you from that. Since each job turns into a future/promise-based model, none of the threads have to block and wait until the worker lets go of the thread. That is what event loops have done and unlocked at large scale. IMO this PR will do exactly that.
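As a minimal sketch of that non-blocking email shape, assuming Java's built-in HttpClient and a made-up endpoint; the only point is that sendAsync returns immediately and frees the calling thread:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;

public class AsyncEmailSketch {
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    // Returns a future with the HTTP status code; no thread is parked while the
    // request to the external mail service is in flight.
    public static CompletableFuture<Integer> sendEmail(String payload) {
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("https://mail.example.invalid/send")) // placeholder endpoint
            .POST(HttpRequest.BodyPublishers.ofString(payload))
            .build();
        return CLIENT.sendAsync(request, HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::statusCode);
    }
}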

@kagkarlsson (Owner)

I appreciate the explanation, but I was also concretely asking about what settings were used in the test :)

In my mind it should not be pulling in more work before completing a significant part of the last batch

That was referring to my mental model of how the scheduler works.

@kagkarlsson (Owner)

I think the thing is that if the thread pool is no longer the bottleneck for how many executions we may have running in parallel, we need to consider how many are safe to allow. After they finish, there will be a backlog of executions to complete..

@amit-handda (Author)

Hi @kagkarlsson, we used the lock_and_fetch poll strategy with 0.5 (lower_limit) and 4.0 (upper_limit). Please let me know if you need more info. Many thanks for your careful review feedback. Have a good day.

return Scheduler.create(primaryDS, task.make())
      .pollUsingLockAndFetch(lowerLimitFractionOfThreads, upperLimitFractionOfThreads)
      .threads(schConfig.schedulerThreadPool)
      .pollingInterval(Duration.ofSeconds(schConfig.schedulerPollingIntervalSecs.toLong()))
      .schedulerName(SchedulerName.Fixed(schedulerInstanceId))
      .registerShutdownHook()
      .build()

perf test config

db:
  db:
    jdbcUrl: "jdbc:postgresql://localhost:5432/pgloadtest_local"
    username: "pgloadtest_local"
    password: "testpwd"
    maxLifetimeSecs: 300
    dbconnTtlSecs: 300
    maximumPoolSize: 50
pgloadtest:
  dbscheduler:
    inactivityDurationSecs: 3600
    failRedoSecs: 4
    schedulerThreadPool: 300
    schedulerPollingIntervalSecs: 10
  task:
    failRedoSecs: 5
    inactivityDurationSecs: 600

@maxpert commented Sep 13, 2022

Any updates on whether this can be merged? It would really unblock a lot of use cases.

@kagkarlsson (Owner)

It is taking some time because it is not a priority feature (though I am starting to see the merits) and there are still things to consider that I feel are unanswered.

My thoughts atm:

  • Have you verified your fork works as expected for your use-case?
    • What configuration are you hoping to run?
    • What performance are you hoping to see?
  • Unexpectedly high executions/s: why is that? It should do a maximum of 4x threads/s but seems to be doing 5-6x threads/s.
    • Was the same config used when comparing sync vs async?
    • Do we have a bug in the polling-logic? It should not pull more work until the queue-length reaches 0.5x threads again.
  • Does db-scheduler have any assumptions somewhere in the codebase that the execution lifecycle will be executed by the same thread? (I think not, though async does make things harder to debug.)
  • If we go the async route, how would we do stuff like this: Helpful stuck-thread logging #116

Also concerned about unknown unknowns. If it goes in, I think it needs to do so as an experimental feature.

@kagkarlsson (Owner)

Would it at all be possible to enable async-mode via a config setting?

@amit-handda (Author)

@kagkarlsson

  • Digging deeper into the unexpectedly high executions/s; I'll get back to you.
  • About async-mode via a config setting: not sure about it, as discussed earlier, since the PR makes breaking changes to the db-scheduler public API. I am open to ideas though.

@kagkarlsson (Owner)

I feel conflicted about this change. It will enable higher throughput using fewer threads (for non-blocking io use-cases), which is great. On the other hand, I worry about the things it might break or make more complicated.
And this is why an explicit opt-in for async-mode would be good..

I was considering whether ExecutionHandler could get a default method for the async variant that just wraps the current method in a CompletableFuture. You would override the default method when necessary.. Still, we would need to find a way to toggle the CompletableFuture code in the executors as well..
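A rough sketch of that default-method idea, keeping ExecutionHandler's existing execute signature; the interface name and the executeAsync method below are hypothetical, not part of the current API:

import java.util.concurrent.CompletableFuture;

import com.github.kagkarlsson.scheduler.task.CompletionHandler;
import com.github.kagkarlsson.scheduler.task.ExecutionContext;
import com.github.kagkarlsson.scheduler.task.TaskInstance;

public interface ExecutionHandlerSketch<T> {

    // Existing synchronous contract
    CompletionHandler<T> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext);

    // Hypothetical default: runs the synchronous handler on the calling thread and wraps the
    // result, so existing tasks keep today's behaviour. Truly non-blocking tasks would
    // override this and return a future that completes later.
    default CompletableFuture<CompletionHandler<T>> executeAsync(TaskInstance<T> taskInstance,
                                                                 ExecutionContext executionContext) {
        return CompletableFuture.completedFuture(execute(taskInstance, executionContext));
    }
}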

@kagkarlsson (Owner)

Maybe it is possible to let the async version live and be released from a custom branch 🤔

@amit-handda (Author)

Thanks @kagkarlsson, I agree with this; let's let it breathe through a custom branch, please.

@maxpert commented Sep 26, 2022

If we can do that and release it as some sort of beta package, I can write some Reactor benchmarks as well.

@amit-handda (Author)

@kagkarlsson happy Friday, checking in. May I help with getting it released from the custom branch? Thanks.

@kagkarlsson (Owner)

Shall we recreate the PR against the async branch I created?

@amit-handda changed the base branch from master to async on October 17, 2022, 23:00.
@kagkarlsson (Owner)

Trying to merge into the async branch now, but I am getting compilation errors. Is this branch building ok?

@GithubRyze

I reviewed the past comments about this issue and am curious as well. We can use code like the below to implement a non-blocking task (like an HTTP request). On the other hand, whether a task is async or not depends on the use case.

 public RecurringTaskWithPersistentSchedule<PlainScheduleAndData> asyncRecurringTask() {
   return Tasks
       .recurringWithPersistentSchedule("async-recurring-task", PlainScheduleAndData.class)
       .execute((taskInstance, executionContext) -> {
         // Start the (mocked) HTTP request on another thread so the handler returns quickly
         CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> {
           try {
             // mock http request
             Thread.sleep(2000L);
           } catch (InterruptedException e) {
             e.printStackTrace();
           }
           return "test";
         });
         orgFuture.whenComplete((result, throwable) -> {
           // callback here, once the mocked request has finished
         });
       });
 }

@kagkarlsson (Owner)

Sorry for not having the bandwidth for pursuing this issue. What about using virtual threads to solve this?

@maxpert commented Mar 28, 2023

@kagkarlsson virtual threads are in EAP, and even after rollout, just like record classes, it is going to take years for people to move over. All of the existing library and software stack still relies on Future-based solutions.

@kagkarlsson (Owner)

I am experimenting a bit with this feature in #369. I copied the ideas from this PR, but wanted a way to experiment without affecting existing code and tasks, so I am making it a slightly more hack-ish add-on for now.
I see you guys have a lot of experience with non-blocking code; feel free to pitch in there @amit-handda @maxpert.

See example AsyncOneTimeTaskMain.java in #369.

Comment on lines +74 to +75
// Since execution is executed in an async way, we need to wait for a while to let the execution finish before asserting
Thread.sleep(1000);
@kagkarlsson (Owner)

Sleeping in tests is an anti-pattern I am trying to avoid as much as possible :)

@matt-snider (Contributor)

What about using virtual threads to solve this?

@kagkarlsson Virtual threads would probably be much much simpler to support for users who can use it (starting from Java 19 with --enable-preview). This might just work without issue, but I haven't tried it.

Scheduler.create(...)
  // ... 
  .executorService(Executors.newVirtualThreadPerTaskExecutor())
  .build();

If I get a chance to experiment with this I will report back

@ml-deb commented Dec 18, 2023

What about using virtual threads to solve this?

@kagkarlsson Virtual threads would probably be much much simpler to support for users who can use it (starting from Java 19 with --enable-preview). This might just work without issue, but I haven't tried it.

Scheduler.create(...)
  // ... 
  .executorService(Executors.newVirtualThreadPerTaskExecutor())
  .build();

If I get a chance to experiment with this I will report back

I'm also interested in knowing your results with virtual threads.
