Make the PromisePool reusable for multiple batches of tasks #59
What you can do is set a variable to keep track of whether there is a started pool or not:

```js
const PromisePool = require('es6-promise-pool')

// getInitialPendingItems() and fetchItem() are placeholders for your own code
let poolStarted = false
const pendingItems = getInitialPendingItems()
const inFlightItems = []

function promiseProducer () {
  const item = pendingItems.shift()
  if (!item) {
    // Nothing left for now: let this pool finish so a new one can be started later
    poolStarted = false
    return null
  }
  inFlightItems.push(item)
  // Return the promise so the pool tracks it and enforces the concurrency limit
  return (async () => {
    try {
      const data = await fetchItem(item)
      // ... use data here
    } catch (err) {
      // Re-queue the item so it is retried later
      pendingItems.push(item)
    }
    inFlightItems.splice(inFlightItems.indexOf(item), 1)
    // This could also be elsewhere, in a setInterval or something
    if (pendingItems.length > 0 && inFlightItems.length < 1 && !poolStarted) {
      startPool()
    }
  })()
}

function startPool () {
  poolStarted = true
  const pool = new PromisePool(promiseProducer, 3)
  pool.start()
}

startPool()

// Later...
pendingItems.push({ id: 5 })
```
Hi @karlhorky, thanks for your suggestion and the code example. Let's consider the following scenario:

Your code will call startPool() because "poolStarted" has already been changed to "false" in step 3. In other words, a new pool will be created to run all the new tasks. We will therefore have 16 tasks running concurrently at this moment (8 tasks still running in the first pool and 8 new tasks running in another pool), which exceeds the intended limit of 10 concurrent tasks. What I suggested is to reuse the first pool and make it call promiseProducer() again, so that it takes another 2 tasks from "pendingItems" and brings the number of concurrently running tasks up to 10. I hope my explanation is clear. Cheers.
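The numbers in this scenario can be checked with a small, library-independent simulation. `runLimited` below is a hypothetical helper, not es6-promise-pool's API: it caps concurrency per call, so two calls behave like two separate pools, while a shared counter records how many tasks are in flight overall. Task durations are arbitrary.

```js
// Hypothetical helper (not es6-promise-pool): runs task functions with at most
// `concurrency` in flight per call, recording the overall in-flight count in `stats`.
function runLimited (tasks, concurrency, stats) {
  let index = 0
  async function worker () {
    while (index < tasks.length) {
      const task = tasks[index++]
      stats.running++
      stats.peak = Math.max(stats.peak, stats.running)
      try {
        await task()
      } finally {
        stats.running--
      }
    }
  }
  const workers = []
  for (let i = 0; i < Math.min(concurrency, tasks.length); i++) {
    workers.push(worker())
  }
  return Promise.all(workers)
}

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))
const makeTasks = (n, ms) => Array.from({ length: n }, () => () => sleep(ms))

async function main () {
  // Two pools, each limited to 10, sharing one counter:
  // 8 tasks in the first pool plus 8 new tasks in a second pool
  const separate = { running: 0, peak: 0 }
  await Promise.all([
    runLimited(makeTasks(8, 50), 10, separate),
    runLimited(makeTasks(8, 50), 10, separate)
  ])

  // A single pool limited to 10 that receives all 16 tasks
  const shared = { running: 0, peak: 0 }
  await runLimited(makeTasks(16, 50), 10, shared)

  return { separate: separate.peak, shared: shared.peak }
}

main().then(result => console.log(result)) // { separate: 16, shared: 10 }
```

Each pool respects its own limit, but nothing bounds their sum, which is why a second pool can push the total past 10.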
Right, sorry, I didn't address this requirement of limiting the concurrency. I've added
Hi @karlhorky, please correct me if I am wrong. My understanding is the PromisePool will stop calling

You can try your code here: https://repl.it/@tsw_tq/reusePromisePool

I also included my proposal at the end of the code, you can change the

As I mentioned, my suggestion is to make the PromisePool reactivatable so that it starts to call
Yep, it was more of a suggestion of an idea than 100% running code 😊 Like I mentioned in the comment, the code that restarts the promise pool could be elsewhere like in a

This solution aside, I think that your suggestion is a good one and I support having it in the library itself, as in vilic/promise-pool
Hey guys, I really appreciate all the feedback here and I apologize for not responding to it sooner. Busy times, I'm sure you know how it goes. So anyway. As I understand it, this issue originally tried to deal with lists of promises of indefinite size. That is,
At this point, definitely correct me if I'm wrong. I'm going to try to offer a solution to both these challenges. The first one is inherently solved by the current API, as it doesn't require you to pass an array of promises the way The second challenge is a bit trickier, and I believe that's where the "reuse" comes in. It arises when there's a period of idle time between runs. It could be solved by indeed allowing I'm hesitant to add this feature to the pool itself for various reasons:
So what if instead of reusing the pool, you could keep it alive? That is, what if you could prevent it from entering the "done" state in that period where you don't have any new tasks yet? Here's a little demo that accomplishes that:

```js
const PromisePool = require('es6-promise-pool')
const defer = require('p-defer')
const delay = require('delay')

const concurrency = 3
const jobDuration = 1000
const initialJobCount = 5
const finalJobCount = 5

const startTime = Date.now()
const log = msg => console.log(`[+${Date.now() - startTime}] ${msg}`)

let pool
let pendingJobCount = initialJobCount

// The lock keeps the pool from finishing while no new jobs are available yet
const lockDeferred = defer()
const lock = {
  active: true,
  promise: lockDeferred.promise,
  release: () => {
    lock.active = false
    lockDeferred.resolve()
  }
}

// Simulates the last batch of work arriving later; no work comes after it
setTimeout(() => {
  pendingJobCount += finalJobCount
  log('unlock')
  lock.release()
  pool.concurrency(concurrency)
}, jobDuration * 3)

const producer = () => {
  if (pendingJobCount > 0) {
    log('next')
    --pendingJobCount
    return delay(jobDuration).then(() => log('resolve'))
  } else if (lock.active) {
    // No jobs right now: hand the pool the pending lock promise so it stays
    // active, and drop concurrency to 1 so it stops asking for more
    log('lock')
    pool.concurrency(1)
    return lock.promise
  } else {
    // Lock released and nothing pending: returning null lets the pool finish
    log('flush')
    return null
  }
}

pool = new PromisePool(producer, concurrency)
pool.start()
  .then(() => {
    log('done')
  })
```

Essentially, once the last known promise (for the time being) has been produced, the producer returns a placeholder promise (using p-defer), which acts as a semaphore. That one won't resolve until all the promises that will ever exist have been created, i.e., until all the work that will ever come up has been scheduled into the pool. That last part is simulated using

I feel like this concept is pretty extensible. You could even wrap it in a library if you want.

Alternatively, I do also think there's a way you could create a new pool every time. You would just have to keep track of pending promises yourself as well. Alternatively, maybe there's a way you can create a pool of pools; I'd have to think about that one some more.

Does that help at all?
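The `lock` object in the demo relies on p-defer, which hands back a promise together with the function that resolves it. Roughly the same one-shot lock can be built with a native Promise alone; `createLock` below is a hypothetical helper name, not part of es6-promise-pool or p-defer, and is only a sketch of the idea.

```js
// Hypothetical helper: a one-shot lock built on a native Promise instead of p-defer
function createLock () {
  let resolveLock
  // The executor runs synchronously, so resolveLock is assigned before we return
  const promise = new Promise(resolve => { resolveLock = resolve })
  const lock = {
    active: true,
    promise,
    release () {
      lock.active = false
      resolveLock()
    }
  }
  return lock
}

// A producer would return lock.promise while lock.active is true
const lock = createLock()
lock.promise.then(() => console.log('released, active =', lock.active))
lock.release() // logs "released, active = false" once the promise settles
```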
Hi Tim,
I wonder whether we could make the PromisePool object reusable, meaning I could call start() multiple times, even after the pool has finished its tasks.
My scenario is as follows:
The tasks (promises) are generated gradually and stored in a list. I want to start the tasks as soon as the first batch is ready, so start() will be called before all the tasks are generated. If I get new tasks later, I can add them to the list (or pool) and the PromisePool can just keep working. The concurrency should remain limited throughout.
The current problem is that once the task generator reaches the end of the task list, the PromisePool stops permanently, even if I add new tasks and call start() again. It is stopped by the _done variable. If we reset the _done variable to "false" when start() is called, the PromisePool should be able to keep working on the new tasks.
I cannot create a new PromisePool for the new tasks either, as the previous pool may still be working. If a new pool is created, the number of concurrently running tasks may exceed the limit, since each pool can run up to the maximum on its own.
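The requested behaviour can be sketched without the library. `ReusablePool` below is a hypothetical, minimal class and not es6-promise-pool's implementation: its `done` flag stands in for the _done variable mentioned above and is reset on every `start()`, while a single concurrency cap applies across all batches. Task errors are swallowed to keep the sketch short.

```js
// Hypothetical minimal pool (NOT es6-promise-pool): start() may be called again
// after the pool has drained, and one concurrency cap covers every batch.
class ReusablePool {
  constructor (concurrency) {
    this.concurrency = concurrency
    this.queue = [] // task functions waiting to run
    this.running = 0 // tasks currently in flight
    this.done = true // stands in for the _done flag discussed above
  }

  add (...tasks) {
    this.queue.push(...tasks)
  }

  start () {
    this.done = false // reset on every start(), as proposed in this issue
    this._fill()
  }

  _fill () {
    while (!this.done && this.running < this.concurrency && this.queue.length > 0) {
      const task = this.queue.shift()
      this.running++
      Promise.resolve()
        .then(task)
        .catch(() => {}) // task errors are ignored in this sketch
        .finally(() => {
          this.running--
          this._fill()
        })
    }
    if (this.queue.length === 0 && this.running === 0) {
      this.done = true
    }
  }
}

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

async function demo () {
  const pool = new ReusablePool(10)
  let running = 0
  let peak = 0
  let finished = 0
  const makeTask = () => async () => {
    running++
    peak = Math.max(peak, running)
    await sleep(50)
    running--
    finished++
  }

  pool.add(...Array.from({ length: 8 }, makeTask))
  pool.start()
  await sleep(10)
  // New tasks arrive while 8 are still running: only 2 more start now
  pool.add(...Array.from({ length: 8 }, makeTask))
  pool.start()
  await sleep(300)

  // The pool has drained (done is true again); start() reactivates it
  pool.add(...Array.from({ length: 2 }, makeTask))
  pool.start()
  await sleep(200)

  return { peak, finished, done: pool.done }
}

demo().then(result => console.log(result)) // { peak: 10, finished: 18, done: true }
```

Because `done` becomes true once the queue drains, the final `start()` call is what lets the last two tasks run; a pool that never reset the flag would leave them queued, which is the problem described above.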