Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions doc/admin-guide/files/records.yaml.en.rst
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,19 @@ Thread Variables
will create its own domain socket with a ``-<thread id>`` suffix added to the
end of the path.

.. ts:cv:: CONFIG proxy.config.exec_thread.loop_time_update_probability INT 10
:reloadable:

This dynamically loadable setting controls the rate that exec thread loop timestamps are
updated after processing an event given as a percentage from 0 to 100. 0
would mean the timestamp is only updated once per event loop, 100 percent
means the timestamp is updated after any potential operation that could take
time (i.e. processing an event or waiting on IO). The timestamp is used for
queuing events and comparing timestamps for processing. Updating more often
might improve event timer accuracy and event loop metrics, but increases the
number of times that the current time is obtained from the OS. See also
`proxy.config.system_clock`

.. ts:cv:: CONFIG proxy.config.accept_threads INT 1

The number of accept threads. If disabled (``0``), then accepts will be done
Expand Down
4 changes: 2 additions & 2 deletions include/iocore/eventsystem/EThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,8 @@ class EThread : public Thread

void execute() override;
void execute_regular();
void process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count);
void process_event(Event *e, int calling_code);
ink_hrtime process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count, ink_hrtime event_time);
ink_hrtime process_event(Event *e, int calling_code, ink_hrtime event_time);
void free_event(Event *e);
LoopTailHandler *tail_cb = &DEFAULT_TAIL_HANDLER;

Expand Down
3 changes: 3 additions & 0 deletions src/iocore/eventsystem/EventSystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ ink_event_system_init(ts::ModuleVersion v)

RecEstablishStaticConfigInt32(thread_freelist_low_watermark, "proxy.config.allocator.thread_freelist_low_watermark");

extern int loop_time_update_probability;
RecEstablishStaticConfigInt32(loop_time_update_probability, "proxy.config.exec_thread.loop_time_update_probability");

int chunk_sizes[DEFAULT_BUFFER_SIZES] = {0};
{
auto chunk_sizes_string{RecGetRecordStringAlloc("proxy.config.allocator.iobuf_chunk_sizes")};
Expand Down
59 changes: 36 additions & 23 deletions src/iocore/eventsystem/UnixEThread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
/////////////////////////////////////////////////////////////////////
#include "P_EventSystem.h"
#include "iocore/eventsystem/Lock.h"
#include "tscore/ink_hrtime.h"

#if HAVE_EVENTFD
#include <sys/eventfd.h>
Expand All @@ -53,6 +54,7 @@ char const *const EThread::Metrics::Slice::STAT_NAME[] = {
};

int thread_max_heartbeat_mseconds = THREAD_MAX_HEARTBEAT_MSECONDS;
int loop_time_update_probability = 10;

// To define a class inherits from Thread:
// 1) Define an independent thread_local static member
Expand Down Expand Up @@ -144,26 +146,33 @@ EThread::set_event_type(EventType et)
event_types |= (1 << static_cast<int>(et));
}

void
EThread::process_event(Event *e, int calling_code)
ink_hrtime
EThread::process_event(Event *e, int calling_code, ink_hrtime event_time)
{
ink_assert((!e->in_the_prot_queue && !e->in_the_priority_queue));
WEAK_MUTEX_TRY_LOCK(lock, e->mutex, this);
if (!lock.is_locked()) {
e->timeout_at = ink_get_hrtime() + DELAY_FOR_RETRY;
e->timeout_at = event_time + DELAY_FOR_RETRY;
EventQueueExternal.enqueue_local(e);
} else {
if (e->cancelled) {
MUTEX_RELEASE(lock);
free_event(e);
return;
return event_time;
}
Continuation *c_temp = e->continuation;

// Restore the client IP debugging flags
set_cont_flags(e->continuation->control_flags);

e->continuation->handleEvent(calling_code, e);
if (loop_time_update_probability == 100) {
event_time = ink_get_hrtime();
} else if (loop_time_update_probability > 0) {
if (static_cast<int>(generator.random() % 100) < loop_time_update_probability) {
event_time = ink_get_hrtime();
}
}
ink_assert(!e->in_the_priority_queue);
ink_assert(c_temp == e->continuation);
MUTEX_RELEASE(lock);
Expand All @@ -172,18 +181,19 @@ EThread::process_event(Event *e, int calling_code)
if (e->period < 0) {
e->timeout_at = e->period;
} else {
e->timeout_at = ink_get_hrtime() + e->period;
e->timeout_at = event_time + e->period;
}
EventQueueExternal.enqueue_local(e);
}
} else if (!e->in_the_prot_queue && !e->in_the_priority_queue) {
free_event(e);
}
}
return event_time;
}

void
EThread::process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count)
ink_hrtime
EThread::process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count, ink_hrtime event_time)
{
Event *e;

Expand All @@ -198,9 +208,9 @@ EThread::process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_
free_event(e);
} else if (!e->timeout_at) { // IMMEDIATE
ink_assert(e->period == 0);
process_event(e, e->callback_event);
event_time = process_event(e, e->callback_event, event_time);
} else if (e->timeout_at > 0) { // INTERVAL
EventQueue.enqueue(e, ink_get_hrtime());
EventQueue.enqueue(e, event_time);
} else { // NEGATIVE
Event *p = nullptr;
Event *a = NegativeQueue->head;
Expand All @@ -216,6 +226,7 @@ EThread::process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_
}
++(*nq_count);
}
return event_time;
}

void
Expand All @@ -228,8 +239,10 @@ EThread::execute_regular()
ink_hrtime loop_start_time; // Time the loop started.
ink_hrtime loop_finish_time; // Time at the end of the loop.

loop_start_time = ink_get_hrtime();

// Track this so we can update on boundary crossing.
auto prev_slice = this->metrics.prev_slice(metrics._slice.data() + (ink_get_hrtime() / HRTIME_SECOND) % Metrics::N_SLICES);
auto prev_slice = this->metrics.prev_slice(metrics._slice.data() + (loop_start_time / HRTIME_SECOND) % Metrics::N_SLICES);

int nq_count;
int ev_count;
Expand All @@ -241,9 +254,8 @@ EThread::execute_regular()

// give priority to immediate events
while (!TSSystemState::is_event_system_shut_down()) {
loop_start_time = ink_get_hrtime();
nq_count = 0; // count # of elements put on negative queue.
ev_count = 0; // # of events handled.
nq_count = 0; // count # of elements put on negative queue.
ev_count = 0; // # of events handled.

current_slice = metrics._slice.data() + (loop_start_time / HRTIME_SECOND) % Metrics::N_SLICES;
metrics.current_slice.store(current_slice, std::memory_order_release);
Expand All @@ -256,37 +268,37 @@ EThread::execute_regular()
}
++(current_slice->_count); // loop started, bump count.

process_queue(&NegativeQueue, &ev_count, &nq_count);
ink_hrtime event_time = process_queue(&NegativeQueue, &ev_count, &nq_count, loop_start_time);

bool done_one;
do {
done_one = false;
// execute all the eligible internal events
EventQueue.check_ready(loop_start_time, this);
while ((e = EventQueue.dequeue_ready(ink_get_hrtime()))) {
EventQueue.check_ready(event_time, this);
while ((e = EventQueue.dequeue_ready(event_time))) {
ink_assert(e);
ink_assert(e->timeout_at > 0);
if (e->cancelled) {
free_event(e);
} else {
done_one = true;
process_event(e, e->callback_event);
done_one = true;
event_time = process_event(e, e->callback_event, event_time);
}
}
} while (done_one);

// execute any negative (poll) events
if (NegativeQueue.head) {
process_queue(&NegativeQueue, &ev_count, &nq_count);
event_time = process_queue(&NegativeQueue, &ev_count, &nq_count, event_time);

// execute poll events
while ((e = NegativeQueue.dequeue())) {
process_event(e, EVENT_POLL);
event_time = process_event(e, EVENT_POLL, event_time);
}
}

next_time = EventQueue.earliest_timeout();
ink_hrtime sleep_time = next_time - ink_get_hrtime();
ink_hrtime sleep_time = next_time - event_time;
if (sleep_time > 0) {
if (EventQueueExternal.localQueue.empty()) {
sleep_time = std::min(sleep_time, HRTIME_MSECONDS(thread_max_heartbeat_mseconds));
Expand All @@ -301,7 +313,7 @@ EThread::execute_regular()
}

// drained the queue by this point
ink_hrtime post_drain = ink_get_hrtime();
ink_hrtime post_drain = event_time;
ink_hrtime drain_queue = post_drain - loop_start_time;

tail_cb->waitForActivity(sleep_time);
Expand All @@ -311,7 +323,8 @@ EThread::execute_regular()
// @a delta can be negative due to time of day adjustments (which apparently happen quite frequently). I
// tried using the monotonic clock to get around this but it was *very* stuttery (up to hundreds
// of milliseconds), far too much to be actually used.
delta = std::max<ink_hrtime>(0, loop_finish_time - loop_start_time);
delta = std::max<ink_hrtime>(0, loop_finish_time - loop_start_time);
loop_start_time = loop_finish_time;

metrics.decay();
metrics.record_loop_time(delta);
Expand Down
3 changes: 3 additions & 0 deletions src/records/RecordsConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
limitations under the License.
*/

#include "records/RecDefs.h"
#include "tscore/ink_config.h"
#include "tscore/Filenames.h"
#include "records/RecordsConfig.h"
Expand Down Expand Up @@ -117,6 +118,8 @@ static constexpr RecordElement RecordsConfig[] =
,
{RECT_CONFIG, "proxy.config.exec_thread.listen", RECD_INT, "0", RECU_RESTART_TS, RR_NULL, RECC_INT, "[0-1]", RECA_READ_ONLY}
,
{RECT_CONFIG, "proxy.config.exec_thread.loop_time_update_probability", RECD_INT, "10", RECU_DYNAMIC, RR_NULL, RECC_INT, "[0-100]", RECA_NULL}
,
{RECT_CONFIG, "proxy.config.accept_threads", RECD_INT, "1", RECU_RESTART_TS, RR_NULL, RECC_INT, "[0-" TS_STR(TS_MAX_NUMBER_EVENT_THREADS) "]", RECA_READ_ONLY}
,
{RECT_CONFIG, "proxy.config.task_threads", RECD_INT, "2", RECU_RESTART_TS, RR_NULL, RECC_INT, "[1-" TS_STR(TS_MAX_NUMBER_EVENT_THREADS) "]", RECA_READ_ONLY}
Expand Down