diff --git a/queue/src/main/java/org/killbill/queue/retry/RetryableService.java b/queue/src/main/java/org/killbill/queue/retry/RetryableService.java index 501ca322..0c97da7b 100644 --- a/queue/src/main/java/org/killbill/queue/retry/RetryableService.java +++ b/queue/src/main/java/org/killbill/queue/retry/RetryableService.java @@ -138,6 +138,7 @@ public void scheduleRetry(final QueueRetryException exception, final DateTime effectiveDate = computeRetryDate(exception, originalEffectiveDate, retryNb); if (effectiveDate == null) { log.warn("Error processing event, NOT scheduling retry for event='{}', retryNb='{}'", originalNotificationEvent, retryNb, exception); + onRetriesExhausted(originalNotificationEvent, searchKey1, searchKey2); throw new RetryableInternalException(false); } log.warn("Error processing event, scheduling retry for event='{}', effectiveDate='{}', retryNb='{}'", originalNotificationEvent, effectiveDate, retryNb, exception); @@ -148,10 +149,20 @@ public void scheduleRetry(final QueueRetryException exception, throw new RetryableInternalException(true); } catch (final IOException e) { log.error("Unable to schedule retry for event='{}', effectiveDate='{}'", originalNotificationEvent, effectiveDate, e); + onRetriesExhausted(originalNotificationEvent, searchKey1, searchKey2); throw new RetryableInternalException(false); } } + /** + * Called when the retry schedule is exhausted for an event, or when a retry cannot be scheduled + * due to a serialization error. Subclasses can override to take remedial action (e.g. parking + * the account). searchKey1 = accountRecordId, searchKey2 = tenantRecordId. + */ + protected void onRetriesExhausted(final QueueEvent event, final Long searchKey1, final Long searchKey2) { + // no-op by default + } + private DateTime computeRetryDate(final QueueRetryException queueRetryException, final DateTime initialEventDateTime, final int retryNb) { final List retrySchedule = queueRetryException.getRetrySchedule(); if (retrySchedule == null || retryNb > retrySchedule.size()) { diff --git a/queue/src/test/java/org/killbill/queue/TestReaperIntegration.java b/queue/src/test/java/org/killbill/queue/TestReaperIntegration.java index 879bfa7b..cff647a1 100644 --- a/queue/src/test/java/org/killbill/queue/TestReaperIntegration.java +++ b/queue/src/test/java/org/killbill/queue/TestReaperIntegration.java @@ -339,7 +339,7 @@ public void processEvent(final BusEvent event) { } void waitFor(final BusEvent event) { - Awaitility.await().atMost(15, TimeUnit.SECONDS).until(new Callable() { + Awaitility.await().atMost(30, TimeUnit.SECONDS).until(new Callable() { @Override public Boolean call() throws Exception { return receivedEvents.contains(event.getUserToken());