Skip to content

Commit db45bbd

Browse files
authored
fix(worker): refactor template fetch to reduce db calls (#9609)
1 parent 38e88dc commit db45bbd

File tree

7 files changed

+70
-38
lines changed

7 files changed

+70
-38
lines changed

apps/worker/src/app/workflow/usecases/add-job/add-job.usecase.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import {
2828
TierRestrictionsValidateUsecase,
2929
WorkflowRunStatusEnum,
3030
} from '@novu/application-generic';
31-
import { JobEntity, JobRepository, JobStatusEnum, SubscriberRepository } from '@novu/dal';
31+
import { JobEntity, JobRepository, JobStatusEnum, NotificationTemplateEntity, SubscriberRepository } from '@novu/dal';
3232
import { DelayOutput, DigestOutput, ExecuteOutput } from '@novu/framework/internal';
3333
import {
3434
castUnitToDigestUnitEnum,
@@ -515,13 +515,15 @@ export class AddJob {
515515

516516
private async fetchBridgeData(
517517
command: AddJobCommand,
518-
filterVariables: IFilterVariables
518+
filterVariables: IFilterVariables,
519+
workflow?: NotificationTemplateEntity
519520
): Promise<ExecuteOutput | null> {
520521
const response = await this.executeBridgeJob.execute(
521522
ExecuteBridgeJobCommand.create({
522523
identifier: command.job.identifier,
523524
...command,
524525
variables: filterVariables,
526+
workflow,
525527
})
526528
);
527529

apps/worker/src/app/workflow/usecases/execute-bridge-job/execute-bridge-job.command.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { EnvironmentWithUserCommand, IFilterVariables } from '@novu/application-generic';
22

3-
import { JobEntity } from '@novu/dal';
4-
import { IsDefined, IsString } from 'class-validator';
3+
import { JobEntity, NotificationTemplateEntity } from '@novu/dal';
4+
import { IsDefined, IsOptional, IsString } from 'class-validator';
55

66
export class ExecuteBridgeJobCommand extends EnvironmentWithUserCommand {
77
@IsDefined()
@@ -28,4 +28,7 @@ export class ExecuteBridgeJobCommand extends EnvironmentWithUserCommand {
2828

2929
@IsDefined()
3030
variables?: IFilterVariables;
31+
32+
@IsOptional()
33+
workflow?: NotificationTemplateEntity;
3134
}

apps/worker/src/app/workflow/usecases/execute-bridge-job/execute-bridge-job.usecase.ts

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,16 +63,23 @@ export class ExecuteBridgeJob {
6363

6464
let workflow: NotificationTemplateEntity | null = null;
6565
if (isStateful) {
66-
workflow = await this.notificationTemplateRepository.findOne(
67-
{
68-
_id: command.job._templateId,
69-
_environmentId: command.environmentId,
70-
type: {
71-
$in: [ResourceTypeEnum.ECHO, ResourceTypeEnum.BRIDGE],
66+
if (
67+
command.workflow &&
68+
(command.workflow.type === ResourceTypeEnum.ECHO || command.workflow.type === ResourceTypeEnum.BRIDGE)
69+
) {
70+
workflow = command.workflow;
71+
} else {
72+
workflow = await this.notificationTemplateRepository.findOne(
73+
{
74+
_id: command.job._templateId,
75+
_environmentId: command.environmentId,
76+
type: {
77+
$in: [ResourceTypeEnum.ECHO, ResourceTypeEnum.BRIDGE],
78+
},
7279
},
73-
},
74-
'_id triggers type origin'
75-
);
80+
'_id triggers type origin'
81+
);
82+
}
7683
}
7784

7885
if (!workflow && isStateful) {

apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ export class RunJob {
109109
});
110110

111111
// Update workflow run delivery lifecycle after job cancellation
112-
await this.conditionallyUpdateDeliveryLifecycle(job, WorkflowRunStatusEnum.COMPLETED);
112+
await this.conditionallyUpdateDeliveryLifecycle(job, WorkflowRunStatusEnum.COMPLETED, undefined);
113113

114114
return;
115115
}
@@ -152,6 +152,9 @@ export class RunJob {
152152
throw new PlatformException(`Notification with id ${job._notificationId} not found`);
153153
}
154154

155+
const workflow =
156+
(await this.notificationTemplateRepository.findById(job._templateId, job._environmentId)) ?? undefined;
157+
155158
if (isSubscribersScheduleEnabled) {
156159
const schedule = await this.getSubscriberSchedule.execute(
157160
GetSubscriberScheduleCommand.create({
@@ -177,7 +180,7 @@ export class RunJob {
177180

178181
if (
179182
isOutsideSubscriberSchedule &&
180-
(await this.shouldExtendToSubscriberSchedule(job, notification.critical ?? false))
183+
(await this.shouldExtendToSubscriberSchedule(job, notification.critical ?? false, workflow))
181184
) {
182185
this.logger.info(
183186
{
@@ -227,7 +230,7 @@ export class RunJob {
227230
);
228231

229232
// Update workflow run delivery lifecycle after schedule-based cancellation
230-
await this.conditionallyUpdateDeliveryLifecycle(job, WorkflowRunStatusEnum.COMPLETED);
233+
await this.conditionallyUpdateDeliveryLifecycle(job, WorkflowRunStatusEnum.COMPLETED, workflow);
231234

232235
return;
233236
}
@@ -271,6 +274,7 @@ export class RunJob {
271274
severity: notification.severity,
272275
statelessPreferences: job.preferences,
273276
contextKeys: job.contextKeys,
277+
workflow,
274278
})
275279
);
276280

@@ -285,7 +289,7 @@ export class RunJob {
285289
});
286290

287291
// Update workflow run delivery lifecycle after successful step completion
288-
await this.conditionallyUpdateDeliveryLifecycle(job, WorkflowRunStatusEnum.PROCESSING);
292+
await this.conditionallyUpdateDeliveryLifecycle(job, WorkflowRunStatusEnum.PROCESSING, workflow);
289293
} else if (sendMessageResult.status === 'failed') {
290294
await this.jobRepository.update(
291295
{
@@ -307,7 +311,7 @@ export class RunJob {
307311
});
308312

309313
// Update workflow run delivery lifecycle after step failure
310-
await this.conditionallyUpdateDeliveryLifecycle(job, WorkflowRunStatusEnum.COMPLETED);
314+
await this.conditionallyUpdateDeliveryLifecycle(job, WorkflowRunStatusEnum.COMPLETED, workflow);
311315

312316
if (shouldHaltOnStepFailure(job)) {
313317
shouldQueueNextJob = false;
@@ -330,7 +334,7 @@ export class RunJob {
330334
});
331335

332336
// Update workflow run delivery lifecycle after step skip/cancellation
333-
await this.conditionallyUpdateDeliveryLifecycle(job, WorkflowRunStatusEnum.PROCESSING);
337+
await this.conditionallyUpdateDeliveryLifecycle(job, WorkflowRunStatusEnum.PROCESSING, workflow);
334338
}
335339
} catch (caughtError: unknown) {
336340
error = caughtError as Error;
@@ -446,7 +450,7 @@ export class RunJob {
446450
);
447451

448452
// Update workflow run delivery lifecycle after step skip
449-
await this.conditionallyUpdateDeliveryLifecycle(nextJob, WorkflowRunStatusEnum.PROCESSING);
453+
await this.conditionallyUpdateDeliveryLifecycle(nextJob, WorkflowRunStatusEnum.PROCESSING, undefined);
450454

451455
currentJob = nextJob; // if skipped, continue to the next job
452456
} else {
@@ -654,7 +658,8 @@ export class RunJob {
654658
*/
655659
private async conditionallyUpdateDeliveryLifecycle(
656660
job: JobEntity,
657-
workflowStatus: WorkflowRunStatusEnum
661+
workflowStatus: WorkflowRunStatusEnum,
662+
workflow?: NotificationTemplateEntity
658663
): Promise<void> {
659664
this.logger.debug({ nv: { job } }, 'Conditionally updating delivery lifecycle');
660665

@@ -668,19 +673,21 @@ export class RunJob {
668673
return;
669674
}
670675

671-
const workflow: SelectedWorkflowFields | null = await this.notificationTemplateRepository.findOne(
672-
{
673-
_id: job._templateId,
674-
_environmentId: job._environmentId,
675-
},
676-
SELECTED_WORKFLOW_FIELDS_PROJECTION
677-
);
676+
const workflowWithSteps: SelectedWorkflowFields | null =
677+
workflow ??
678+
(await this.notificationTemplateRepository.findOne(
679+
{
680+
_id: job._templateId,
681+
_environmentId: job._environmentId,
682+
},
683+
SELECTED_WORKFLOW_FIELDS_PROJECTION
684+
));
678685

679-
if (!workflow || !workflow.steps) {
686+
if (!workflowWithSteps || !workflowWithSteps.steps) {
680687
return;
681688
}
682689

683-
const isLastStep = await this.isLastStepInWorkflow(job, workflow);
690+
const isLastStep = await this.isLastStepInWorkflow(job, workflowWithSteps);
684691
if (isLastStep) {
685692
this.logger.trace(
686693
{ nv: { jobId: job._id, stepId: job.step?._id } },
@@ -689,7 +696,7 @@ export class RunJob {
689696
return;
690697
}
691698

692-
const hasActionSteps = await this.hasRemainingActionSteps(job, workflow);
699+
const hasActionSteps = await this.hasRemainingActionSteps(job, workflowWithSteps);
693700

694701
if (hasActionSteps) {
695702
this.logger.trace(
@@ -724,7 +731,11 @@ export class RunJob {
724731
return false;
725732
}
726733

727-
private async shouldExtendToSubscriberSchedule(job: JobEntity, critical: boolean): Promise<boolean> {
734+
private async shouldExtendToSubscriberSchedule(
735+
job: JobEntity,
736+
critical: boolean,
737+
workflow?: NotificationTemplateEntity
738+
): Promise<boolean> {
728739
// should only extend to schedule for delay and digest when the workflow is not critical
729740
if ((job.type === StepTypeEnum.DELAY || job.type === StepTypeEnum.DIGEST) && !critical) {
730741
const bridgeResponse = await this.executeBridgeJob.execute(
@@ -736,6 +747,7 @@ export class RunJob {
736747
jobId: job._id,
737748
job: job,
738749
variables: {},
750+
workflow,
739751
})
740752
);
741753
const extendToSchedule = bridgeResponse?.outputs?.extendToSchedule as boolean | undefined;

apps/worker/src/app/workflow/usecases/send-message/send-message.command.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { EnvironmentWithUserCommand } from '@novu/application-generic';
2-
import type { JobEntity, NotificationStepEntity } from '@novu/dal';
2+
import type { JobEntity, NotificationStepEntity, NotificationTemplateEntity } from '@novu/dal';
33
import type { SeverityLevelEnum, TriggerOverrides, WorkflowPreferences } from '@novu/shared';
44
import { IsDefined, IsOptional, IsString } from 'class-validator';
55

@@ -53,4 +53,7 @@ export class SendMessageCommand extends EnvironmentWithUserCommand {
5353

5454
@IsOptional()
5555
contextKeys?: string[];
56+
57+
@IsOptional()
58+
workflow?: NotificationTemplateEntity;
5659
}

apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ export class SendMessage {
102102
bridgeResponse = await this.executeBridgeJob.execute({
103103
...command,
104104
variables,
105+
workflow: command.workflow,
105106
});
106107
}
107108
const isBridgeSkipped = bridgeResponse?.options?.skip;
@@ -329,10 +330,12 @@ export class SendMessage {
329330
return { result: true };
330331
}
331332

332-
const workflow = await this.getWorkflow({
333-
_id: job._templateId,
334-
environmentId: job._environmentId,
335-
});
333+
const workflow =
334+
command.workflow ??
335+
(await this.getWorkflow({
336+
_id: job._templateId,
337+
environmentId: job._environmentId,
338+
}));
336339

337340
const subscriber = await this.getSubscriberBySubscriberId({
338341
_environmentId: job._environmentId,

libs/application-generic/src/services/socket-worker/socket-worker.service.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,9 @@ export class SocketWorkerService {
131131
Authorization: `Bearer ${this.socketWorkerApiKey}`,
132132
},
133133
responseType: 'json',
134-
timeout: 5000, // 5 second timeout
134+
http2: true,
135+
dnsCache: true,
136+
timeout: 3000,
135137
retry: {
136138
limit: 2,
137139
methods: ['POST'],

0 commit comments

Comments
 (0)