@@ -4,15 +4,15 @@ import TextJob from '../providers/text/TextJob'
44import EmailJob from '../providers/email/EmailJob'
55import { logger } from '../config/logger'
66import { User } from '../users/User'
7- import Campaign , { CampaignCreateParams , CampaignDelivery , CampaignParams , CampaignPopulationProgress , CampaignProgress , CampaignSend , CampaignSendReferenceType , CampaignSendState , CampaignState , SentCampaign } from './Campaign'
7+ import Campaign , { CampaignCreateParams , CampaignDelivery , CampaignParams , CampaignPopulationProgress , CampaignProgress , CampaignSend , CampaignSendParams , CampaignSendReferenceType , CampaignSendState , CampaignState , SentCampaign } from './Campaign'
88import List from '../lists/List'
99import Subscription , { SubscriptionState } from '../subscriptions/Subscription'
1010import { RequestError } from '../core/errors'
1111import { PageParams } from '../core/searchParams'
1212import { allLists } from '../lists/ListService'
1313import { allTemplates , duplicateTemplate , screenshotHtml , templateInUserLocale , validateTemplates } from '../render/TemplateService'
1414import { getSubscription , getUserSubscriptionState } from '../subscriptions/SubscriptionService'
15- import { chunk , cleanString , pick , shallowEqual } from '../utilities'
15+ import { batch , chunk , cleanString , pick , shallowEqual } from '../utilities'
1616import { getProvider } from '../providers/ProviderRepository'
1717import { createTagSubquery , getTags , setTags } from '../tags/TagService'
1818import { getProject } from '../projects/ProjectService'
@@ -362,6 +362,30 @@ export const populateSendList = async (campaign: SentCampaign) => {
362362 const progressCacheKey = CacheKeys . populationProgress ( campaign )
363363 const totalCacheKey = CacheKeys . populationTotal ( campaign )
364364
365+ const insertRows = async ( rows : CampaignSendParams [ ] ) => {
366+ try {
367+ await App . main . db . transaction ( async ( trx ) => {
368+ await CampaignSend . query ( trx )
369+ . insert ( rows )
370+ . onConflict ( [ 'campaign_id' , 'user_id' , 'reference_id' ] )
371+ . merge ( [ 'state' , 'send_at' ] )
372+ } )
373+ } catch ( error : any ) {
374+
375+ // If foreign key error, retry in smaller chunks
376+ if ( error . errno === 1452 && rows . length > 1 ) {
377+ const size = Math . max ( 1 , Math . floor ( rows . length / 2 ) )
378+ const batches = batch ( rows , size )
379+ for ( const items of batches ) {
380+ await insertRows ( items )
381+ }
382+ return
383+ }
384+
385+ logger . error ( { error, campaignId : campaign . id } , 'campaign:generate:progress:error' )
386+ }
387+ }
388+
365389 await processUsers ( {
366390 query : await recipientClickhouseQuery ( campaign ) ,
367391 cacheKey : CacheKeys . generate ( campaign ) ,
@@ -379,16 +403,7 @@ export const populateSendList = async (campaign: SentCampaign) => {
379403 } ,
380404 callback : async ( pairs : DataPair [ ] ) => {
381405 const items = pairs . map ( ( { key, value } ) => CampaignSend . create ( campaign , project , { id : parseInt ( key ) , timezone : value } ) )
382- try {
383- await App . main . db . transaction ( async ( trx ) => {
384- await CampaignSend . query ( trx )
385- . insert ( items )
386- . onConflict ( [ 'campaign_id' , 'user_id' , 'reference_id' ] )
387- . merge ( [ 'state' , 'send_at' ] )
388- } )
389- } catch ( error ) {
390- logger . error ( { error, campaignId : campaign . id } , 'campaign:generate:progress:error' )
391- }
406+ await insertRows ( items )
392407 await cacheIncr ( redis , progressCacheKey , items . length , oneDay )
393408 } ,
394409 afterCallback : async ( ) => {
0 commit comments