-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathAsyncIntro.txt
More file actions
580 lines (460 loc) · 20.8 KB
/
AsyncIntro.txt
File metadata and controls
580 lines (460 loc) · 20.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
libdlb: introduction to asynchronous programming
Daniel Beer <dlbeer@gmail.com>
19 Mar 2013
Contents
========================================================================
1. What is asynchronous programming?
2. Example: asynchronous counter
2.1. Principles
3. Asynchronous IO primitives
4. Mailbox-based patterns
4.1. Producer/consumer
4.2. Fork/join
4.3. Reactor
1. What is asynchronous programming?
========================================================================
System calls like read(), write() and sleep() from POSIX are APIs for
sequential programming. You call them to perform a single IO operation,
and they return when that operation is completed. This is fine for
programs with simple IO requirements, but in more complex programs and
in servers, it's often required that you be able to carry out and wait
for several IO operations concurrently -- you wouldn't want an HTTP
server to have to complete an entire request before serving a new
client.
There are two popular strategies for doing this:
* Threads: spawn a new thread for each separable IO operation --
perhaps one thread for each client in an HTTP server. Each thread
can then be written as a series of sequential IO operations.
Pros: simple programming interface, ability to use multiple cores
Cons: significant OS overhead if each thread is largely IO bound
* Events: wait for an IO to become ready by registering a callback.
When IO becomes possible, the callback is invoked from a
single-threaded event loop and then performs the IO.
Pros: very low OS overhead, low latency
Cons: unable to make use of multiple cores, more difficult
programming interface due to inversion of control.
The strategy described here is one which combines good features from
both, and can be implemented without the need for a new language, OS or
runtime:
* Asynchronous programming: begin an IO operation by registering a
callback. The IO is carried out in the background and when complete,
the callback is invoked in one of several threads in a pool.
Pros: very low OS overhead, low latency, able to make use of
multiple cores
Cons: difficult programming interface (similar to events)
The description above of asynchronous programming would lead you to
believe that it's nearly the same as event-based programming. There are
a few subtle, but important, differences:
* In an event-based system, pending events may be cancelled, and if
this is done, the callback is prevented from being invoked. This
isn't possible to do in a deterministic way in an asynchronous
system (but cancellation of a kind is possible).
* In an event-based system, an event may be periodic (for example, an
interval timer). Asynchronous operations maintain a strict
one-to-one relationship between requests and callback.
Both of these differences are related to the way in which callbacks are
dispatched. An event-based system dispatches from a single thread,
whereas an asynchronous system dispatches from a thread pool. This
introduces some constraints and rules which must be followed in order to
avoid race conditions.
2. Example: asynchronous counter
========================================================================
/* This program counts to 10, displaying one number per second using
* asynchronous waits.
*/
#include <stdio.h>
#include "io/ioq.h"
#include "io/mailbox.h"
/* The IO queue manager. */
static struct ioq global_queue;
/* A mailbox is a thread-safe object that contains an atomic flags
* register. We can wait asynchronously for flags to change state, but
* we're not making use of this feature here -- we just want an atomic
* variable.
*/
static struct mailbox done_box;
/* This is our asynchronous timer callback. We don't know exactly which
* thread it will run on, but it'll be one of the threads from the IO
* queue's thread pool.
*/
static void timer_continue(struct waitq_timer *my_timer)
{
static int counter = 0;
/* Every time the timer wait finishes, we increment our counter
* and display the new value.
*/
counter++;
printf("%d\n", counter);
if (counter >= 10) {
/* If this is the tenth time we've invoked the callback,
* raise a mailbox flag to indicate that we're done.
*/
mailbox_raise(&done_box, 1);
/* The main thread can read the flag, but it doesn't
* know that the flag has changed (because we're running
* in a different thread). We need to interrupt the call
* in the main thread to ioq_iterate() in order to make
* it check.
*/
ioq_notify(&global_queue);
} else {
/* We haven't yet run 10 times. Asynchronously wait once
* more. Note that as soon as we call
* waitq_timer_wait(), the callback is "live" and may be
* invoked. That's ok, because this is the last thing
* we're doing in this callback instance, so it's safe
* to run another instance immediately.
*/
waitq_timer_wait(my_timer, 1000, timer_continue);
}
}
int main(void)
{
struct waitq_timer my_timer;
/* Initialize the IO queue with a single thread in the thread
* pool. This might fail, but we're omitting error handling for
* brevity.
*/
if (ioq_init(&global_queue, 1) < 0)
abort();
/* Initialize our mailbox. We need to associate it with an IO
* queue for asynchronous waits to work (although we're not
* using them here).
*/
mailbox_init(&done_box, ioq_runq(&done_box));
/* Initialize the asynchronous timer by associating it with the
* IO queue.
*/
waitq_timer_init(&my_timer, ioq_waitq(&global_queue));
/* Begin an asynchronous wait on the timer. Only one
* asynchronous wait may be in progress at a time -- as soon as
* timer_continue() is invoked, that's our signal that the
* operation has completed.
*/
waitq_timer_wait(&my_timer, 1000, timer_continue);
/* Keep running the IO queue until a flag is raised in the
* mailbox.
*/
while (!mailbox_take(&done_box, MAILBOX_ALL_FLAGS))
ioq_iterate(&global_queue);
/* Free resources. Note that the timer doesn't acquire resources
* for itself, and doesn't need to be destroyed.
*/
mailbox_destroy(&done_box);
ioq_destroy(&global_queue);
return 0;
}
2.1. Principles
------------------------------------------------------------------------
With reference to the example program, note the following principles:
* One-to-one correspondence: ONE request = ONE callback, always. Note
that waitq_timer_wait() is invoked 10 times with timer_continue()
specified as a callback. timer_continue() is also called 10 times.
* Cancellation: cancellation wasn't shown above, but it can be done,
and it doesn't work the same way as in an event-based program. After
a call to waitq_timer_wait(), we could have cancelled the timer
with:
waitq_timer_cancel(&my_timer)
What would this have done? It would have caused timer_continue() to
be invoked as soon as possible, rather than after 1000 ms had
elapsed -- so the one-to-one relationship between requests and
callbacks is preserved. Note that it's the caller's problem to
distinguish between "real" callbacks and cancellations. This could
be done either by setting a (thread-safe) flag before
waitq_timer_cancel(), or by checking the clock against the expected
deadline in the timer callback.
This is known as asynchronous cancellation, whereas the event-based
kind (where the callback is prevented from being invoked) is known
as synchronous cancellation. We can't do synchronous cancellation,
because at the time you call waitq_timer_cancel(), the callback
might already be executing in a different thread!
Since cancellation is often performed from a different logical
thread than the one which will run the callback, it sometimes
happens that the cancellation request occurs *after* the callback
has already executed. If you intend to re-use the IO object, this is
a possible source of race conditions if you aren't careful.
* Thread safety: IO objects, such as struct waitq_timer, are not
thread-safe themselves, although they do interact with "hidden"
threads in the IO system. What this means in practice is that you
must avoid accessing the same IO object from different threads
simultaneously.
There is one exception to this, and that is cancellation: you may
call waitq_timer_cancel(), for example, from any thread.
* Limited subscription: IO objects may have at most only one
outstanding IO operation in progress at any time.
In the example program, thread safety is guaranteed by the fact that we
never access my_timer between waitq_timer_wait() and timer_continue().
The second, and subsequent calls to waitq_timer_wait(), are done in tail
position from timer_continue().
This is a common pattern in asynchronous programming when dealing with a
single isolated sequence of IO operations: because all requests are made
in tail position from callbacks, the sequence of callbacks is
effectively single threaded. The following diagram illustrates the
execution of code in callbacks and the hidden IO operations:
Thread pool IO manager
=======================================================
+------------+
| Init func |
+------------+ ====== wait() ======>
+----------------+
| IO operation 1 |
<===== complete ===== +----------------+
+------------+
| Callback 1 |
+------------+ ====== wait() ======>
+----------------+
| IO operation 2 |
<===== complete ===== +----------------+
+------------+
| Callback 2 |
+------------+
The callbacks and IO operations are strictly interleaved, with control
transfers taking place during requests and completions.
While the diagram above shows IO operations as distinct, in practice,
multiple IO operations will be efficiently multiplexed in the IO manager
using facilities such as epoll().
3. Asynchronous IO primitives
========================================================================
The following header files contain useful IO primitives:
* runq.h: struct runq_task is the simplest IO primitive. A request
to runq_task_exec() invokes the given callback on the thread pool as
soon as possible. This is useful if you want to defer work, or as an
easy way to parallelize operations (in conjunction with the mailbox,
described below).
* waitq.h: struct waitq_timer is a one-shot timer. You can
asynchronously wait by specifying a time period in milliseconds and
a callback to be invoked after the time period has elapsed. Timers
support asynchronous cancellation.
* ioq.h: struct ioq_fd is a file descriptor monitor. After
initializing it by associating it with a file descriptor and an IO
queue, you can asynchronously wait for readability, writability, or
error conditions on the selected file descriptor. These objects
support asynchronous cancellation.
Note two important restrictions:
- You may not associate more than one struct ioq_fd with the same
file descriptor.
- You may not have more than one outstanding wait in progress at a
time. If you need to wait for a new condition when you're
already waiting for an old one, you must first asynchronously
cancel the old wait operation.
* io/mailbox.h: struct mailbox is an asynchrous IPC primitive,
intended for signalling between logical threads. It contains of a
thread-safe 32-bit register, each bit of which is treated as a
"flag". Two atomic operations are possible:
- raise(): set any subset of flags in the mailbox, leaving the
others untouched.
- take(): clear any subset of flags in the mailbox, leaving the
others untouched. The state of the mailbox, prior to clearing,
is returned.
These two operations alone make it useful as a simple atomic
variable. However, to allow actual IPC, two asynchronous wait
operations are provided:
- wait(): given a subset of flags and a callback, invoke the
callback when any of the flags in the subset become raised.
- wait_any(): given a set of flags and a callback, invoke the
callback when ALL of the flags in the subset become raised.
The wait operations are "level triggered": if the condition waited
for is already true at the time of the call, the callback will be
invoked immediately (but still asynchronously).
Check the header files for more details. Most of the asynchronous
operations are implemented using intrusive data structures in a way
which guarantees that they can't fail.
4. Mailbox-based patterns
========================================================================
The simplest pattern of asynchronous programming is the "single-strand",
or single logical thread consisting of a sequence of asynchronous
operations, as shown in the example code above.
More complex and interesting patterns are possible by making use of
struct mailbox, defined in io/mailbox.h. Some useful concepts are
described in subsections here.
4.1. Producer/consumer
------------------------------------------------------------------------
The most obvious use of a mailbox is to use it as an IPC primitive. In
this case, it may be associated with a shared data structure (in this
example, a FIFO queue) to provide more sophisticated communication.
First, a flag is chosen to be associated with the FIFO queue. It doesn't
matter which, just as long as it's not used for any other purpose. Our
data structures and definitions are:
#define QUEUE_READY_FLAG MAILBOX_FLAG(0)
thr_mutex_t lock;
struct slist fifo;
struct mailbox mbox;
When the producer wants to send an item via the queue, it does:
/* Safely place the data in the queue */
thr_mutex_lock(&lock);
slist_append(&fifo, &my_data.node);
thr_mutex_unlock(&lock);
/* Wake up the consumer */
mailbox_raise(&mbox, QUEUE_READY_FLAG);
The consumer should repeatedly wait for the flag to be raised, and empty
the queue:
/* Attempt to pop a datum from the queue with the
* lock held.
*/
static struct datum *pop_queue_locked(void)
{
struct slist_node *n;
thr_mutex_lock(&lock);
n = slist_pop(&fifo);
thr_mutex_unlock(&lock);
if (n)
return container_of(n, struct datum, node);
return NULL;
}
/* Queue handler: call this function once to start the consumption
* of data.
*/
static void consume_queue(struct mailbox *m)
{
/* Acknowledge the wake-up request */
mailbox_take(&mbox, QUEUE_READY_FLAG);
/* Remove data from the queue. We must loop until the queue
* is empty to avoid stalling (all wakeups for items in the
* queue have already been delivered -- we won't be woken up
* again unless new data arrives).
*/
for (;;) {
struct datum *d = pop_queue_locked();
/* Nothing left? Stop processing and wait again. */
if (!d)
break;
process_data(d);
}
/* Wait again. this must be the last thing we do, in order
* to maintain a single strand of execution.
*/
mailbox_wait(&mbox, QUEUE_READY_FLAG, consume_queue);
}
4.2. Fork/join
------------------------------------------------------------------------
Another useful pattern, making use of the wait_all operation, is using a
mailbox to coordinate parallel operations. These operations can be
either IO operations or computations (using the run queue).
In this example, we use a mailbox to synchronize with the completion of
two parallel tasks:
#define TASK_A_FLAG MAILBOX_FLAG(0)
#define TASK_B_FLAG MAILBOX_FLAG(1)
static struct mailbox box;
static struct runq_task task_a;
static struct runq_task task_b;
static void task_a_func(struct runq_task *t)
{
do_something_a();
mailbox_raise(&box, TASK_A_FLAG);
}
static void task_b_func(struct runq_task *t)
{
do_something_b();
mailbox_raise(&box, TASK_B_FLAG);
}
static void all_done(struct mailbox *m)
{
/* Tasks A and B have both completed... */
mailbox_take(&box, TASK_A_FLAG | TASK_B_FLAG);
/* Process the results here */
}
static void do_both_tasks(void)
{
/* Asynchronously start both tasks. Both task_a_func and
* task_b_func can immediately be scheduled in parallel.
*/
runq_task_exec(&task_a, task_a_func);
runq_task_exec(&task_b, task_b_func);
/* Asynchronously wait for both tasks to finish */
mailbox_wait_all(&box, TASK_A_FLAG | TASK_B_FLAG, all_done);
}
In this example, task_a_func and task_b_func were computations, started
via a struct runq_task. However, they could just have easily been
callbacks from an asynchronous IO operation, or a mix of both.
A few rules must be observed for thread-safety:
* task_a_func and task_b_func can run in parallel, and must not share
data without correct locking.
* Any data used by either task can't be touched until they have
finished. In practice, this means avoiding touching any of the
task's data between the call to initiate the task (runq_task_exec)
and the callback which indicates that both tasks have completed
(all_done).
* Since mailbox_raise() may result in the execution of all_done(), it
is the last statement in the task callbacks.
* Similarly, mailbox_wait_all() may also result in the execution of
all_done(), and it is again the last statement in the initiating
function.
4.3. Reactor
------------------------------------------------------------------------
Often, it's useful to express parts of a program in terms of a state
machine responding to a serialized sequence of events (in an
event-driven system, your entire program would be of this form). In an
asynchronously programmed server, you might want to run a state machine
for each client, for example.
This can be done by constructing a main loop based around an
asynchronous mailbox wait: a callback function wakes up on receipt of a
mailbox flag, processes events, and then waits again. Other asynchronous
operations started by the main loop run concurrently to perform simple
IO tasks, raising flags upon completion.
The general strategy for implementing this is to first identify the IO
operations which you will need to perform. These should be as primitive
as possible, because all the completion handler is expected to do is
report to the main loop that the IO operation has completed.
The main loop itself centres, as indicated above, around a mailbox:
static struct mailbox loop_box;
static void main_loop(void)
{
/* Take events from the mailbox */
const mailbox_flags_t events =
mailbox_take(&loop_box, MAILBOX_ALL_FLAGS);
/* React to flags in events... */
/* (see below for details) */
/* Wait for more events */
mailbox_wait(&loop_box, MAILBOX_ALL_FLAGS, main_loop);
}
For each IO operation, you need the following functions and data
structures:
/* A unique flag to indicate completion */
#define IO_FLAG_A MAILBOX_FLAG(0)
/* A state flag to indicate that the operation is in progress (you
* might prefer to share bits of a bit-field). This flag is accessed
* only by the main loop, so no locking is required.
*/
static int a_in_progress;
/* A function to start the IO operation */
static void begin_a(void)
{
if (!a_in_progress) {
io_operation_wait(&my_object, a_complete);
a_in_progress = 1;
}
}
/* ...and a callback function. This does nothing except for waking
* up the main loop via a mailbox flag.
*/
static void a_complete(void)
{
mailbox_raise(&loop_box, IO_FLAG_A);
}
In the main loop, you need to react to the receipt of IO_FLAG_A:
if (events & IO_FLAG_A) {
a_in_progress = 0;
react_to_a();
/* You might also want to call begin_a(), if this is a
* continuous process.
*/
}
Note that thread-safety is achieved via the fact that the only functions
to run in parallel with the main thread are the IO completion callbacks,
and all they do is call mailbox_raise(), which is an entirely safe
operation. The end result is a serialized sequence of invocations of
main_loop(), each retrieving and responding to a set of events.
The only tricky part is terminating the loop: at the time of
termination, it may be that you have IO operations outstanding. When
they complete, they will attempt to raise a mailbox flag, so the mailbox
can't be destroyed until this happens.
Fortunately, shut-down can be arranged safely provided that you are
keeping track of outstanding operations. First, request cancellation of
each IO operation which you have recorded as being in progress
(cancellation will be specific to the particular type of IO operation).
Record that you are in the process of shutting down, and refrain from
starting new operations when old ones complete.
As each operation completes, check to see if there are any more
outstanding. If no more operations are outstanding, it's safe to
terminate by cleaning up data structures and simply returing from the
main loop.