diff --git a/doc/admin-guide/files/records.yaml.en.rst b/doc/admin-guide/files/records.yaml.en.rst index 920ec5d7d40..4c8d12049d5 100644 --- a/doc/admin-guide/files/records.yaml.en.rst +++ b/doc/admin-guide/files/records.yaml.en.rst @@ -383,6 +383,19 @@ Thread Variables will create its own domain socket with a ``-`` suffix added to the end of the path. +.. ts:cv:: CONFIG proxy.config.exec_thread.loop_time_update_probability INT 10 + :reloadable: + + This dynamically reloadable setting controls how often the exec thread loop + timestamp is refreshed after processing an event, given as a percentage from + 0 to 100. A value of 0 means the timestamp is only updated once per event + loop; a value of 100 means the timestamp is updated after any potential + operation that could take time (i.e. processing an event or waiting on IO). + The timestamp is used for queuing events and comparing timestamps for + processing. Updating more often might improve event timer accuracy and event + loop metrics, but increases the number of times that the current time is + obtained from the OS. See also :ts:cv:`proxy.config.system_clock`. + .. ts:cv:: CONFIG proxy.config.accept_threads INT 1 The number of accept threads. 
If disabled (``0``), then accepts will be done diff --git a/include/iocore/eventsystem/EThread.h b/include/iocore/eventsystem/EThread.h index 9b6f64bfed0..bb3f561d5c8 100644 --- a/include/iocore/eventsystem/EThread.h +++ b/include/iocore/eventsystem/EThread.h @@ -352,8 +352,8 @@ class EThread : public Thread void execute() override; void execute_regular(); - void process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count); - void process_event(Event *e, int calling_code); + ink_hrtime process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count, ink_hrtime event_time); + ink_hrtime process_event(Event *e, int calling_code, ink_hrtime event_time); void free_event(Event *e); LoopTailHandler *tail_cb = &DEFAULT_TAIL_HANDLER; diff --git a/src/iocore/eventsystem/EventSystem.cc b/src/iocore/eventsystem/EventSystem.cc index f91c6531895..73c10ecd421 100644 --- a/src/iocore/eventsystem/EventSystem.cc +++ b/src/iocore/eventsystem/EventSystem.cc @@ -44,6 +44,9 @@ ink_event_system_init(ts::ModuleVersion v) RecEstablishStaticConfigInt32(thread_freelist_low_watermark, "proxy.config.allocator.thread_freelist_low_watermark"); + extern int loop_time_update_probability; + RecEstablishStaticConfigInt32(loop_time_update_probability, "proxy.config.exec_thread.loop_time_update_probability"); + int chunk_sizes[DEFAULT_BUFFER_SIZES] = {0}; { auto chunk_sizes_string{RecGetRecordStringAlloc("proxy.config.allocator.iobuf_chunk_sizes")}; diff --git a/src/iocore/eventsystem/UnixEThread.cc b/src/iocore/eventsystem/UnixEThread.cc index cfcd889fbe7..209636fe15b 100644 --- a/src/iocore/eventsystem/UnixEThread.cc +++ b/src/iocore/eventsystem/UnixEThread.cc @@ -33,6 +33,7 @@ ///////////////////////////////////////////////////////////////////// #include "P_EventSystem.h" #include "iocore/eventsystem/Lock.h" +#include "tscore/ink_hrtime.h" #if HAVE_EVENTFD #include @@ -53,6 +54,7 @@ char const *const EThread::Metrics::Slice::STAT_NAME[] = { }; int 
thread_max_heartbeat_mseconds = THREAD_MAX_HEARTBEAT_MSECONDS; +int loop_time_update_probability = 10; // To define a class inherits from Thread: // 1) Define an independent thread_local static member @@ -144,19 +146,19 @@ EThread::set_event_type(EventType et) event_types |= (1 << static_cast(et)); } -void -EThread::process_event(Event *e, int calling_code) +ink_hrtime +EThread::process_event(Event *e, int calling_code, ink_hrtime event_time) { ink_assert((!e->in_the_prot_queue && !e->in_the_priority_queue)); WEAK_MUTEX_TRY_LOCK(lock, e->mutex, this); if (!lock.is_locked()) { - e->timeout_at = ink_get_hrtime() + DELAY_FOR_RETRY; + e->timeout_at = event_time + DELAY_FOR_RETRY; EventQueueExternal.enqueue_local(e); } else { if (e->cancelled) { MUTEX_RELEASE(lock); free_event(e); - return; + return event_time; } Continuation *c_temp = e->continuation; @@ -164,6 +166,13 @@ EThread::process_event(Event *e, int calling_code) set_cont_flags(e->continuation->control_flags); e->continuation->handleEvent(calling_code, e); + if (loop_time_update_probability == 100) { + event_time = ink_get_hrtime(); + } else if (loop_time_update_probability > 0) { + if (static_cast(generator.random() % 100) < loop_time_update_probability) { + event_time = ink_get_hrtime(); + } + } ink_assert(!e->in_the_priority_queue); ink_assert(c_temp == e->continuation); MUTEX_RELEASE(lock); @@ -172,7 +181,7 @@ EThread::process_event(Event *e, int calling_code) if (e->period < 0) { e->timeout_at = e->period; } else { - e->timeout_at = ink_get_hrtime() + e->period; + e->timeout_at = event_time + e->period; } EventQueueExternal.enqueue_local(e); } @@ -180,10 +189,11 @@ EThread::process_event(Event *e, int calling_code) free_event(e); } } + return event_time; } -void -EThread::process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count) +ink_hrtime +EThread::process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count, ink_hrtime event_time) { Event *e; @@ -198,9 +208,9 @@ 
EThread::process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_ free_event(e); } else if (!e->timeout_at) { // IMMEDIATE ink_assert(e->period == 0); - process_event(e, e->callback_event); + event_time = process_event(e, e->callback_event, event_time); } else if (e->timeout_at > 0) { // INTERVAL - EventQueue.enqueue(e, ink_get_hrtime()); + EventQueue.enqueue(e, event_time); } else { // NEGATIVE Event *p = nullptr; Event *a = NegativeQueue->head; @@ -216,6 +226,7 @@ EThread::process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_ } ++(*nq_count); } + return event_time; } void @@ -228,8 +239,10 @@ EThread::execute_regular() ink_hrtime loop_start_time; // Time the loop started. ink_hrtime loop_finish_time; // Time at the end of the loop. + loop_start_time = ink_get_hrtime(); + // Track this so we can update on boundary crossing. - auto prev_slice = this->metrics.prev_slice(metrics._slice.data() + (ink_get_hrtime() / HRTIME_SECOND) % Metrics::N_SLICES); + auto prev_slice = this->metrics.prev_slice(metrics._slice.data() + (loop_start_time / HRTIME_SECOND) % Metrics::N_SLICES); int nq_count; int ev_count; @@ -241,9 +254,8 @@ EThread::execute_regular() // give priority to immediate events while (!TSSystemState::is_event_system_shut_down()) { - loop_start_time = ink_get_hrtime(); - nq_count = 0; // count # of elements put on negative queue. - ev_count = 0; // # of events handled. + nq_count = 0; // count # of elements put on negative queue. + ev_count = 0; // # of events handled. current_slice = metrics._slice.data() + (loop_start_time / HRTIME_SECOND) % Metrics::N_SLICES; metrics.current_slice.store(current_slice, std::memory_order_release); @@ -256,37 +268,37 @@ EThread::execute_regular() } ++(current_slice->_count); // loop started, bump count. 
- process_queue(&NegativeQueue, &ev_count, &nq_count); + ink_hrtime event_time = process_queue(&NegativeQueue, &ev_count, &nq_count, loop_start_time); bool done_one; do { done_one = false; // execute all the eligible internal events - EventQueue.check_ready(loop_start_time, this); - while ((e = EventQueue.dequeue_ready(ink_get_hrtime()))) { + EventQueue.check_ready(event_time, this); + while ((e = EventQueue.dequeue_ready(event_time))) { ink_assert(e); ink_assert(e->timeout_at > 0); if (e->cancelled) { free_event(e); } else { - done_one = true; - process_event(e, e->callback_event); + done_one = true; + event_time = process_event(e, e->callback_event, event_time); } } } while (done_one); // execute any negative (poll) events if (NegativeQueue.head) { - process_queue(&NegativeQueue, &ev_count, &nq_count); + event_time = process_queue(&NegativeQueue, &ev_count, &nq_count, event_time); // execute poll events while ((e = NegativeQueue.dequeue())) { - process_event(e, EVENT_POLL); + event_time = process_event(e, EVENT_POLL, event_time); } } next_time = EventQueue.earliest_timeout(); - ink_hrtime sleep_time = next_time - ink_get_hrtime(); + ink_hrtime sleep_time = next_time - event_time; if (sleep_time > 0) { if (EventQueueExternal.localQueue.empty()) { sleep_time = std::min(sleep_time, HRTIME_MSECONDS(thread_max_heartbeat_mseconds)); @@ -301,7 +313,7 @@ EThread::execute_regular() } // drained the queue by this point - ink_hrtime post_drain = ink_get_hrtime(); + ink_hrtime post_drain = event_time; ink_hrtime drain_queue = post_drain - loop_start_time; tail_cb->waitForActivity(sleep_time); @@ -311,7 +323,8 @@ EThread::execute_regular() // @a delta can be negative due to time of day adjustments (which apparently happen quite frequently). I // tried using the monotonic clock to get around this but it was *very* stuttery (up to hundreds // of milliseconds), far too much to be actually used. 
- delta = std::max(0, loop_finish_time - loop_start_time); + delta = std::max(0, loop_finish_time - loop_start_time); + loop_start_time = loop_finish_time; metrics.decay(); metrics.record_loop_time(delta); diff --git a/src/records/RecordsConfig.cc b/src/records/RecordsConfig.cc index fbc2d3eb653..35ed7f47427 100644 --- a/src/records/RecordsConfig.cc +++ b/src/records/RecordsConfig.cc @@ -21,6 +21,7 @@ limitations under the License. */ +#include "records/RecDefs.h" #include "tscore/ink_config.h" #include "tscore/Filenames.h" #include "records/RecordsConfig.h" @@ -117,6 +118,8 @@ static constexpr RecordElement RecordsConfig[] = , {RECT_CONFIG, "proxy.config.exec_thread.listen", RECD_INT, "0", RECU_RESTART_TS, RR_NULL, RECC_INT, "[0-1]", RECA_READ_ONLY} , + {RECT_CONFIG, "proxy.config.exec_thread.loop_time_update_probability", RECD_INT, "10", RECU_DYNAMIC, RR_NULL, RECC_INT, "[0-100]", RECA_NULL} + , {RECT_CONFIG, "proxy.config.accept_threads", RECD_INT, "1", RECU_RESTART_TS, RR_NULL, RECC_INT, "[0-" TS_STR(TS_MAX_NUMBER_EVENT_THREADS) "]", RECA_READ_ONLY} , {RECT_CONFIG, "proxy.config.task_threads", RECD_INT, "2", RECU_RESTART_TS, RR_NULL, RECC_INT, "[1-" TS_STR(TS_MAX_NUMBER_EVENT_THREADS) "]", RECA_READ_ONLY}