diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 3eaab36fb7908..4954544deab84 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -619,8 +619,6 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector& currentSetOfInputs, TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) { - auto& proxy = registry.get(); - O2_SIGNPOST_ID_GENERATE(sid, forwarding); O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Cleaning up slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s", slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : ""); @@ -631,6 +629,13 @@ static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot sl DataProcessingHelpers::cleanForwardedMessages(span, consume); } + auto& asyncQueue = registry.get(); + auto& decongestion = registry.get(); + O2_SIGNPOST_ID_GENERATE(aid, async_queue); + O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value); + AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate} + .user({.ref = registry, .oldestTimeslice = oldestTimeslice})); + O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Cleaning done"); }; @@ -1763,12 +1768,6 @@ auto forwardOnInsertion(ServiceRegistryRef& ref, std::span O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi); info.policy->forward(parts, ChannelIndex{fi}, ref); } - auto& asyncQueue = ref.get(); - auto& decongestion = ref.get(); - O2_SIGNPOST_ID_GENERATE(aid, async_queue); - O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value); - AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate} - .user({.ref = ref, .oldestTimeslice = oldestTimeslice})); O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done"); };