Skip to content

Commit bef57a0

Browse files
authored
feat(queue): support global rate limit (#3468) ref #3019
1 parent a63cfcd commit bef57a0

File tree

13 files changed

+127
-25
lines changed

13 files changed

+127
-25
lines changed

docs/gitbook/SUMMARY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
* [Auto-removal of jobs](guide/queues/auto-removal-of-jobs.md)
1818
* [Adding jobs in bulk](guide/queues/adding-bulks.md)
1919
* [Global Concurrency](guide/queues/global-concurrency.md)
20+
* [Global Rate Limit](guide/queues/global-rate-limit.md)
2021
* [Removing Jobs](guide/queues/removing-jobs.md)
2122
* [Workers](guide/workers/README.md)
2223
* [Auto-removal of jobs](guide/workers/auto-removal-of-jobs.md)
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Global Rate Limit
2+
3+
The global rate limit config is a queue option that determines how many jobs are allowed to be processed in a specific period of time.
4+
5+
```typescript
6+
import { Queue } from 'bullmq';
7+
8+
// 1 job per second
9+
await queue.setGlobalRateLimit(1, 1000);
10+
```
11+
12+
And in order to get this value:
13+
14+
```typescript
15+
const globalConcurrency = await queue.getRateLimitTtl();
16+
```
17+
18+
{% hint style="info" %}
19+
Note that if you choose a rate limit level in your workers, it won't override the global one.
20+
{% endhint %}
21+
22+
## Read more:
23+
24+
- 💡 [Set Global Rate Limit API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#setglobalratelimit)
25+
- 💡 [Get Rate Limit Ttl API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#getratelimitttl)

src/classes/queue.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,16 @@ export class Queue<
291291
return client.hset(this.keys.meta, 'concurrency', concurrency);
292292
}
293293

294+
/**
295+
* Enable and set rate limit.
296+
* @param max - Max number of jobs to process in the time period specified in `duration`
297+
* @param duration - Time in milliseconds. During this time, a maximum of `max` jobs will be processed.
298+
*/
299+
async setGlobalRateLimit(max: number, duration: number) {
300+
const client = await this.client;
301+
return client.hset(this.keys.meta, 'max', max, 'duration', duration);
302+
}
303+
294304
/**
295305
* Remove global concurrency value.
296306
*/

src/classes/scripts.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1223,7 +1223,10 @@ export class Scripts {
12231223
}
12241224

12251225
getRateLimitTtlArgs(maxJobs?: number): (string | number)[] {
1226-
const keys: (string | number)[] = [this.queue.keys.limiter];
1226+
const keys: (string | number)[] = [
1227+
this.queue.keys.limiter,
1228+
this.queue.keys.meta,
1229+
];
12271230

12281231
return keys.concat([maxJobs ?? '0']);
12291232
}

src/commands/changePriority-7.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ if rcall("EXISTS", jobKey) == 1 then
5050
local prioritizedKey = KEYS[4]
5151
local priorityCounterKey = KEYS[6]
5252
local markerKey = KEYS[7]
53-
53+
5454
-- Re-add with the new priority
5555
if rcall("ZREM", prioritizedKey, jobId) > 0 then
5656
reAddJobWithNewPriority( prioritizedKey, markerKey, target,

src/commands/getRateLimitTtl-1.lua renamed to src/commands/getRateLimitTtl-2.lua

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
44
Input:
55
KEYS[1] 'limiter'
6+
KEYS[2] 'meta'
67
78
ARGV[1] maxJobs
89
]]
@@ -16,5 +17,10 @@ local rateLimiterKey = KEYS[1]
1617
if ARGV[1] ~= "0" then
1718
return getRateLimitTTL(tonumber(ARGV[1]), rateLimiterKey)
1819
else
20+
local rateLimitMax = rcall("HGET", KEYS[2], "max")
21+
if rateLimitMax then
22+
return getRateLimitTTL(tonumber(rateLimitMax), rateLimiterKey)
23+
end
24+
1925
return rcall("PTTL", rateLimiterKey)
2026
end

src/commands/includes/getTargetQueueList.lua

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,19 @@
44
]]
55

66
local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
7-
local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency")
7+
local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
88

99
if queueAttributes[1] then
10-
return pausedKey, true
10+
return pausedKey, true, queueAttributes[3], queueAttributes[4]
1111
else
1212
if queueAttributes[2] then
1313
local activeCount = rcall("LLEN", activeKey)
1414
if activeCount >= tonumber(queueAttributes[2]) then
15-
return waitKey, true
15+
return waitKey, true, queueAttributes[3], queueAttributes[4]
1616
else
17-
return waitKey, false
17+
return waitKey, false, queueAttributes[3], queueAttributes[4]
1818
end
1919
end
2020
end
21-
return waitKey, false
21+
return waitKey, false, queueAttributes[3], queueAttributes[4]
2222
end

src/commands/includes/prepareJobForProcessing.lua

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,22 @@
1010
--- @include "addBaseMarkerIfNeeded"
1111

1212
local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey,
13-
jobId, processedOn, maxJobs, markerKey, opts)
13+
jobId, processedOn, maxJobs, limiterDuration, markerKey, opts)
1414
local jobKey = keyPrefix .. jobId
1515

1616
-- Check if we need to perform rate limiting.
1717
if maxJobs then
1818
local jobCounter = tonumber(rcall("INCR", rateLimiterKey))
1919

2020
if jobCounter == 1 then
21-
local limiterDuration = opts['limiter'] and opts['limiter']['duration']
2221
local integerDuration = math.floor(math.abs(limiterDuration))
2322
rcall("PEXPIRE", rateLimiterKey, integerDuration)
2423
end
2524
end
2625

27-
local lockKey = jobKey .. ':lock'
28-
2926
-- get a lock
3027
if opts['token'] ~= "0" then
28+
local lockKey = jobKey .. ':lock'
3129
rcall("SET", lockKey, opts['token'], "PX", opts['lockDuration'])
3230
end
3331

src/commands/moveStalledJobsToWait-8.lua

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ local rcall = redis.call
2424
-- Includes
2525
--- @include "includes/addJobInTargetList"
2626
--- @include "includes/batches"
27-
--- @include "includes/getTargetQueueList"
2827
--- @include "includes/moveJobToWait"
2928
--- @include "includes/trimEvents"
3029

src/commands/moveToActive-11.lua

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,15 @@ local opts = cmsgpack.unpack(ARGV[3])
5151
--- @include "includes/prepareJobForProcessing"
5252
--- @include "includes/promoteDelayedJobs"
5353

54-
local target, isPausedOrMaxed = getTargetQueueList(KEYS[9], activeKey, waitKey, KEYS[8])
54+
local target, isPausedOrMaxed, rateLimitMax, rateLimitDuration = getTargetQueueList(KEYS[9],
55+
activeKey, waitKey, KEYS[8])
5556

5657
-- Check if there are delayed jobs that we can move to wait.
5758
local markerKey = KEYS[11]
5859
promoteDelayedJobs(delayedKey, markerKey, target, KEYS[3], eventStreamKey, ARGV[1],
5960
ARGV[2], KEYS[10], isPausedOrMaxed)
6061

61-
local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max'])
62+
local maxJobs = tonumber(rateLimitMax or (opts['limiter'] and opts['limiter']['max']))
6263
local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey)
6364

6465
-- Check if we are rate limited first.
@@ -67,6 +68,8 @@ if expireTime > 0 then return {0, 0, expireTime, 0} end
6768
-- paused or maxed queue
6869
if isPausedOrMaxed then return {0, 0, 0, 0} end
6970

71+
local limiterDuration = (opts['limiter'] and opts['limiter']['duration']) or rateLimitDuration
72+
7073
-- no job ID, try non-blocking move from wait to active
7174
local jobId = rcall("RPOPLPUSH", waitKey, activeKey)
7275

@@ -78,12 +81,12 @@ end
7881

7982
if jobId then
8083
return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2],
81-
maxJobs, markerKey, opts)
84+
maxJobs, limiterDuration, markerKey, opts)
8285
else
8386
jobId = moveJobFromPrioritizedToActive(KEYS[3], activeKey, KEYS[10])
8487
if jobId then
8588
return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2],
86-
maxJobs, markerKey, opts)
89+
maxJobs, limiterDuration, markerKey, opts)
8790
end
8891
end
8992

0 commit comments

Comments
 (0)