forked from hellysmile/asyncio_monkey
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasyncio_monkey.py
More file actions
54 lines (35 loc) · 1.11 KB
/
asyncio_monkey.py
File metadata and controls
54 lines (35 loc) · 1.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import sys
PY_360 = sys.version_info >= (3, 6, 0)
__version__ = '0.0.1'
def patch_log_destroy_pending():
import asyncio
if hasattr(asyncio.tasks.Task, 'patched'):
return
class Task(asyncio.tasks.Task):
patched = True
def _get_log_destroy_pending(self):
return True
def _set_log_destroy_pending(self, value):
pass
_log_destroy_pending = property(
_get_log_destroy_pending,
_set_log_destroy_pending,
)
del _get_log_destroy_pending
del _set_log_destroy_pending
asyncio.tasks.Task = Task
asyncio.Task = asyncio.tasks.Task
def patch_get_event_loop():
import asyncio
if hasattr(asyncio.events.get_event_loop, 'patched'):
return
if not PY_360:
return
def get_event_loop():
return asyncio.events.get_event_loop_policy().get_event_loop()
get_event_loop.patched = True
asyncio.events.get_event_loop = get_event_loop
asyncio.get_event_loop = asyncio.events.get_event_loop
def patch_all():
patch_log_destroy_pending()
patch_get_event_loop()