Commit db6e8b9

AAP-40782 Fix too-low max_workers value, dump running at capacity (ansible#15873)
* Dump running tasks when running out of capacity
* Use same logic for max_workers and capacity
* Address case where CPU capacity is the constraint
* Add a test for correspondence
* Fake redis to make tests work
1 parent 4834177 commit db6e8b9
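
The core invariant of this commit, sketched below for illustration. This snippet is not part of the commit; the helper names are stand-ins, and the interpolation is an assumption that Instance capacity falls between the smaller and larger of its two per-resource capacities (exactly true at capacity_adjustment=1, which the new test assumes). Under that assumption, capacity can never exceed max(cpu_capacity, mem_capacity), so taking that max plus 7 reserve workers keeps max_workers above capacity whether CPU or memory is the constraint.

def instance_capacity(cpu_capacity, mem_capacity, capacity_adjustment=1.0):
    # capacity interpolates between the smaller and larger per-resource value
    lo, hi = sorted((cpu_capacity, mem_capacity))
    return int(lo + (hi - lo) * capacity_adjustment)


def auto_max_workers(cpu_capacity, mem_capacity, reserve=7):
    # take the larger capacity, then add reserve workers for heartbeat tasks
    return max(cpu_capacity, mem_capacity) + reserve


# holds for both orderings, for any adjustment in [0, 1]
assert auto_max_workers(96, 40) > instance_capacity(96, 40)    # CPU is the constraint
assert auto_max_workers(8, 1200) > instance_capacity(8, 1200)  # memory is the constraint

At capacity_adjustment=1 the sketch reduces to the equality the new test in test_ha.py asserts: auto max workers == capacity + 7.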

File tree (7 files changed: +155, -44 lines)

- awx/main/dispatch/pool.py
- awx/main/dispatch/worker/base.py
- awx/main/tests/data/sleep_task.py
- awx/main/tests/functional/commands/test_callback_receiver.py
- awx/main/tests/functional/conftest.py
- awx/main/tests/functional/models/test_ha.py
- tools/scripts/firehose_tasks.py

awx/main/dispatch/pool.py

Lines changed: 61 additions & 14 deletions
@@ -7,6 +7,7 @@
 import traceback
 from datetime import datetime
 from uuid import uuid4
+import json

 import collections
 from multiprocessing import Process
@@ -25,7 +26,10 @@

 from awx.main.models import UnifiedJob
 from awx.main.dispatch import reaper
-from awx.main.utils.common import convert_mem_str_to_bytes, get_mem_effective_capacity
+from awx.main.utils.common import get_mem_effective_capacity, get_corrected_memory, get_corrected_cpu, get_cpu_effective_capacity
+
+# ansible-runner
+from ansible_runner.utils.capacity import get_mem_in_bytes, get_cpu_count

 if 'run_callback_receiver' in sys.argv:
     logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@@ -307,6 +311,41 @@ def stop(self, signum):
                 logger.exception('could not kill {}'.format(worker.pid))


+def get_auto_max_workers():
+    """Method we normally rely on to get max_workers
+
+    Uses almost same logic as Instance.local_health_check
+    The important thing is to be MORE than Instance.capacity
+    so that the task-manager does not over-schedule this node
+
+    Ideally we would just use the capacity from the database plus reserve workers,
+    but this poses some bootstrap problems where OCP task containers
+    register themselves after startup
+    """
+    # Get memory from ansible-runner
+    total_memory_gb = get_mem_in_bytes()
+
+    # This may replace memory calculation with a user override
+    corrected_memory = get_corrected_memory(total_memory_gb)
+
+    # Get same number as max forks based on memory, this function takes memory as bytes
+    mem_capacity = get_mem_effective_capacity(corrected_memory, is_control_node=True)
+
+    # Follow same process for CPU capacity constraint
+    cpu_count = get_cpu_count()
+    corrected_cpu = get_corrected_cpu(cpu_count)
+    cpu_capacity = get_cpu_effective_capacity(corrected_cpu, is_control_node=True)
+
+    # Here is what is different from health checks,
+    auto_max = max(mem_capacity, cpu_capacity)
+
+    # add magic number of extra workers to ensure
+    # we have a few extra workers to run the heartbeat
+    auto_max += 7
+
+    return auto_max
+
+
 class AutoscalePool(WorkerPool):
     """
     An extended pool implementation that automatically scales workers up and
@@ -320,19 +359,7 @@ def __init__(self, *args, **kwargs):
         super(AutoscalePool, self).__init__(*args, **kwargs)

         if self.max_workers is None:
-            settings_absmem = getattr(settings, 'SYSTEM_TASK_ABS_MEM', None)
-            if settings_absmem is not None:
-                # There are 1073741824 bytes in a gigabyte. Convert bytes to gigabytes by dividing by 2**30
-                total_memory_gb = convert_mem_str_to_bytes(settings_absmem) // 2**30
-            else:
-                total_memory_gb = (psutil.virtual_memory().total >> 30) + 1  # noqa: round up
-
-            # Get same number as max forks based on memory, this function takes memory as bytes
-            self.max_workers = get_mem_effective_capacity(total_memory_gb * 2**30)
-
-            # add magic prime number of extra workers to ensure
-            # we have a few extra workers to run the heartbeat
-            self.max_workers += 7
+            self.max_workers = get_auto_max_workers()

         # max workers can't be less than min_workers
         self.max_workers = max(self.min_workers, self.max_workers)
@@ -346,6 +373,9 @@ def __init__(self, *args, **kwargs):
         self.scale_up_ct = 0
         self.worker_count_max = 0

+        # last time we wrote current tasks, to avoid too much log spam
+        self.last_task_list_log = time.monotonic()
+
     def produce_subsystem_metrics(self, metrics_object):
         metrics_object.set('dispatcher_pool_scale_up_events', self.scale_up_ct)
         metrics_object.set('dispatcher_pool_active_task_count', sum(len(w.managed_tasks) for w in self.workers))
@@ -463,6 +493,14 @@ def up(self):
             self.worker_count_max = new_worker_ct
         return ret

+    @staticmethod
+    def fast_task_serialization(current_task):
+        try:
+            return str(current_task.get('task')) + ' - ' + str(sorted(current_task.get('args', []))) + ' - ' + str(sorted(current_task.get('kwargs', {})))
+        except Exception:
+            # just make sure this does not make things worse
+            return str(current_task)
+
     def write(self, preferred_queue, body):
         if 'guid' in body:
             set_guid(body['guid'])
@@ -484,6 +522,15 @@ def write(self, preferred_queue, body):
                 if isinstance(body, dict):
                     task_name = body.get('task')
                     logger.warning(f'Workers maxed, queuing {task_name}, load: {sum(len(w.managed_tasks) for w in self.workers)} / {len(self.workers)}')
+                    # Once every 10 seconds write out task list for debugging
+                    if time.monotonic() - self.last_task_list_log >= 10.0:
+                        task_counts = {}
+                        for worker in self.workers:
+                            task_slug = self.fast_task_serialization(worker.current_task)
+                            task_counts.setdefault(task_slug, 0)
+                            task_counts[task_slug] += 1
+                        logger.info(f'Running tasks by count:\n{json.dumps(task_counts, indent=2)}')
+                        self.last_task_list_log = time.monotonic()
             return super(AutoscalePool, self).write(preferred_queue, body)
         except Exception:
             for conn in connections.all():
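
For reference, a sketch of what the new capacity dump could look like in the dispatcher log once the pool is saturated. The task names and counts here are illustrative, but the slug format, 'task - sorted(args) - sorted(kwargs)', follows fast_task_serialization above:

Running tasks by count:
{
  "awx.main.tests.data.sleep_task.sleep_task - [] - []": 30,
  "awx.main.tasks.system.cluster_node_heartbeat - [] - []": 1
}

One entry like this is written at most every 10 seconds, alongside the existing 'Workers maxed' warning.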

awx/main/dispatch/worker/base.py

Lines changed: 1 addition & 1 deletion
@@ -238,7 +238,7 @@ def run_periodic_tasks(self):
     def run(self, *args, **kwargs):
         super(AWXConsumerPG, self).run(*args, **kwargs)

-        logger.info(f"Running worker {self.name} listening to queues {self.queues}")
+        logger.info(f"Running {self.name}, workers min={self.pool.min_workers} max={self.pool.max_workers}, listening to queues {self.queues}")
         init = False

         while True:

awx/main/tests/data/sleep_task.py

Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
+import time
+import logging
+
+from awx.main.dispatch import get_task_queuename
+from awx.main.dispatch.publish import task
+
+
+logger = logging.getLogger(__name__)
+
+
+@task(queue=get_task_queuename)
+def sleep_task(seconds=10, log=False):
+    if log:
+        logger.info('starting sleep_task')
+    time.sleep(seconds)
+    if log:
+        logger.info('finished sleep_task')
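
A quick usage sketch, assuming a Django shell on a node where the dispatcher is running; .delay() forwards its arguments to the task and is the same publish call the firehose script at the end of this commit uses:

from awx.main.tests.data.sleep_task import sleep_task

# publish a single 60-second task to this node's dispatcher queue
sleep_task.delay(seconds=60, log=True)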

awx/main/tests/functional/commands/test_callback_receiver.py

Lines changed: 7 additions & 29 deletions
@@ -34,48 +34,26 @@ def test_wrapup_does_send_notifications(mocker):
     mock.assert_called_once_with('succeeded')


-class FakeRedis:
-    def keys(self, *args, **kwargs):
-        return []
-
-    def set(self):
-        pass
-
-    def get(self):
-        return None
-
-    @classmethod
-    def from_url(cls, *args, **kwargs):
-        return cls()
-
-    def pipeline(self):
-        return self
-
-
 class TestCallbackBrokerWorker(TransactionTestCase):
     @pytest.fixture(autouse=True)
-    def turn_off_websockets(self):
+    def turn_off_websockets_and_redis(self, fake_redis):
         with mock.patch('awx.main.dispatch.worker.callback.emit_event_detail', lambda *a, **kw: None):
             yield

-    def get_worker(self):
-        with mock.patch('redis.Redis', new=FakeRedis):  # turn off redis stuff
-            return CallbackBrokerWorker()
-
     def event_create_kwargs(self):
         inventory_update = InventoryUpdate.objects.create(source='file', inventory_source=InventorySource.objects.create(source='file'))
         return dict(inventory_update=inventory_update, created=inventory_update.created)

     def test_flush_with_valid_event(self):
-        worker = self.get_worker()
+        worker = CallbackBrokerWorker()
         events = [InventoryUpdateEvent(uuid=str(uuid4()), **self.event_create_kwargs())]
         worker.buff = {InventoryUpdateEvent: events}
         worker.flush()
         assert worker.buff.get(InventoryUpdateEvent, []) == []
         assert InventoryUpdateEvent.objects.filter(uuid=events[0].uuid).count() == 1

     def test_flush_with_invalid_event(self):
-        worker = self.get_worker()
+        worker = CallbackBrokerWorker()
         kwargs = self.event_create_kwargs()
         events = [
             InventoryUpdateEvent(uuid=str(uuid4()), stdout='good1', **kwargs),
@@ -90,7 +68,7 @@ def test_flush_with_invalid_event(self):
         assert worker.buff == {InventoryUpdateEvent: [events[1]]}

     def test_duplicate_key_not_saved_twice(self):
-        worker = self.get_worker()
+        worker = CallbackBrokerWorker()
         events = [InventoryUpdateEvent(uuid=str(uuid4()), **self.event_create_kwargs())]
         worker.buff = {InventoryUpdateEvent: events.copy()}
         worker.flush()
@@ -104,7 +82,7 @@ def test_duplicate_key_not_saved_twice(self):
         assert worker.buff.get(InventoryUpdateEvent, []) == []

     def test_give_up_on_bad_event(self):
-        worker = self.get_worker()
+        worker = CallbackBrokerWorker()
         events = [InventoryUpdateEvent(uuid=str(uuid4()), counter=-2, **self.event_create_kwargs())]
         worker.buff = {InventoryUpdateEvent: events.copy()}

@@ -117,7 +95,7 @@ def test_give_up_on_bad_event(self):
         assert InventoryUpdateEvent.objects.filter(uuid=events[0].uuid).count() == 0  # sanity

     def test_flush_with_empty_buffer(self):
-        worker = self.get_worker()
+        worker = CallbackBrokerWorker()
         worker.buff = {InventoryUpdateEvent: []}
         with mock.patch.object(InventoryUpdateEvent.objects, 'bulk_create') as flush_mock:
             worker.flush()
@@ -127,7 +105,7 @@ def test_postgres_invalid_NUL_char(self):
         # In postgres, text fields reject NUL character, 0x00
         # tests use sqlite3 which will not raise an error
         # but we can still test that it is sanitized before saving
-        worker = self.get_worker()
+        worker = CallbackBrokerWorker()
         kwargs = self.event_create_kwargs()
         events = [InventoryUpdateEvent(uuid=str(uuid4()), stdout="\x00", **kwargs)]
         assert "\x00" in events[0].stdout  # sanity

awx/main/tests/functional/conftest.py

Lines changed: 27 additions & 0 deletions
@@ -63,6 +63,33 @@ def swagger_autogen(requests=__SWAGGER_REQUESTS__):
     return requests


+class FakeRedis:
+    def keys(self, *args, **kwargs):
+        return []
+
+    def set(self):
+        pass
+
+    def get(self):
+        return None
+
+    @classmethod
+    def from_url(cls, *args, **kwargs):
+        return cls()
+
+    def pipeline(self):
+        return self
+
+    def ping(self):
+        return
+
+
+@pytest.fixture
+def fake_redis():
+    with mock.patch('redis.Redis', new=FakeRedis):  # turn off redis stuff
+        yield
+
+
 @pytest.fixture
 def user():
     def u(name, is_superuser=False):
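
A minimal sketch of opting in to the new fixture (the test function here is hypothetical; the new test in test_ha.py below does the same thing by naming fake_redis as an argument):

import redis


def test_without_real_redis(fake_redis):
    # redis.Redis is patched to FakeRedis for the test, so no server is needed
    conn = redis.Redis.from_url('redis://localhost')
    assert conn.keys() == []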

awx/main/tests/functional/models/test_ha.py

Lines changed: 26 additions & 0 deletions
@@ -3,6 +3,10 @@
 # AWX
 from awx.main.ha import is_ha_environment
 from awx.main.models.ha import Instance
+from awx.main.dispatch.pool import get_auto_max_workers
+
+# Django
+from django.test.utils import override_settings


 @pytest.mark.django_db
@@ -17,3 +21,25 @@ def test_db_localhost():
     Instance.objects.create(hostname='foo', node_type='hybrid')
     Instance.objects.create(hostname='bar', node_type='execution')
     assert is_ha_environment() is False
+
+
+@pytest.mark.django_db
+@pytest.mark.parametrize(
+    'settings',
+    [
+        dict(SYSTEM_TASK_ABS_MEM='16Gi', SYSTEM_TASK_ABS_CPU='24', SYSTEM_TASK_FORKS_MEM=400, SYSTEM_TASK_FORKS_CPU=4),
+        dict(SYSTEM_TASK_ABS_MEM='124Gi', SYSTEM_TASK_ABS_CPU='2', SYSTEM_TASK_FORKS_MEM=None, SYSTEM_TASK_FORKS_CPU=None),
+    ],
+    ids=['cpu_dominated', 'memory_dominated'],
+)
+def test_dispatcher_max_workers_reserve(settings, fake_redis):
+    """This tests that the dispatcher max_workers matches instance capacity

+    Assumes capacity_adjustment is 1,
+    plus reserve worker count
+    """
+    with override_settings(**settings):
+        i = Instance.objects.create(hostname='test-1', node_type='hybrid')
+        i.local_health_check()
+
+        assert get_auto_max_workers() == i.capacity + 7, (i.cpu, i.memory, i.cpu_capacity, i.mem_capacity)
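
The two parametrized cases are chosen so each side of the max(mem_capacity, cpu_capacity) comparison wins once: 'cpu_dominated' makes the CPU-derived fork count the larger value (the case the old memory-only calculation under-counted), while 'memory_dominated' makes the memory-derived count dominate.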

tools/scripts/firehose_tasks.py

Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
+#!/usr/bin/env python
+
+from django import setup
+
+from awx import prepare_env
+
+prepare_env()
+
+setup()
+
+# Keeping this in test folder allows it to be importable
+from awx.main.tests.data.sleep_task import sleep_task
+
+
+for i in range(634):
+    sleep_task.delay()
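
Usage note (assuming a development control node where AWX settings load): running python tools/scripts/firehose_tasks.py publishes 634 sleep tasks at once, typically well above max_workers on a dev node, which forces the dispatcher into its 'Workers maxed' branch so the new ten-second 'Running tasks by count' dump can be observed in the logs.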
