Skip to content

Commit f1573b3

Browse files
authored
fix(worker): do not retry processor when connection errors happen (#3482)
1 parent 0d9708e commit f1573b3

File tree

4 files changed

+61
-45
lines changed

4 files changed

+61
-45
lines changed

src/classes/worker.ts

Lines changed: 57 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -556,15 +556,11 @@ export class Worker<
556556
if (job) {
557557
const token = job.token;
558558
asyncFifoQueue.add(
559-
this.retryIfFailed<void | Job<DataType, ResultType, NameType>>(
560-
() =>
561-
this.processJob(
562-
<Job<DataType, ResultType, NameType>>job,
563-
token,
564-
() => asyncFifoQueue.numTotal() <= this._concurrency,
565-
jobsInProgress,
566-
),
567-
this.opts.runRetryDelay,
559+
this.processJob(
560+
<Job<DataType, ResultType, NameType>>job,
561+
token,
562+
() => asyncFifoQueue.numTotal() <= this._concurrency,
563+
jobsInProgress,
568564
),
569565
);
570566
} else if (asyncFifoQueue.numQueued() === 0) {
@@ -719,12 +715,9 @@ will never work with more accuracy than 1ms. */
719715
// We cannot trust that the blocking connection stays blocking forever
720716
// due to issues in Redis and IORedis, so we will reconnect if we
721717
// don't get a response in the expected time.
722-
timeout = setTimeout(
723-
async () => {
724-
bclient.disconnect(!this.closing);
725-
},
726-
blockTimeout * 1000 + 1000,
727-
);
718+
timeout = setTimeout(async () => {
719+
bclient.disconnect(!this.closing);
720+
}, blockTimeout * 1000 + 1000);
728721

729722
this.updateDelays(); // reset delays to avoid reusing same values in next iteration
730723

@@ -922,38 +915,62 @@ will never work with more accuracy than 1ms. */
922915
const unrecoverableErrorMessage =
923916
this.getUnrecoverableErrorMessage(job);
924917
if (unrecoverableErrorMessage) {
925-
const failed = await this.handleFailed(
926-
new UnrecoverableError(unrecoverableErrorMessage),
927-
job,
928-
token,
929-
fetchNextCallback,
930-
jobsInProgress,
931-
inProgressItem,
932-
span,
918+
const failed = await this.retryIfFailed<void | Job<
919+
DataType,
920+
ResultType,
921+
NameType
922+
>>(
923+
() =>
924+
this.handleFailed(
925+
new UnrecoverableError(unrecoverableErrorMessage),
926+
job,
927+
token,
928+
fetchNextCallback,
929+
jobsInProgress,
930+
inProgressItem,
931+
span,
932+
),
933+
this.opts.runRetryDelay,
933934
);
934935
return failed;
935936
}
936937
jobsInProgress.add(inProgressItem);
937938

938939
const result = await this.callProcessJob(job, token);
939-
return await this.handleCompleted(
940-
result,
941-
job,
942-
token,
943-
fetchNextCallback,
944-
jobsInProgress,
945-
inProgressItem,
946-
span,
940+
return await this.retryIfFailed<void | Job<
941+
DataType,
942+
ResultType,
943+
NameType
944+
>>(
945+
() =>
946+
this.handleCompleted(
947+
result,
948+
job,
949+
token,
950+
fetchNextCallback,
951+
jobsInProgress,
952+
inProgressItem,
953+
span,
954+
),
955+
this.opts.runRetryDelay,
947956
);
948957
} catch (err) {
949-
const failed = await this.handleFailed(
950-
<Error>err,
951-
job,
952-
token,
953-
fetchNextCallback,
954-
jobsInProgress,
955-
inProgressItem,
956-
span,
958+
const failed = await this.retryIfFailed<void | Job<
959+
DataType,
960+
ResultType,
961+
NameType
962+
>>(
963+
() =>
964+
this.handleFailed(
965+
<Error>err,
966+
job,
967+
token,
968+
fetchNextCallback,
969+
jobsInProgress,
970+
inProgressItem,
971+
span,
972+
),
973+
this.opts.runRetryDelay,
957974
);
958975
return failed;
959976
} finally {
@@ -1327,13 +1344,10 @@ will never work with more accuracy than 1ms. */
13271344
} catch (err) {
13281345
if (isNotConnectionError(err as Error)) {
13291346
this.emit('error', <Error>err);
1330-
if (delayInMs) {
1331-
await this.delay(delayInMs);
1332-
}
13331347

13341348
throw err;
13351349
} else {
1336-
if (delayInMs) {
1350+
if (delayInMs && !this.closing && !this.closed) {
13371351
await this.delay(delayInMs);
13381352
}
13391353

tests/test_job_scheduler.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2256,6 +2256,7 @@ describe('Job Scheduler', function () {
22562256
},
22572257
);
22582258
});
2259+
const delayStub = sinon.stub(worker!, 'delay').callsFake(async () => {});
22592260

22602261
await processing;
22612262

@@ -2317,6 +2318,7 @@ describe('Job Scheduler', function () {
23172318

23182319
const delayedCount3 = await queue.getDelayedCount();
23192320
expect(delayedCount3).to.be.equal(1);
2321+
delayStub.restore();
23202322
});
23212323
});
23222324

tests/test_obliterate.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ describe('Obliterate', function () {
373373
let first = true;
374374
const worker = new Worker(
375375
queue.name,
376-
async job => {
376+
async () => {
377377
if (first) {
378378
first = false;
379379
throw new Error('failed first');

tests/test_worker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { expect } from 'chai';
22
import { default as IORedis } from 'ioredis';
3-
import { after, reject, times } from 'lodash';
3+
import { after, times } from 'lodash';
44
import { describe, beforeEach, it, before, after as afterAll } from 'mocha';
55
import * as sinon from 'sinon';
66
import { v4 } from 'uuid';

0 commit comments

Comments
 (0)