Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -619,8 +619,6 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot,

static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<MessageSet>& currentSetOfInputs,
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
auto& proxy = registry.get<FairMQDeviceProxy>();

O2_SIGNPOST_ID_GENERATE(sid, forwarding);
O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Cleaning up slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s",
slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : "");
Expand All @@ -631,6 +629,13 @@ static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot sl
DataProcessingHelpers::cleanForwardedMessages(span, consume);
}

auto& asyncQueue = registry.get<AsyncQueue>();
auto& decongestion = registry.get<DecongestionService>();
O2_SIGNPOST_ID_GENERATE(aid, async_queue);
O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate}
.user<DecongestionContext>({.ref = registry, .oldestTimeslice = oldestTimeslice}));

O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Cleaning done");
};

Expand Down Expand Up @@ -1763,12 +1768,6 @@ auto forwardOnInsertion(ServiceRegistryRef& ref, std::span<fair::mq::MessagePtr>
O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding to %{public}s %d", info.name.c_str(), fi);
info.policy->forward(parts, ChannelIndex{fi}, ref);
}
auto& asyncQueue = ref.get<AsyncQueue>();
auto& decongestion = ref.get<DecongestionService>();
O2_SIGNPOST_ID_GENERATE(aid, async_queue);
O2_SIGNPOST_EVENT_EMIT(async_queue, aid, "forwardInputs", "Queuing forwarding oldestPossible %zu", oldestTimeslice.timeslice.value);
AsyncQueueHelpers::post(asyncQueue, AsyncTask{.timeslice = oldestTimeslice.timeslice, .id = decongestion.oldestPossibleTimesliceTask, .debounce = -1, .callback = decongestionCallbackLate}
.user<DecongestionContext>({.ref = ref, .oldestTimeslice = oldestTimeslice}));
O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
};

Expand Down