Skip to content

Commit a0fe4fc

Browse files
authored
refactor(scripts): pass waiting-children as key in addParentJob (#3512)
1 parent 5451bcb commit a0fe4fc

File tree

9 files changed

+53
-62
lines changed

9 files changed

+53
-62
lines changed

python/bullmq/flow_producer.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ async def addNode(self, node: dict, parent: dict, queues_opts: dict, pipe):
9191

9292
await self.scripts.addParentJob(
9393
job,
94-
wait_children_key,
9594
pipe
9695
)
9796

python/bullmq/scripts.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
3434
self.commands = {
3535
"addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-9.lua")),
3636
"addDelayedJob": self.redisClient.register_script(self.getScript("addDelayedJob-6.lua")),
37-
"addParentJob": self.redisClient.register_script(self.getScript("addParentJob-5.lua")),
37+
"addParentJob": self.redisClient.register_script(self.getScript("addParentJob-6.lua")),
3838
"addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-9.lua")),
3939
"changePriority": self.redisClient.register_script(self.getScript("changePriority-7.lua")),
4040
"cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-3.lua")),
@@ -86,7 +86,7 @@ def mapKey(key):
8686
return self.keys[key]
8787
return list(map(mapKey, keys))
8888

89-
def addJobArgs(self, job: Job, waiting_children_key: str|None):
89+
def addJobArgs(self, job: Job):
9090
# We are still lacking some arguments here:
9191
# ARGV[1] msgpacked arguments array
9292
# [9] repeat job key
@@ -99,7 +99,6 @@ def addJobArgs(self, job: Job, waiting_children_key: str|None):
9999

100100
packedArgs = msgpack.packb(
101101
[self.keys[""], job.id or "", job.name, job.timestamp, job.parentKey,
102-
waiting_children_key,
103102
f"{parentKey}:dependencies" if parentKey else None, parent],use_bin_type=True)
104103

105104
return [packedArgs, jsonData, packedOpts]
@@ -131,7 +130,7 @@ def addStandardJob(self, job: Job, timestamp: int, pipe = None):
131130
"""
132131
keys = self.getKeys(['wait', 'paused', 'meta', 'id', 'completed',
133132
'delayed', 'active', 'events', 'marker'])
134-
args = self.addJobArgs(job, None)
133+
args = self.addJobArgs(job)
135134
args.append(timestamp)
136135

137136
return self.commands["addStandardJob"](keys=keys, args=args, client=pipe)
@@ -142,7 +141,7 @@ def addDelayedJob(self, job: Job, timestamp: int, pipe = None):
142141
"""
143142
keys = self.getKeys(['marker', 'meta', 'id',
144143
'delayed', 'completed', 'events'])
145-
args = self.addJobArgs(job, None)
144+
args = self.addJobArgs(job)
146145
args.append(timestamp)
147146

148147
return self.commands["addDelayedJob"](keys=keys, args=args, client=pipe)
@@ -153,18 +152,18 @@ def addPrioritizedJob(self, job: Job, timestamp: int, pipe = None):
153152
"""
154153
keys = self.getKeys(['marker', 'meta', 'id', 'prioritized',
155154
'delayed', 'completed', 'active', 'events', 'pc'])
156-
args = self.addJobArgs(job, None)
155+
args = self.addJobArgs(job)
157156
args.append(timestamp)
158157

159158
return self.commands["addPrioritizedJob"](keys=keys, args=args, client=pipe)
160159

161-
def addParentJob(self, job: Job, waiting_children_key: str, pipe = None):
160+
def addParentJob(self, job: Job, pipe = None):
162161
"""
163162
Add a job to the queue that is a parent
164163
"""
165-
keys = self.getKeys(['meta', 'id', 'delayed', 'completed', 'events'])
166-
167-
args = self.addJobArgs(job, waiting_children_key)
164+
keys = self.getKeys(['meta', 'id', 'delayed', 'waiting-children', 'completed', 'events'])
165+
166+
args = self.addJobArgs(job)
168167

169168
return self.commands["addParentJob"](keys=keys, args=args, client=pipe)
170169

src/classes/flow-producer.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -371,14 +371,10 @@ export class FlowProducer extends EventEmitter {
371371
const queueKeysParent = new QueueKeys(
372372
node.prefix || this.opts.prefix,
373373
);
374-
const waitChildrenKey = queueKeysParent.toKey(
375-
node.queueName,
376-
'waiting-children',
377-
);
378374

379375
await job.addJob(<Redis>(multi as unknown), {
380376
parentDependenciesKey: parent?.parentDependenciesKey,
381-
waitChildrenKey,
377+
addToWaitingChildren: true,
382378
parentKey,
383379
});
384380

src/classes/scripts.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ export class Scripts {
169169
queueKeys.meta,
170170
queueKeys.id,
171171
queueKeys.delayed,
172+
queueKeys['waiting-children'],
172173
queueKeys.completed,
173174
queueKeys.events,
174175
];
@@ -240,7 +241,6 @@ export class Scripts {
240241
job.name,
241242
job.timestamp,
242243
job.parentKey || null,
243-
parentKeyOpts.waitChildrenKey || null,
244244
parentKeyOpts.parentDependenciesKey || null,
245245
parent,
246246
job.repeatJobKey,
@@ -270,7 +270,7 @@ export class Scripts {
270270

271271
let result: string | number;
272272

273-
if (parentKeyOpts.waitChildrenKey) {
273+
if (parentKeyOpts.addToWaitingChildren) {
274274
result = await this.addParentJob(client, job, encodedOpts, args);
275275
} else if (typeof opts.delay == 'number' && opts.delay > 0) {
276276
result = await this.addDelayedJob(client, job, encodedOpts, args);
@@ -746,8 +746,8 @@ export class Scripts {
746746
return typeof shouldRemove === 'object'
747747
? shouldRemove
748748
: typeof shouldRemove === 'number'
749-
? { count: shouldRemove }
750-
: { count: shouldRemove ? 0 : -1 };
749+
? { count: shouldRemove }
750+
: { count: shouldRemove ? 0 : -1 };
751751
}
752752

753753
async moveToFinished(

src/commands/addDelayedJob-6.lua

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,10 @@
2121
[3] name
2222
[4] timestamp
2323
[5] parentKey?
24-
x [6] waitChildrenKey key.
25-
[7] parent dependencies key.
26-
[8] parent? {id, queueKey}
27-
[9] repeat job key
28-
[10] deduplication key
24+
[6] parent dependencies key.
25+
[7] parent? {id, queueKey}
26+
[8] repeat job key
27+
[9] deduplication key
2928
3029
ARGV[2] Json stringified job data
3130
ARGV[3] msgpacked options
@@ -50,9 +49,9 @@ local args = cmsgpack.unpack(ARGV[1])
5049
local data = ARGV[2]
5150

5251
local parentKey = args[5]
53-
local parent = args[8]
54-
local repeatJobKey = args[9]
55-
local deduplicationKey = args[10]
52+
local parent = args[7]
53+
local repeatJobKey = args[8]
54+
local deduplicationKey = args[9]
5655
local parentData
5756

5857
-- Includes
@@ -73,7 +72,7 @@ local jobCounter = rcall("INCR", idKey)
7372
local maxEvents = getOrSetMaxEvents(metaKey)
7473
local opts = cmsgpack.unpack(ARGV[3])
7574

76-
local parentDependenciesKey = args[7]
75+
local parentDependenciesKey = args[6]
7776
local timestamp = args[4]
7877

7978
if args[2] == "" then

src/commands/addParentJob-5.lua renamed to src/commands/addParentJob-6.lua

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,20 @@
88
KEYS[1] 'meta'
99
KEYS[2] 'id'
1010
KEYS[3] 'delayed'
11-
KEYS[4] 'completed'
12-
KEYS[5] events stream key
11+
KEYS[4] 'waiting-children'
12+
KEYS[5] 'completed'
13+
KEYS[6] events stream key
1314
1415
ARGV[1] msgpacked arguments array
1516
[1] key prefix,
1617
[2] custom id (will not generate one automatically)
1718
[3] name
1819
[4] timestamp
1920
[5] parentKey?
20-
[6] waitChildrenKey key.
21-
[7] parent dependencies key.
22-
[8] parent? {id, queueKey}
23-
[9] repeat job key
24-
[10] deduplication key
21+
[6] parent dependencies key.
22+
[7] parent? {id, queueKey}
23+
[8] repeat job key
24+
[9] deduplication key
2525
2626
ARGV[2] Json stringified job data
2727
ARGV[3] msgpacked options
@@ -33,8 +33,8 @@
3333
local metaKey = KEYS[1]
3434
local idKey = KEYS[2]
3535

36-
local completedKey = KEYS[4]
37-
local eventsKey = KEYS[5]
36+
local completedKey = KEYS[5]
37+
local eventsKey = KEYS[6]
3838

3939
local jobId
4040
local jobIdKey
@@ -46,9 +46,9 @@ local data = ARGV[2]
4646
local opts = cmsgpack.unpack(ARGV[3])
4747

4848
local parentKey = args[5]
49-
local parent = args[8]
50-
local repeatJobKey = args[9]
51-
local deduplicationKey = args[10]
49+
local parent = args[7]
50+
local repeatJobKey = args[8]
51+
local deduplicationKey = args[9]
5252
local parentData
5353

5454
-- Includes
@@ -67,7 +67,7 @@ local jobCounter = rcall("INCR", idKey)
6767

6868
local maxEvents = getOrSetMaxEvents(metaKey)
6969

70-
local parentDependenciesKey = args[7]
70+
local parentDependenciesKey = args[6]
7171
local timestamp = args[4]
7272
if args[2] == "" then
7373
jobId = jobCounter
@@ -92,7 +92,7 @@ end
9292
storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,
9393
parentKey, parentData, repeatJobKey)
9494

95-
local waitChildrenKey = args[6]
95+
local waitChildrenKey = KEYS[4]
9696
rcall("ZADD", waitChildrenKey, timestamp, jobId)
9797
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
9898
"waiting-children", "jobId", jobId)

src/commands/addPrioritizedJob-9.lua

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,10 @@
2121
[3] name
2222
[4] timestamp
2323
[5] parentKey?
24-
[6] waitChildrenKey key.
25-
[7] parent dependencies key.
26-
[8] parent? {id, queueKey}
27-
[9] repeat job key
28-
[10] deduplication key
24+
[6] parent dependencies key.
25+
[7] parent? {id, queueKey}
26+
[8] repeat job key
27+
[9] deduplication key
2928
3029
ARGV[2] Json stringified job data
3130
ARGV[3] msgpacked options
@@ -53,9 +52,9 @@ local data = ARGV[2]
5352
local opts = cmsgpack.unpack(ARGV[3])
5453

5554
local parentKey = args[5]
56-
local parent = args[8]
57-
local repeatJobKey = args[9]
58-
local deduplicationKey = args[10]
55+
local parent = args[7]
56+
local repeatJobKey = args[8]
57+
local deduplicationKey = args[9]
5958
local parentData
6059

6160
-- Includes
@@ -76,7 +75,7 @@ local jobCounter = rcall("INCR", idKey)
7675

7776
local maxEvents = getOrSetMaxEvents(metaKey)
7877

79-
local parentDependenciesKey = args[7]
78+
local parentDependenciesKey = args[6]
8079
local timestamp = args[4]
8180
if args[2] == "" then
8281
jobId = jobCounter

src/commands/addStandardJob-9.lua

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,10 @@
3131
[3] name
3232
[4] timestamp
3333
[5] parentKey?
34-
[6] waitChildrenKey key.
35-
[7] parent dependencies key.
36-
[8] parent? {id, queueKey}
37-
[9] repeat job key
38-
[10] deduplication key
34+
[6] parent dependencies key.
35+
[7] parent? {id, queueKey}
36+
[8] repeat job key
37+
[9] deduplication key
3938
4039
ARGV[2] Json stringified job data
4140
ARGV[3] msgpacked options
@@ -56,9 +55,9 @@ local data = ARGV[2]
5655
local opts = cmsgpack.unpack(ARGV[3])
5756

5857
local parentKey = args[5]
59-
local parent = args[8]
60-
local repeatJobKey = args[9]
61-
local deduplicationKey = args[10]
58+
local parent = args[7]
59+
local repeatJobKey = args[8]
60+
local deduplicationKey = args[9]
6261
local parentData
6362

6463
-- Includes
@@ -80,7 +79,7 @@ local jobCounter = rcall("INCR", KEYS[4])
8079
local metaKey = KEYS[3]
8180
local maxEvents = getOrSetMaxEvents(metaKey)
8281

83-
local parentDependenciesKey = args[7]
82+
local parentDependenciesKey = args[6]
8483
local timestamp = args[4]
8584
if args[2] == "" then
8685
jobId = jobCounter

src/interfaces/parent.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ export interface ParentKeys {
2121
}
2222

2323
export type ParentKeyOpts = {
24-
waitChildrenKey?: string;
24+
addToWaitingChildren?: boolean;
2525
parentDependenciesKey?: string;
2626
parentKey?: string;
2727
};

0 commit comments

Comments
 (0)