Skip to content

Commit

Permalink
Call LREM with count=1 on the active queue item
Browse files Browse the repository at this point in the history
For a high number of active tasks, the LREM operation becomes the main bottleneck.

Issue: #389
  • Loading branch information
pior committed Oct 14, 2024
1 parent d04888e commit a181ca9
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions internal/rdb/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, leaseExpirationT
// ARGV[2] -> stats expiration timestamp
// ARGV[3] -> max int64 value
var doneCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
if redis.call("LREM", KEYS[1], 1, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
Expand Down Expand Up @@ -308,7 +308,7 @@ return redis.status_reply("OK")
// ARGV[2] -> stats expiration timestamp
// ARGV[3] -> max int64 value
var doneUniqueCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
if redis.call("LREM", KEYS[1], 1, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
Expand Down Expand Up @@ -372,7 +372,7 @@ func (r *RDB) Done(ctx context.Context, msg *base.TaskMessage) error {
// ARGV[4] -> task message data
// ARGV[5] -> max int64 value
var markAsCompleteCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
if redis.call("LREM", KEYS[1], 1, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
Expand Down Expand Up @@ -409,7 +409,7 @@ return redis.status_reply("OK")
// ARGV[4] -> task message data
// ARGV[5] -> max int64 value
var markAsCompleteUniqueCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
if redis.call("LREM", KEYS[1], 1, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
Expand Down Expand Up @@ -476,7 +476,7 @@ func (r *RDB) MarkAsComplete(ctx context.Context, msg *base.TaskMessage) error {
// ARGV[1] -> task ID
// Note: Use RPUSH to push to the head of the queue.
var requeueCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
if redis.call("LREM", KEYS[1], 1, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
Expand Down Expand Up @@ -750,7 +750,7 @@ func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, process
// ARGV[5] -> is_failure (bool)
// ARGV[6] -> max int64 value
var retryCmd = redis.NewScript(`
if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then
if redis.call("LREM", KEYS[2], 1, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then
Expand Down Expand Up @@ -838,7 +838,7 @@ const (
// ARGV[6] -> stats expiration timestamp
// ARGV[7] -> max int64 value
var archiveCmd = redis.NewScript(`
if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then
if redis.call("LREM", KEYS[2], 1, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then
Expand Down

0 comments on commit a181ca9

Please sign in to comment.