Commit 94764a1
AAP-42649 Flag-gated use of "dispatcherd" as its own library (ansible#15981)
* Use dynamic AWX max_workers value
* Make basic --status and --running commands work
* Make feature flag enabled true by default for development
* [dispatcherd] Dispatcher socket-based `--status` demo working (ansible#15908)
* Fix task decorator to work with and without feature flag (AAP-41775) (ansible#15911)
* refactor(system): extract common heartbeat helpers and split cluster_node_heartbeat
  - _heartbeat_instance_management: consolidates instance management, health checks, and lost-instance detection
  - _heartbeat_check_versions: compares instance versions and initiates shutdown when necessary
  - _heartbeat_handle_lost_instances: reaps jobs and marks lost instances offline
  - Refactor the original cluster_node_heartbeat to use these helpers and retain legacy behavior (using bind_kwargs)
  - Introduce adispatch_cluster_node_heartbeat for dispatcherd: uses the control API to retrieve running tasks and reaps them
  - Link the two implementations by attaching adispatch_cluster_node_heartbeat as the _new_method on cluster_node_heartbeat
* feat(publish): delegate heartbeat task submission to the new dispatcherd implementation. apply_async now checks at runtime whether FEATURE_NEW_DISPATCHER is enabled; when the task is cluster_node_heartbeat and a _new_method is attached, submission is delegated to the new dispatcherd implementation. Original behavior is preserved for all other tasks, with fallback on error.
* refactor(system): extract task ID retrieval from dispatcherd into a helper function. Improves readability of adispatch_cluster_node_heartbeat by moving the complex UUID parsing into a dedicated helper, with clearer error handling following established code patterns.
* fix(dispatcher): enable task decorator to work with and without feature flag (AAP-41775). Attaches alternative implementations to apply_async._new_method so cluster_node_heartbeat works correctly with both the legacy and new dispatcher systems without modifying core decorator logic.
* fix(dispatcher): improve error handling and logging in feature flag implementation (AAP-41775)
  - Add error handling when attaching an alternative dispatcher implementation
  - Fix method self-reference in apply_async to properly use cls.apply_async
  - Document limitations of this targeted approach for specific tasks
  - Add logging for better debugging of dispatcher selection
  - Ensure decorator timing by keeping method attachment after function definitions
* fix(dispatcher): implement registry pattern for dispatcher feature flag compatibility (AAP-41775). Replaces direct method attribute assignment with a global registry for alternative implementations; the original approach tried to attach new methods directly to apply_async bound methods, which fails because bound methods don't support attribute assignment in Python. The registry pattern:
  - Creates a global ALTERNATIVE_TASK_IMPLEMENTATIONS dict in publish.py
  - Registers alternative implementations by task name
  - Modifies apply_async to check the registry when the feature flag is enabled
  - Adds extensive logging throughout the process for debugging
  This enables cluster_node_heartbeat to work with both the legacy and new dispatcher implementations based on the FEATURE_NEW_DISPATCHER flag.
* refactor(dispatcher): remove excessive logging from dispatcher implementation (AAP-41775). Preserves task implementation selection based on the feature flag, registration success/failure messages, and critical error reporting; removes registry content debugging, repetitive task diagnostics, and non-essential information logging.
* fix(dispatcher): fix shallow copy in dispatcher schedule conversion. Resolves "AttributeError: 'float' object has no attribute 'total_seconds'" errors when the dispatcher is restarted. Refs: AAP-41775
* Use IPC mechanism to get running tasks (ansible#15926)
* Allow tasks from tasks
* Fix failure to limit to waiting jobs
* Get job record with lock
* Fix failures in dispatcherd feature branch (ansible#15930)
* Fully handle DispatcherCancel
* Complete rest of preload import work
* Complete dispatcherd integration & job cancellation (AAP-43033) (ansible#15941)
  - feat(dispatcher): implement feature-flag-aware job cancellation that routes cancel requests to either the legacy dispatcher or the new dispatcherd library based on FEATURE_NEW_DISPATCHER. Updates cancel_dispatcher_process() to use dispatcherd's control API when enabled, handles both direct cancellation and task manager workflow cancellation, and works with DispatcherCancel exception handling to properly handle SIGUSR1 signals.
  - fix(dispatcher): update run_dispatcher.py so the cancel command actually cancels tasks when FEATURE_NEW_DISPATCHER is enabled, rather than just listing running tasks. Each task UUID is translated to the filter format expected by the dispatcherd control API, maintaining the same behavior as the original implementation.
* refactor(system): refactor dispatch_startup() to extract common startup logic and branch on the feature flag, improving clarity and consistency across the legacy and new dispatcherd flows without altering core behavior.
* refactor(system): refactor inform_cluster_of_shutdown() for clarity
* refactor(tasks): replace @task with @task_awx across 22 tasks for dispatcher compatibility. All task decorators now use @task_awx for dispatcher-aware behavior; each task was tested with the new dispatcherd, verifying that tasks using the registry pattern execute correctly without needing binder-based alternative implementations. Removed redundant logging and outdated comments; legacy tasks that do not require special parameter extraction keep their original logic.
* refactor(publish): fix linter
* Fix bug from the branch rebase
* AAP-43763 Add tests for connection management in dispatcherd workers (ansible#15949)
  - Add test for job cancel in live tests
  - Fix bug from the branch rebase
  - Add test for connection recovery after the connection broke
  - Add test for breaking the connection
* Fix dispatcherd bugs: schedule aliases, job kwargs handling, cancel handling (ansible#15960)
* Put in job kwargs handling, not done before
* AAP-44382 [dispatcherd] Fixes for running with feature flag off (ansible#15973)
* Use correct decorator for test of tasks
* Finalize dispatcherd feature branch (ansible#15975)
  - Work dispatcherd into the dependency management system
  - Use util methods from DAB
  - Rename the dispatcherd feature flag and flip the default to not-enabled
  - Move to the new submit_task method
  - Update the location of the sock file
* AAP-44381 Make dispatcherd config loading more lazy (ansible#15979)
  - Make submission errors more obvious
* Fix signal handling gap, hijack SIGUSR1 from dispatcherd (ansible#15983)
* Minor adjustments to dispatcherd status command
* [dispatcherd] Get rid of alternative task registry (ansible#15984)
* Fix deadlock error and other cleanup errors (ansible#15987)
* Move to proper error handling location

---------

Co-authored-by: artem_tiupin <[email protected]>
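The registry pattern described above (introduced for AAP-41775 and later removed again in ansible#15984) can be sketched in isolation. Aside from the ALTERNATIVE_TASK_IMPLEMENTATIONS name and the FEATURE_NEW_DISPATCHER flag, every name below is hypothetical:

```python
# Global registry keyed by task name, as the commit message describes. Bound
# methods reject attribute assignment, which is why a dict is used instead of
# attaching _new_method directly to apply_async.
ALTERNATIVE_TASK_IMPLEMENTATIONS = {}

FEATURE_NEW_DISPATCHER = True  # stand-in for the django-flags feature flag


def register_alternative(task_name, impl):
    ALTERNATIVE_TASK_IMPLEMENTATIONS[task_name] = impl


def apply_async(task_name, default_impl, *args):
    # Delegate to the registered alternative only when the flag is enabled
    if FEATURE_NEW_DISPATCHER and task_name in ALTERNATIVE_TASK_IMPLEMENTATIONS:
        return ALTERNATIVE_TASK_IMPLEMENTATIONS[task_name](*args)
    return default_impl(*args)


register_alternative('cluster_node_heartbeat', lambda: 'new-dispatcher')
heartbeat_result = apply_async('cluster_node_heartbeat', lambda: 'legacy')
other_result = apply_async('some_other_task', lambda: 'legacy')
```

Tasks without a registered alternative fall through to their default implementation, which is what kept the remaining 21 tasks on the legacy path during the transition.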
1 parent c1b6f9a commit 94764a1


42 files changed: +1070 −242 lines

awx/main/analytics/analytics_tasks.py

Lines changed: 2 additions & 2 deletions

@@ -3,13 +3,13 @@
 # AWX
 from awx.main.analytics.subsystem_metrics import DispatcherMetrics, CallbackReceiverMetrics
-from awx.main.dispatch.publish import task
+from awx.main.dispatch.publish import task as task_awx
 from awx.main.dispatch import get_task_queuename

 logger = logging.getLogger('awx.main.scheduler')


-@task(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename)
 def send_subsystem_metrics():
     DispatcherMetrics().send_metrics()
     CallbackReceiverMetrics().send_metrics()
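The rename above only aliases the import; call sites keep the same decorator semantics. A minimal sketch of a queue-aware task decorator shows why the alias makes the 22-task migration mechanical (REGISTRY and the decorator internals here are hypothetical, not AWX's real publish.task):

```python
# Hypothetical registry of decorated tasks, keyed by dotted name
REGISTRY = {}


def task(queue=None):
    def wrap(fn):
        # record the function under its dotted name, along with its queue
        REGISTRY[f'{fn.__module__}.{fn.__qualname__}'] = {'fn': fn, 'queue': queue}
        return fn
    return wrap


# importing under an alias, as the diff does, keeps the call-site change
# to a one-line decorator swap per task
task_awx = task


@task_awx(queue='analytics')
def send_subsystem_metrics():
    return 'sent'
```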

awx/main/apps.py

Lines changed: 6 additions & 0 deletions

@@ -1,5 +1,7 @@
 import os

+from dispatcherd.config import setup as dispatcher_setup
+
 from django.apps import AppConfig
 from django.utils.translation import gettext_lazy as _
 from awx.main.utils.common import bypass_in_test, load_all_entry_points_for
@@ -79,6 +81,10 @@ def load_inventory_plugins(self):
     def ready(self):
         super().ready()

+        from awx.main.dispatch.config import get_dispatcherd_config
+
+        dispatcher_setup(get_dispatcherd_config())
+
         """
         Credential loading triggers database operations. There are cases we want to call
         awx-manage collectstatic without a database. All management commands invoke the ready() code

awx/main/constants.py

Lines changed: 2 additions & 0 deletions

@@ -77,6 +77,8 @@
     'awx.main.utils.log',
     # loggers that may be called getting logging settings
     'awx.conf',
+    # dispatcherd should only use 1 database connection
+    'dispatcherd',
 )

 # Reported version for node seen in receptor mesh but for which capacity check

awx/main/dispatch/config.py

Lines changed: 50 additions & 0 deletions

@@ -0,0 +1,50 @@
+from django.conf import settings
+
+from ansible_base.lib.utils.db import get_pg_notify_params
+from awx.main.dispatch import get_task_queuename
+from awx.main.dispatch.pool import get_auto_max_workers
+
+
+def get_dispatcherd_config(for_service: bool = False) -> dict:
+    """Return a dictionary config for dispatcherd
+
+    Parameters:
+        for_service: if True, include dynamic options needed for running the dispatcher service;
+            this will require database access, so you should delay evaluation until after app setup
+    """
+    config = {
+        "version": 2,
+        "service": {
+            "pool_kwargs": {
+                "min_workers": settings.JOB_EVENT_WORKERS,
+                "max_workers": get_auto_max_workers(),
+            },
+            "main_kwargs": {"node_id": settings.CLUSTER_HOST_ID},
+            "process_manager_cls": "ForkServerManager",
+            "process_manager_kwargs": {"preload_modules": ['awx.main.dispatch.hazmat']},
+        },
+        "brokers": {
+            "pg_notify": {
+                "config": get_pg_notify_params(),
+                "sync_connection_factory": "ansible_base.lib.utils.db.psycopg_connection_from_django",
+                "default_publish_channel": settings.CLUSTER_HOST_ID,  # used for debugging commands
+            },
+            "socket": {"socket_path": settings.DISPATCHERD_DEBUGGING_SOCKFILE},
+        },
+        "publish": {
+            "default_control_broker": "socket",
+            "default_broker": "pg_notify",
+        },
+        "worker": {"worker_cls": "awx.main.dispatch.worker.dispatcherd.AWXTaskWorker"},
+    }
+
+    if for_service:
+        config["producers"] = {
+            "ScheduledProducer": {"task_schedule": settings.DISPATCHER_SCHEDULE},
+            "OnStartProducer": {"task_list": {"awx.main.tasks.system.dispatch_startup": {}}},
+            "ControlProducer": {},
+        }
+
+        config["brokers"]["pg_notify"]["channels"] = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
+
+    return config
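The split here — a static base config plus `for_service` extras that need database access — is what makes lazy config loading (AAP-44381) possible. A minimal sketch with placeholder values standing in for the Django settings and helpers above:

```python
def get_config(for_service: bool = False) -> dict:
    # static portion: safe to build at import/app-setup time
    config = {
        'version': 2,
        'service': {'pool_kwargs': {'min_workers': 4, 'max_workers': 10}},
        'publish': {'default_broker': 'pg_notify', 'default_control_broker': 'socket'},
    }
    if for_service:
        # dynamic portion: in the real function this needs database access,
        # so callers delay evaluation until the service actually starts
        config['producers'] = {
            'ScheduledProducer': {},
            'OnStartProducer': {},
            'ControlProducer': {},
        }
    return config


client_config = get_config()
service_config = get_config(for_service=True)
```

Publishers can therefore configure dispatcherd at app startup without a database round-trip, while the service process opts in to the dynamic parts.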

awx/main/dispatch/hazmat.py

Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+import django
+
+# dispatcherd publisher logic is likely to be used, but needs manual preload
+from dispatcherd.brokers import pg_notify  # noqa
+
+# Cache may not be initialized until we are in the worker, so preload here
+from channels_redis import core  # noqa
+
+from awx import prepare_env
+
+from dispatcherd.utils import resolve_callable
+
+
+prepare_env()
+
+django.setup()  # noqa
+
+
+from django.conf import settings
+
+
+# Preload all periodic tasks so their imports will be in shared memory
+for name, options in settings.CELERYBEAT_SCHEDULE.items():
+    resolve_callable(options['task'])
+
+
+# Preload in-line import from tasks
+from awx.main.scheduler.kubernetes import PodManager  # noqa
+
+
+from django.core.cache import cache as django_cache
+from django.db import connection
+
+
+connection.close()
+django_cache.close()
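The preload loop relies on resolving each schedule entry's dotted import string so that, under the ForkServerManager, child workers inherit the modules already loaded. A self-contained sketch of that mechanism, using stdlib targets in place of AWX tasks and a local stand-in for `dispatcherd.utils.resolve_callable`:

```python
from importlib import import_module


def resolve_callable(dotted: str):
    """Import 'pkg.mod.attr' and return the attribute; a stand-in for
    dispatcherd.utils.resolve_callable."""
    module_path, _, attr = dotted.rpartition('.')
    return getattr(import_module(module_path), attr)


# hypothetical schedule shaped like settings.CELERYBEAT_SCHEDULE entries
SCHEDULE = {
    'metrics': {'task': 'json.dumps'},
    'cleanup': {'task': 'base64.b64encode'},
}

# importing every scheduled task once in the preload parent is the point of
# hazmat.py: forked children start with these modules already in memory
preloaded = [resolve_callable(opts['task']) for opts in SCHEDULE.values()]
```

Note the closing `connection.close()` / `django_cache.close()` in the real module: preloading must not leave open sockets behind, since forked children would otherwise share them.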

awx/main/dispatch/publish.py

Lines changed: 16 additions & 0 deletions

@@ -4,6 +4,9 @@
 import time
 from uuid import uuid4

+from dispatcherd.publish import submit_task
+from dispatcherd.utils import resolve_callable
+
 from django_guid import get_guid
 from django.conf import settings

@@ -93,6 +96,19 @@ def get_async_body(cls, args=None, kwargs=None, uuid=None, **kw):

     @classmethod
     def apply_async(cls, args=None, kwargs=None, queue=None, uuid=None, **kw):
+        try:
+            from flags.state import flag_enabled
+
+            if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
+                # At this point we have the import string, and submit_task wants the method, so back to that
+                actual_task = resolve_callable(cls.name)
+                return submit_task(actual_task, args=args, kwargs=kwargs, queue=queue, uuid=uuid, **kw)
+        except Exception:
+            logger.exception(f"[DISPATCHER] Failed to check for alternative dispatcherd implementation for {cls.name}")
+            # Continue with original implementation if anything fails
+            pass
+
+        # Original implementation follows
         queue = queue or getattr(cls.queue, 'im_func', cls.queue)
         if not queue:
             msg = f'{cls.name}: Queue value required and may not be None'
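The flag check above is wrapped in a broad try/except so that any failure in the new path falls through to legacy submission. That fallback shape can be sketched with stand-ins for `flag_enabled` and `submit_task` (everything below is illustrative):

```python
import logging

logger = logging.getLogger(__name__)

FLAGS = {'FEATURE_DISPATCHERD_ENABLED': True}  # stand-in for django-flags state


def flag_enabled(name: str) -> bool:
    return FLAGS.get(name, False)


def submit_task_new(func, args):
    """Stand-in for dispatcherd.publish.submit_task."""
    return ('dispatcherd', func(*args))


def submit_task_legacy(func, args):
    return ('legacy', func(*args))


def apply_async(func, args=()):
    # Any exception in the flag-gated path is logged and the legacy path runs,
    # mirroring the broad try/except in the real apply_async
    try:
        if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
            return submit_task_new(func, args)
    except Exception:
        logger.exception('falling back to legacy dispatch')
    return submit_task_legacy(func, args)


result_on = apply_async(sum, ([1, 2, 3],))
FLAGS['FEATURE_DISPATCHERD_ENABLED'] = False
result_off = apply_async(sum, ([1, 2, 3],))
```

The trade-off of catching `Exception` this broadly is that a genuine dispatcherd bug degrades silently to the legacy path, which is acceptable during a flag-gated rollout but worth revisiting once the flag defaults on.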
awx/main/dispatch/worker/dispatcherd.py

Lines changed: 14 additions & 0 deletions

@@ -0,0 +1,14 @@
+from dispatcherd.worker.task import TaskWorker
+
+from django.db import connection
+
+
+class AWXTaskWorker(TaskWorker):
+
+    def on_start(self) -> None:
+        """Get worker connected so that first task it gets will be worked quickly"""
+        connection.ensure_connection()
+
+    def pre_task(self, message) -> None:
+        """This should remedy bad connections that can not fix themselves"""
+        connection.close_if_unusable_or_obsolete()
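The subclass only overrides lifecycle hooks; dispatcherd's worker loop calls them around task execution. A self-contained sketch of that hook surface (the base class here is a hypothetical stand-in for `dispatcherd.worker.task.TaskWorker`, and the recorded events replace the real database calls):

```python
class TaskWorker:
    """Hypothetical stand-in for dispatcherd.worker.task.TaskWorker's hook surface."""

    def on_start(self):
        pass

    def pre_task(self, message):
        pass

    def run_task(self, message):
        # the worker loop invokes pre_task before executing each task
        self.pre_task(message)
        return message['fn'](*message.get('args', ()))


class RecordingWorker(TaskWorker):
    """Overrides the hooks the way AWXTaskWorker does, recording calls
    instead of touching a database connection."""

    def __init__(self):
        self.events = []

    def on_start(self):
        self.events.append('connected')  # analogous to connection.ensure_connection()

    def pre_task(self, message):
        self.events.append('checked')  # analogous to connection.close_if_unusable_or_obsolete()


worker = RecordingWorker()
worker.on_start()
result = worker.run_task({'fn': max, 'args': ([3, 1, 2],)})
```

Checking the connection in `pre_task` rather than after each task means a connection that went stale while the worker was idle gets replaced before it can fail a job.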

awx/main/management/commands/run_dispatcher.py

Lines changed: 70 additions & 21 deletions

@@ -2,13 +2,21 @@
 # All Rights Reserved.
 import logging
 import yaml
+import os

 import redis

 from django.conf import settings
 from django.core.management.base import BaseCommand, CommandError

+from flags.state import flag_enabled
+
+from dispatcherd.factories import get_control_from_settings
+from dispatcherd import run_service
+from dispatcherd.config import setup as dispatcher_setup
+
 from awx.main.dispatch import get_task_queuename
+from awx.main.dispatch.config import get_dispatcherd_config
 from awx.main.dispatch.control import Control
 from awx.main.dispatch.pool import AutoscalePool
 from awx.main.dispatch.worker import AWXConsumerPG, TaskWorker
@@ -40,18 +48,44 @@ def add_arguments(self, parser):
             ),
         )

+    def verify_dispatcherd_socket(self):
+        if not os.path.exists(settings.DISPATCHERD_DEBUGGING_SOCKFILE):
+            raise CommandError('Dispatcher is not running locally')
+
     def handle(self, *arg, **options):
         if options.get('status'):
-            print(Control('dispatcher').status())
-            return
+            if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
+                ctl = get_control_from_settings()
+                running_data = ctl.control_with_reply('status')
+                if len(running_data) != 1:
+                    raise CommandError('Did not receive expected number of replies')
+                print(yaml.dump(running_data[0], default_flow_style=False))
+                return
+            else:
+                print(Control('dispatcher').status())
+                return
         if options.get('schedule'):
-            print(Control('dispatcher').schedule())
+            if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
+                print('NOT YET IMPLEMENTED')
+                return
+            else:
+                print(Control('dispatcher').schedule())
             return
         if options.get('running'):
-            print(Control('dispatcher').running())
-            return
+            if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
+                ctl = get_control_from_settings()
+                running_data = ctl.control_with_reply('running')
+                print(yaml.dump(running_data, default_flow_style=False))
+                return
+            else:
+                print(Control('dispatcher').running())
+                return
         if options.get('reload'):
-            return Control('dispatcher').control({'control': 'reload'})
+            if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
+                print('NOT YET IMPLEMENTED')
+                return
+            else:
+                return Control('dispatcher').control({'control': 'reload'})
         if options.get('cancel'):
             cancel_str = options.get('cancel')
             try:
@@ -60,21 +94,36 @@ def handle(self, *arg, **options):
                 cancel_data = [cancel_str]
             if not isinstance(cancel_data, list):
                 cancel_data = [cancel_str]
-            print(Control('dispatcher').cancel(cancel_data))
-            return

-        consumer = None
+            if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
+                ctl = get_control_from_settings()
+                results = []
+                for task_id in cancel_data:
+                    # For each task UUID, send an individual cancel command
+                    result = ctl.control_with_reply('cancel', data={'uuid': task_id})
+                    results.append(result)
+                print(yaml.dump(results, default_flow_style=False))
+                return
+            else:
+                print(Control('dispatcher').cancel(cancel_data))
+                return
+
+        if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
+            dispatcher_setup(get_dispatcherd_config(for_service=True))
+            run_service()
+        else:
+            consumer = None

-        try:
-            DispatcherMetricsServer().start()
-        except redis.exceptions.ConnectionError as exc:
-            raise CommandError(f'Dispatcher could not connect to redis, error: {exc}')
+            try:
+                DispatcherMetricsServer().start()
+            except redis.exceptions.ConnectionError as exc:
+                raise CommandError(f'Dispatcher could not connect to redis, error: {exc}')

-        try:
-            queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
-            consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4), schedule=settings.CELERYBEAT_SCHEDULE)
-            consumer.run()
-        except KeyboardInterrupt:
-            logger.debug('Terminating Task Dispatcher')
-            if consumer:
-                consumer.stop()
+            try:
+                queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
+                consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4), schedule=settings.CELERYBEAT_SCHEDULE)
+                consumer.run()
+            except KeyboardInterrupt:
+                logger.debug('Terminating Task Dispatcher')
+                if consumer:
+                    consumer.stop()
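The cancel branch above sends one control message per task UUID rather than a batch. That loop can be sketched with a fake control object; the real one comes from `dispatcherd.factories.get_control_from_settings`, and the reply shapes below are illustrative, not dispatcherd's actual wire format:

```python
class FakeControl:
    """Illustrative stand-in for the dispatcherd control object."""

    def __init__(self, running):
        self.running = set(running)

    def control_with_reply(self, command, data=None):
        if command == 'cancel':
            uuid = data['uuid']
            found = uuid in self.running
            self.running.discard(uuid)
            return [{'uuid': uuid, 'canceled': found}]
        raise ValueError(f'unsupported command: {command}')


def cancel_tasks(ctl, cancel_data):
    # Mirror the management command: one cancel control message per task UUID
    results = []
    for task_id in cancel_data:
        results.append(ctl.control_with_reply('cancel', data={'uuid': task_id}))
    return results


ctl = FakeControl(running=['abc123', 'def456'])
cancel_results = cancel_tasks(ctl, ['abc123', 'zzz999'])
```

Per-UUID replies let the command report which cancellations actually matched a running task, at the cost of one round-trip each.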
