Skip to content
Merged
11 changes: 11 additions & 0 deletions queue/src/main/java/org/killbill/queue/retry/RetryableService.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public void scheduleRetry(final QueueRetryException exception,
final DateTime effectiveDate = computeRetryDate(exception, originalEffectiveDate, retryNb);
if (effectiveDate == null) {
log.warn("Error processing event, NOT scheduling retry for event='{}', retryNb='{}'", originalNotificationEvent, retryNb, exception);
onRetriesExhausted(originalNotificationEvent, searchKey1, searchKey2);
throw new RetryableInternalException(false);
}
log.warn("Error processing event, scheduling retry for event='{}', effectiveDate='{}', retryNb='{}'", originalNotificationEvent, effectiveDate, retryNb, exception);
Expand All @@ -148,10 +149,20 @@ public void scheduleRetry(final QueueRetryException exception,
throw new RetryableInternalException(true);
} catch (final IOException e) {
log.error("Unable to schedule retry for event='{}', effectiveDate='{}'", originalNotificationEvent, effectiveDate, e);
onRetriesExhausted(originalNotificationEvent, searchKey1, searchKey2);
throw new RetryableInternalException(false);
}
}

/**
* Called when the retry schedule is exhausted for an event, or when a retry cannot be scheduled
* due to a serialization error. Subclasses can override to take remedial action (e.g. parking
* the account). searchKey1 = accountRecordId, searchKey2 = tenantRecordId.
*/
protected void onRetriesExhausted(final QueueEvent event, final Long searchKey1, final Long searchKey2) {
// no-op by default
}

private DateTime computeRetryDate(final QueueRetryException queueRetryException, final DateTime initialEventDateTime, final int retryNb) {
final List<Period> retrySchedule = queueRetryException.getRetrySchedule();
if (retrySchedule == null || retryNb > retrySchedule.size()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ public void processEvent(final BusEvent event) {
}

void waitFor(final BusEvent event) {
Awaitility.await().atMost(15, TimeUnit.SECONDS).until(new Callable<Boolean>() {
Awaitility.await().atMost(30, TimeUnit.SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return receivedEvents.contains(event.getUserToken());
Expand Down
Loading