From df606f6e42491c555bcb626c91d3bde23ef309f4 Mon Sep 17 00:00:00 2001 From: Bee Date: Tue, 7 Oct 2025 11:06:00 -0700 Subject: [PATCH 1/5] move config file reading+validating to commons this is to facilitate central_hub ability to read the outbound_connections from the basic_bot.yml file --- basic_bot.yml | 19 +++++++- src/basic_bot/bb_start.py | 35 +++++---------- src/basic_bot/bb_stop.py | 30 ++++--------- src/basic_bot/commons/config_file.py | 49 +++++++++++++++++++++ src/basic_bot/commons/config_file_schema.py | 32 ++++++++++++++ 5 files changed, 119 insertions(+), 46 deletions(-) create mode 100644 src/basic_bot/commons/config_file.py diff --git a/basic_bot.yml b/basic_bot.yml index c67680e..81221a4 100644 --- a/basic_bot.yml +++ b/basic_bot.yml @@ -1,3 +1,14 @@ +# Basic Bot development configuration file. +# +# This configuration is intended for development and testing purposes of +# basic_bot and not users. +# +# See src/basic_bot/commons/config_file_schema.py for details +# on the configuration file schema. +# +# See src/basic_bot/created_files/basic_bot.yml for the configuration file +# that is created when you run `bb_create` to create your own bot. + bot_name: "basic_bot" version: "0.1.0" @@ -33,4 +44,10 @@ services: BB_USE_ARECORD: "true" development_env: BB_DISABLE_RECOGNITION_PROVIDER: "true" - BB_LOG_ALL_MESSAGES: "true" \ No newline at end of file + BB_LOG_ALL_MESSAGES: "true" + +outbound_clients: + - name: "example_client" + uri: "wss://someec2instance.compute.amazonaws.com:5001" + identity: "example_client_1" + shared_token: "sdl243097sdlw498glk3wg98yagalkwo8sKLLKJwergwg@@#$!%" diff --git a/src/basic_bot/bb_start.py b/src/basic_bot/bb_start.py index 2a6f361..53f0b20 100644 --- a/src/basic_bot/bb_start.py +++ b/src/basic_bot/bb_start.py @@ -26,9 +26,7 @@ import shlex import subprocess import time -import traceback -import yaml -from jsonschema import validate, ValidationError +from jsonschema import ValidationError from typing import Optional, Dict, List, Any @@ -36,7 +34,7 @@ from basic_bot.commons.script_helpers.pid_files import is_pid_file_valid from basic_bot.commons.script_helpers.log_files import get_log_time -from basic_bot.commons.config_file_schema import config_file_schema +from basic_bot.commons.config_file import read_config_file from basic_bot.commons import constants as c @@ -138,7 +136,9 @@ def start_service( ) -def start_services(config: dict[str, Any], services_filter: Optional[List[str]] = None) -> None: +def start_services( + config: dict[str, Any], services_filter: Optional[List[str]] = None +) -> None: env_vars = config.get("env", {}) env_vars.update(config.get(f"{c.BB_ENV}_env", {})) @@ -159,26 +159,13 @@ def start_services(config: dict[str, Any], services_filter: Optional[List[str]] def main() -> None: args = arg_parser.parse_args() + config = read_config_file(args.file) + validate_unique_names(config) + services_filter = None + if args.services: + services_filter = args.services.split(",") - try: - with open(args.file, "r") as f: - config = yaml.safe_load(f) - validate(config, config_file_schema) - validate_unique_names(config) - - services_filter = None - if args.services: - services_filter = args.services.split(",") - - start_services(config, services_filter) - - except FileNotFoundError as e: - print(f"Error: File not found. {e}") - traceback.print_exc() - except yaml.YAMLError: - print(f"Error: Invalid YAML syntax: {args.file}") - except ValidationError as e: - print(f"Config file validation error: {e}") + start_services(config, services_filter) if __name__ == "__main__": diff --git a/src/basic_bot/bb_stop.py b/src/basic_bot/bb_stop.py index 9d67bfa..880ee9f 100755 --- a/src/basic_bot/bb_stop.py +++ b/src/basic_bot/bb_stop.py @@ -15,15 +15,13 @@ import argparse import os import signal -import yaml -from jsonschema import validate, ValidationError from typing import Optional, List, Any from basic_bot.commons.script_helpers.pid_files import is_pid_file_valid from basic_bot.commons.script_helpers.log_files import get_log_time -from basic_bot.commons.config_file_schema import config_file_schema +from basic_bot.commons.config_file import read_config_file arg_parser = argparse.ArgumentParser(prog="bb_stop", description=__doc__) @@ -79,7 +77,9 @@ def stop_service( os.remove(pid_file) -def stop_services(config: dict[str, Any], services_filter: Optional[List[str]] = None) -> None: +def stop_services( + config: dict[str, Any], services_filter: Optional[List[str]] = None +) -> None: for service in config["services"]: if services_filter and service["name"] not in services_filter: continue @@ -93,24 +93,12 @@ def stop_services(config: dict[str, Any], services_filter: Optional[List[str]] = def main() -> None: args = arg_parser.parse_args() + config = read_config_file(args.file) + services_filter = None + if args.services: + services_filter = args.services.split(",") - try: - with open(args.file, "r") as f: - config = yaml.safe_load(f) - validate(config, config_file_schema) - - services_filter = None - if args.services: - services_filter = args.services.split(",") - - stop_services(config, services_filter) - - except FileNotFoundError: - print(f"Error: File not found: {args.file}") - except yaml.YAMLError: - print(f"Error: Invalid YAML syntax: {args.file}") - except ValidationError as e: - print(f"Config file validation error: {e.message}") + stop_services(config, services_filter) if __name__ == "__main__": diff --git a/src/basic_bot/commons/config_file.py b/src/basic_bot/commons/config_file.py new file mode 100644 index 0000000..bbdb8d4 --- /dev/null +++ b/src/basic_bot/commons/config_file.py @@ -0,0 +1,49 @@ +""" +Utility function for reading and validating the basic_bot.yml configuration file. +""" + +import os +import yaml +from jsonschema import validate, ValidationError +from basic_bot.commons.config_file_schema import config_file_schema +from basic_bot.commons import log + + +def read_config_file(file_path: str) -> dict: + """ + Reads and validates the configuration file at the given path. + + Args: + file_path (str): Path to the configuration file. + + Returns: + dict: The configuration data. + + Raises: + FileNotFoundError: If the configuration file does not exist. + yaml.YAMLError: If there is an error parsing the YAML file. + jsonschema.ValidationError: If the configuration does not conform to the schema. + """ + try: + if not os.path.exists(file_path): + raise FileNotFoundError(f"Configuration file {file_path} does not exist.") + + with open(file_path, "r") as file: + try: + config = yaml.safe_load(file) + except yaml.YAMLError as e: + raise yaml.YAMLError(f"Error parsing YAML file: {e}") + + validate(instance=config, schema=config_file_schema) + + except FileNotFoundError as e: + log.error(f"Error: {file_path} config file not found. {e}") + raise + except yaml.YAMLError as e: + print(f"Error: Invalid YAML syntax in {file_path}: {e}") + raise + except ValidationError as e: + print(f"Config file validation error: {e}") + raise + + return config diff --git a/src/basic_bot/commons/config_file_schema.py b/src/basic_bot/commons/config_file_schema.py index bddb72c..3a5ab06 100644 --- a/src/basic_bot/commons/config_file_schema.py +++ b/src/basic_bot/commons/config_file_schema.py @@ -90,5 +90,37 @@ }, }, }, + # + # Most central_hub clients connect to the websocket served by central_hub. + # For some applications, such as connecting to and controlling robots behind + # a firewall / restrictive router, it may be neccessary to have the bot + # connect outbound to a public facing host. + "outbound_clients": { + "type": "array", + "minItems": 0, + "items": { + "type": "object", + "required": ["name", "uri", "identity"], + "properties": { + # + # Must have unique name for each host. + "name": {"type": "string"}, + # + # This is the websocket uri of the host to connect to. + # Example: "wss://mypublichost.com:5001" + "uri": {"type": "string"}, + # + # This is the name that central_hub expect the bot to use + # when it sends the identity after first connecting. + # It must match the identity configured in basic_bot.yml + "identity": {"type": "string"}, + # + # This is a shared secret token that central_hub expects + # the bot to use when it sends the identity message. This + # would be configureed in the secrets of the public host. + "shared_token": {"type": "string"}, + }, + }, + }, }, } From c968d22d444bc3b552885bbe32defa5fdc951705 Mon Sep 17 00:00:00 2001 From: Bee Date: Wed, 8 Oct 2025 08:34:39 -0700 Subject: [PATCH 2/5] - New outbound clients feature: - Created outbound_clients.py class to manage websocket connections that central_hub initiates (vs accepting inbound) - Extended config schema to support outbound_clients array with name, uri, identity, and shared_token_file fields - Changed from inline shared_token to shared_token_file (pointing to file) for better security - Added tokens/ directory to .gitignore for storing authentication tokens - Added example outbound client configuration in basic_bot.yml - Includes auto-reconnection logic (5 second retry on failure) --- .gitignore | 1 + basic_bot.yml | 2 +- src/basic_bot/commons/config_file_schema.py | 12 ++++++++---- src/basic_bot/services/central_hub.py | 2 -- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index f907211..cf0d55c 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ logs/ pids/ recorded_video/ +tokens/ node_modules/ package-lock.json diff --git a/basic_bot.yml b/basic_bot.yml index 81221a4..12cfd94 100644 --- a/basic_bot.yml +++ b/basic_bot.yml @@ -50,4 +50,4 @@ outbound_clients: - name: "example_client" uri: "wss://someec2instance.compute.amazonaws.com:5001" identity: "example_client_1" - shared_token: "sdl243097sdlw498glk3wg98yagalkwo8sKLLKJwergwg@@#$!%" + shared_token_file: "./tokens/example_client_1.txt" diff --git a/src/basic_bot/commons/config_file_schema.py b/src/basic_bot/commons/config_file_schema.py index 3a5ab06..0c8ec6b 100644 --- a/src/basic_bot/commons/config_file_schema.py +++ b/src/basic_bot/commons/config_file_schema.py @@ -115,10 +115,14 @@ # It must match the identity configured in basic_bot.yml "identity": {"type": "string"}, # - # This is a shared secret token that central_hub expects - # the bot to use when it sends the identity message. This - # would be configureed in the secrets of the public host. - "shared_token": {"type": "string"}, + # This is an optional file path pointing to a file that + # contains a shared secret token that central_hub sends + # to the outbound connection when it first connects. + # + # This can be used to authenticate the bot by the public + # facing host. The token must match the token configured + # via secrets on the public host. + "shared_token_file": {"type": "string"}, }, }, }, diff --git a/src/basic_bot/services/central_hub.py b/src/basic_bot/services/central_hub.py index d2dc465..42abfa2 100644 --- a/src/basic_bot/services/central_hub.py +++ b/src/basic_bot/services/central_hub.py @@ -384,8 +384,6 @@ async def main() -> None: log.info(f"Starting server on port {constants.BB_HUB_PORT}") # TODO : figure out why the type error below async with websockets.serve(handle_message, port=constants.BB_HUB_PORT): # type: ignore - # log.info("Starting hub stats task") - # await send_hub_stats_task() await asyncio.Future() # run forever From ca93cd03c63ba327d469a37bf7225edcddb9572e Mon Sep 17 00:00:00 2001 From: Bee Date: Wed, 8 Oct 2025 09:28:50 -0700 Subject: [PATCH 3/5] Complete outbound client implementation with bidirectional hub communication MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Implemented OutboundClients class with connection management: - Auto-reconnection on disconnect (5 second retry) - Send identity with optional shared token authentication - Forward all message types to central_hub for processing - Broadcast state updates to all connected outbound clients - Refactored central_hub message handling: - Split handle_message into handle_message (process single message) and handle_connect (connection lifecycle) - Extracted message processing logic to be reusable for both inbound and outbound connections - Added outbound client support to send_state_update_to_subscribers for bidirectional sync - Enables full bidirectional communication: - Local state updates forwarded to remote servers - Remote state updates forwarded to local hub - Both inbound and outbound connections use same message processing logic 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/basic_bot/commons/outbound_clients.py | 128 ++++++++++++++++++++++ src/basic_bot/services/central_hub.py | 116 +++++++++++++------- 2 files changed, 206 insertions(+), 38 deletions(-) create mode 100644 src/basic_bot/commons/outbound_clients.py diff --git a/src/basic_bot/commons/outbound_clients.py b/src/basic_bot/commons/outbound_clients.py new file mode 100644 index 0000000..0795b79 --- /dev/null +++ b/src/basic_bot/commons/outbound_clients.py @@ -0,0 +1,128 @@ +""" +This module is used by central_hub to manage connections to external hub clients +that cannot directly inbound connect to the central_hub websocket. +""" + +import asyncio +import websockets.client +from websockets.client import WebSocketClientProtocol + +from basic_bot.commons import log +from basic_bot.commons.config_file import read_config_file + +from typing import Optional, Callable, Any, Dict, Union + + +class OutboundClients: + """ + This class manages outbound websocket connections to external hub clients + that cannot directly inbound connect to the central_hub websocket. + + It features automatic reconnection on connection loss. + """ + + def __init__( + self, + on_message_received: Optional[Callable[[Any, Union[str, bytes]], Any]] = None, + ) -> None: + config = read_config_file("./basic_bot.yml") + self.outbound_clients = config.get("outbound_clients", []) + self.connections: Dict[str, WebSocketClientProtocol] = {} + self.on_message_received = on_message_received + + async def connect_all(self) -> None: + """ + Connects to all outbound clients specified in the configuration. + """ + for client in self.outbound_clients: + name = client["name"] + uri = client["uri"] + identity = client["identity"] + shared_token_file = client.get("shared_token_file") + + token = None + if shared_token_file: + try: + with open(shared_token_file, "r") as f: + token = f.read().strip() + except Exception as e: + log.error(f"Failed to read token from {shared_token_file}: {e}") + + asyncio.create_task(self._connect_and_listen(name, uri, identity, token)) + + async def _connect_and_listen( + self, name: str, uri: str, identity: str, token: Optional[str] + ) -> None: + """ + Connects to a single outbound client and listens for messages. + + Args: + name (str): The unique name of the outbound client. + uri (str): The websocket URI of the outbound client. + identity (str): The identity string to send upon connection. + token (Optional[str]): An optional shared secret token for authentication. + """ + while True: + try: + log.info(f"Connecting to outbound client {name} at {uri}") + async with websockets.client.connect(uri) as websocket: # type: ignore + self.connections[name] = websocket + await self._send_identity(websocket, identity, token) + await self._listen(websocket, name) + except Exception as e: + log.error(f"Connection to {name} failed: {e}") + log.info(f"Reconnecting to {name} in 5 seconds...") + await asyncio.sleep(5) + + async def _send_identity( + self, websocket, identity: str, token: Optional[str] + ) -> None: + """ + Sends an identity message to the remote websocket server. + + Args: + websocket: The websocket connection. + identity (str): The identity string to send. + token (Optional[str]): An optional shared secret token for authentication. + """ + import json + + identity_data = {"subsystem_name": identity} + if token: + identity_data["shared_token"] = token + + message = json.dumps({"type": "identity", "data": identity_data}) + await websocket.send(message) + log.info(f"Sent identity: {identity}") + + async def _listen(self, websocket, name: str) -> None: + """ + Listens for messages from the remote websocket server and forwards + all messages to the callback. + + Args: + websocket: The websocket connection. + name (str): The name of the outbound client. + """ + try: + async for message in websocket: + log.debug(f"Received message from outbound client {name}") + # Forward all messages to the callback for processing + if self.on_message_received: + await self.on_message_received(websocket, message) + except Exception as e: + log.error(f"Error in _listen for {name}: {e}") + raise + + async def broadcast(self, message: str) -> None: + """ + Broadcasts a message to all connected outbound clients. + + Args: + message (str): The JSON message to send. + """ + for name, websocket in self.connections.items(): + try: + await websocket.send(message) + except Exception as e: + log.error(f"Failed to broadcast to {name}: {e}") diff --git a/src/basic_bot/services/central_hub.py b/src/basic_bot/services/central_hub.py index 42abfa2..25fad66 100644 --- a/src/basic_bot/services/central_hub.py +++ b/src/basic_bot/services/central_hub.py @@ -103,12 +103,13 @@ import asyncio import websockets import traceback -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union from websockets.server import WebSocketServerProtocol from basic_bot.commons import constants, log from basic_bot.commons.hub_state import HubState +from basic_bot.commons.outbound_clients import OutboundClients log.info("Initializing hub state") @@ -121,6 +122,9 @@ }, ) +# Will be initialized in main() if outbound clients are configured +outbound_clients: Optional[OutboundClients] = None + # these are all of the client sockets that are connected to the hub connected_sockets: set[WebSocketServerProtocol] = set() @@ -180,7 +184,6 @@ async def send_state_update_to_subscribers(message_data: Dict[str, Any]) -> None log.info( f"send_state_update_to_subscribers: no subscribers for {message_data.keys()}" ) - return relay_message = json.dumps( { @@ -192,6 +195,8 @@ async def send_state_update_to_subscribers(message_data: Dict[str, Any]) -> None "data": message_data, } ) + + # Send to local subscribers sockets_to_close: set[WebSocketServerProtocol] = set() for socket in subscribed_sockets: try: @@ -214,6 +219,10 @@ async def send_state_update_to_subscribers(message_data: Dict[str, Any]) -> None await unregister(socket) await socket.close() + # Also forward to outbound clients if configured + if outbound_clients: + await outbound_clients.broadcast(relay_message) + async def notify_state( websocket: WebSocketServerProtocol, @@ -330,48 +339,65 @@ async def handle_ping(websocket: WebSocketServerProtocol) -> None: await send_message(websocket, json.dumps({"type": "pong"})) -async def handle_message(websocket: WebSocketServerProtocol) -> None: +async def handle_message( + websocket: WebSocketServerProtocol, message: Union[str, bytes] +) -> None: + """ + Process a single message from either inbound or outbound websocket connection. + + Args: + websocket: The websocket connection that sent the message. + message: The raw message (string or bytes) to process. + """ + try: + jsonData = json.loads(message) + messageType = jsonData.get("type") + messageData = jsonData.get("data") + except: + log.error(f"error parsing message: {str(message)}") + return + + if constants.BB_LOG_ALL_MESSAGES and messageType != "ping": + log.info(f"received {str(message)} from {websocket.remote_address[1]}") + + # {type: "getState, data: [state_keys] or omitted} + if messageType == "getState": + await handle_state_request(websocket, messageData) + # {type: "updateState" data: { new state }} + elif messageType == "updateState": + await handle_state_update(messageData) + # {type: "subscribeState", data: [state_keys] or "*" + elif messageType == "subscribeState": + await handle_state_subscribe(websocket, messageData) + # {type: "unsubscribeState", data: [state_keys] or "*" + elif messageType == "unsubscribeState": + await handle_state_unsubscribe(websocket, messageData) + # {type: "identity", data: "subsystem_name"} + elif messageType == "identity": + await handle_identity(websocket, messageData) + elif messageType == "ping": + await handle_ping(websocket) + else: + log.error(f"received unsupported message: {messageType}") + + if constants.BB_LOG_ALL_MESSAGES and messageType != "ping": + log.info(f"getting next message for {websocket.remote_address[1]}") + + +async def handle_connect(websocket: WebSocketServerProtocol) -> None: + """ + Handle an inbound websocket connection. + Registers the connection, processes all messages, and unregisters on disconnect. + """ await register(websocket) try: async for message in websocket: - try: - jsonData = json.loads(message) - messageType = jsonData.get("type") - messageData = jsonData.get("data") - except: - log.error(f"error parsing message: {str(message)}") - continue - - if constants.BB_LOG_ALL_MESSAGES and messageType != "ping": - log.info(f"received {str(message)} from {websocket.remote_address[1]}") - - # {type: "getState, data: [state_keys] or omitted} - if messageType == "getState": - await handle_state_request(websocket, messageData) - # {type: "updateState" data: { new state }} - elif messageType == "updateState": - await handle_state_update(messageData) - # {type: "subscribeState", data: [state_keys] or "*" - elif messageType == "subscribeState": - await handle_state_subscribe(websocket, messageData) - # {type: "unsubscribeState", data: [state_keys] or "*" - elif messageType == "unsubscribeState": - await handle_state_unsubscribe(websocket, messageData) - # {type: "identity", data: "subsystem_name"} - elif messageType == "identity": - await handle_identity(websocket, messageData) - elif messageType == "ping": - await handle_ping(websocket) - else: - log.error(f"received unsupported message: {messageType}") - - if constants.BB_LOG_ALL_MESSAGES and messageType != "ping": - log.info(f"getting next message for {websocket.remote_address[1]}") + await handle_message(websocket, message) except Exception as e: # don't log the exception if it's just a disconnect "no close frame" if "no close frame received" not in str(e): - log.error(f"handle_message from {websocket.remote_address[1]}: {e}") + log.error(f"handle_connect from {websocket.remote_address[1]}: {e}") traceback.print_exc() raise e @@ -381,9 +407,23 @@ async def handle_message(websocket: WebSocketServerProtocol) -> None: async def main() -> None: + global outbound_clients + log.info(f"Starting server on port {constants.BB_HUB_PORT}") + + # Initialize and start outbound client connections if configured + outbound_clients = OutboundClients(on_message_received=handle_message) + if outbound_clients.outbound_clients: + log.info( + f"Starting {len(outbound_clients.outbound_clients)} outbound client(s)" + ) + await outbound_clients.connect_all() + else: + log.info("No outbound clients configured") + outbound_clients = None + # TODO : figure out why the type error below - async with websockets.serve(handle_message, port=constants.BB_HUB_PORT): # type: ignore + async with websockets.serve(handle_connect, port=constants.BB_HUB_PORT): # type: ignore await asyncio.Future() # run forever From 6b37e333b5ec0b18512b3e2f1d51998d5ee3dd63 Mon Sep 17 00:00:00 2001 From: Bee Date: Wed, 8 Oct 2025 15:39:25 -0700 Subject: [PATCH 4/5] Implement outbound websocket client connections for central_hub MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Completes OutboundClients class implementation with auto-reconnection - Adds BB_CONFIG_FILE constant to centralize config path - Removes unnecessary iseeu message handler - Renames MockRemoteHub to MockOutboundClient for clarity - Adds comprehensive integration tests for bidirectional hub communication - Tests verify: connection/identity, local-to-remote, remote-to-local, and bidirectional state sync 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/basic_bot/commons/constants.py | 6 + src/basic_bot/commons/outbound_clients.py | 4 +- .../test_helpers/mock_outbound_client.py | 212 ++++++++++++++++++ .../test_central_hub_outbound.py | 200 +++++++++++++++++ 4 files changed, 420 insertions(+), 2 deletions(-) create mode 100644 src/basic_bot/test_helpers/mock_outbound_client.py create mode 100644 tests/integration_tests/test_central_hub_outbound.py diff --git a/src/basic_bot/commons/constants.py b/src/basic_bot/commons/constants.py index 6d4de3a..1d3e13d 100644 --- a/src/basic_bot/commons/constants.py +++ b/src/basic_bot/commons/constants.py @@ -38,6 +38,12 @@ are also written to the log file when BB_ENV is "development" or "test". """ +BB_CONFIG_FILE = env.env_string("BB_CONFIG_FILE", "./basic_bot.yml") +""" +Path to the basic_bot configuration file. This can be overridden to use +a different configuration file for testing or alternative deployments. +""" + BB_HUB_HOST = env.env_string("BB_HUB_HOST", "127.0.0.1") """ diff --git a/src/basic_bot/commons/outbound_clients.py b/src/basic_bot/commons/outbound_clients.py index 0795b79..3e324db 100644 --- a/src/basic_bot/commons/outbound_clients.py +++ b/src/basic_bot/commons/outbound_clients.py @@ -7,7 +7,7 @@ import websockets.client from websockets.client import WebSocketClientProtocol -from basic_bot.commons import log +from basic_bot.commons import log, constants as c from basic_bot.commons.config_file import read_config_file from typing import Optional, Callable, Any, Dict, Union @@ -25,7 +25,7 @@ def __init__( self, on_message_received: Optional[Callable[[Any, Union[str, bytes]], Any]] = None, ) -> None: - config = read_config_file("./basic_bot.yml") + config = read_config_file(c.BB_CONFIG_FILE) self.outbound_clients = config.get("outbound_clients", []) self.connections: Dict[str, WebSocketClientProtocol] = {} self.on_message_received = on_message_received diff --git a/src/basic_bot/test_helpers/mock_outbound_client.py b/src/basic_bot/test_helpers/mock_outbound_client.py new file mode 100644 index 0000000..db69622 --- /dev/null +++ b/src/basic_bot/test_helpers/mock_outbound_client.py @@ -0,0 +1,212 @@ +""" +Mock outbound client endpoint for testing outbound connections. + +This module provides a mock websocket server that simulates a remote endpoint +for testing the outbound client functionality. +""" + +import asyncio +import json +import threading +import time +from typing import Optional, List, Dict, Any +from websockets.server import WebSocketServerProtocol +import websockets + + +class MockOutboundClient: + """ + A mock websocket server that simulates a remote outbound client endpoint. + + Used for testing outbound client connections and bidirectional communication. + """ + + def __init__(self, port: int = 5200): + self.port = port + self.server = None + self.server_task: Optional[asyncio.Task] = None + self.thread: Optional[threading.Thread] = None + self.loop: Optional[asyncio.AbstractEventLoop] = None + self.connected_clients: List[WebSocketServerProtocol] = [] + self.received_messages: List[Dict[str, Any]] = [] + self.identity_received: Optional[str] = None + self.running = False + self.connection_event = threading.Event() + + async def _handle_client(self, websocket: WebSocketServerProtocol): + """Handle incoming websocket connection from central_hub's outbound client.""" + print(f"MockOutboundClient: Client connected from {websocket.remote_address}") + self.connected_clients.append(websocket) + self.connection_event.set() + + try: + async for message in websocket: + try: + data = json.loads(message) + print(f"MockOutboundClient received: {data}") + self.received_messages.append(data) + + # Handle identity message + if data.get("type") == "identity": + identity_data = data.get("data") + if isinstance(identity_data, dict): + self.identity_received = identity_data.get("subsystem_name") + else: + self.identity_received = identity_data + print( + f"MockOutboundClient: Received identity: {self.identity_received}" + ) + + # Handle ping + elif data.get("type") == "ping": + await websocket.send(json.dumps({"type": "pong"})) + + except json.JSONDecodeError as e: + print(f"MockOutboundClient: Failed to parse message: {e}") + + except Exception as e: + print(f"MockOutboundClient: Connection error: {e}") + finally: + if websocket in self.connected_clients: + self.connected_clients.remove(websocket) + print("MockOutboundClient: Client disconnected") + + async def _run_server(self): + """Run the websocket server.""" + print(f"MockOutboundClient: Starting server on port {self.port}") + async with websockets.serve( + self._handle_client, "localhost", self.port + ) as server: + self.server = server + self.running = True + # Wait until stopped + while self.running: + await asyncio.sleep(0.1) + + def _run_in_thread(self): + """Run the server in a separate thread with its own event loop.""" + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + try: + self.loop.run_until_complete(self._run_server()) + except Exception as e: + if "Event loop stopped" not in str(e): + print(f"MockOutboundClient: Server error: {e}") + finally: + # Clean up pending tasks + pending = asyncio.all_tasks(self.loop) + for task in pending: + task.cancel() + self.loop.run_until_complete( + asyncio.gather(*pending, return_exceptions=True) + ) + self.loop.close() + + def start(self): + """Start the mock outbound client server in a background thread.""" + self.thread = threading.Thread(target=self._run_in_thread, daemon=True) + self.thread.start() + # Give server time to start + time.sleep(0.5) + + def stop(self): + """Stop the mock outbound client server.""" + print("MockOutboundClient: Stopping server") + self.running = False + if self.thread: + self.thread.join(timeout=3) + + def wait_for_connection(self, timeout: float = 5.0) -> bool: + """ + Wait for an outbound client to connect. + + Args: + timeout: Maximum time to wait in seconds. + + Returns: + True if connection established, False if timeout. + """ + return self.connection_event.wait(timeout) + + def send_state_update(self, state_data: Dict[str, Any]): + """ + Send a state update to all connected clients. + + Args: + state_data: Dictionary of state key/value pairs to send. + """ + if not self.connected_clients: + print("MockOutboundClient: No clients connected, cannot send state update") + return + + message = {"type": "updateState", "data": state_data} + message_str = json.dumps(message) + + async def _send(): + for client in self.connected_clients: + try: + await client.send(message_str) + print(f"MockOutboundClient: Sent state update: {state_data}") + except Exception as e: + print(f"MockOutboundClient: Failed to send to client: {e}") + + if self.loop: + future = asyncio.run_coroutine_threadsafe(_send(), self.loop) + # Wait for the send to complete + try: + future.result(timeout=2) + except Exception as e: + print(f"MockOutboundClient: Error waiting for send: {e}") + + def get_received_messages(self) -> List[Dict[str, Any]]: + """Get all messages received from connected clients.""" + return self.received_messages.copy() + + def clear_received_messages(self): + """Clear the received messages list.""" + self.received_messages.clear() + + def has_received_identity(self, expected_name: Optional[str] = None) -> bool: + """ + Check if identity message was received. + + Args: + expected_name: Optional specific subsystem name to check for. + + Returns: + True if identity received (and matches expected_name if provided). + """ + if expected_name: + return self.identity_received == expected_name + return self.identity_received is not None + + def has_received_state_update(self, key: str, value: Any) -> bool: + """ + Check if a specific state update was received. + + Args: + key: The state key to check for. + value: The expected value. + + Returns: + True if matching state update found. + """ + for message in self.received_messages: + if message.get("type") == "stateUpdate": + data = message.get("data", {}) + if key in data and data[key] == value: + return True + return False + + def get_state_updates(self) -> List[Dict[str, Any]]: + """ + Get all state update messages received. + + Returns: + List of state update data dictionaries. + """ + return [ + msg.get("data", {}) + for msg in self.received_messages + if msg.get("type") == "stateUpdate" + ] diff --git a/tests/integration_tests/test_central_hub_outbound.py b/tests/integration_tests/test_central_hub_outbound.py new file mode 100644 index 0000000..2c75ed7 --- /dev/null +++ b/tests/integration_tests/test_central_hub_outbound.py @@ -0,0 +1,200 @@ +""" +Integration tests for central_hub outbound client connections. + +These tests verify bidirectional hub-to-hub communication through outbound +websocket connections. +""" + +import time +import yaml +import basic_bot.test_helpers.central_hub as hub +import basic_bot.test_helpers.start_stop as sst +from basic_bot.test_helpers.mock_outbound_client import MockOutboundClient + +# semi-random values to use for testing +TEST_ANGLES_1 = [10, 50, 180, 120, 90, 0] +TEST_ANGLES_2 = [15, 55, 175, 115, 95, 5] +TEST_ANGLES_3 = [20, 60, 170, 110, 100, 10] + + +def setup_module(): + """Setup central_hub service with test config.""" + # Create test config with outbound clients + with open("basic_bot.yml", "r") as f: + original_config = f.read() + config = yaml.safe_load(original_config) + + # Add outbound client configuration + config["outbound_clients"] = [ + { + "name": "test_remote_hub", + "uri": "ws://localhost:5200", + "identity": "test_outbound_client", + } + ] + + # Write test config + test_config_path = "basic_bot_test_outbound.yml" + with open(test_config_path, "w") as f: + yaml.dump(config, f) + + # Start central_hub with test config + sst.start_service( + "central_hub", + "python -m basic_bot.services.central_hub", + {"BB_CONFIG_FILE": test_config_path}, + ) + + +def teardown_module(): + """Stop central_hub service and cleanup test config.""" + import os + + sst.stop_service("central_hub") + + # Cleanup test config + test_config_path = "basic_bot_test_outbound.yml" + if os.path.exists(test_config_path): + os.remove(test_config_path) + + +class TestOutboundClients: + """Test outbound client connections for bidirectional hub communication.""" + + def test_outbound_connection_and_identity(self): + """Test that outbound client connects and sends identity.""" + # Start mock outbound client endpoint + mock_client = MockOutboundClient(port=5200) + mock_client.start() + + try: + # Wait for connection (central_hub retries every 5 seconds) + assert mock_client.wait_for_connection(timeout=10.0), "Outbound client did not connect" + + # Give a moment for identity exchange to complete + time.sleep(0.5) + + # Check that identity was received + assert mock_client.has_received_identity( + "test_outbound_client" + ), "Identity message not received or incorrect" + + finally: + mock_client.stop() + + def test_local_to_remote_state_propagation(self): + """Test that local state updates are forwarded to remote hub.""" + mock_client = MockOutboundClient(port=5200) + mock_client.start() + + try: + # Give central_hub time to connect + time.sleep(1) + + # Wait for connection + assert mock_client.wait_for_connection(timeout=5.0) + + # Clear any initial messages + mock_client.clear_received_messages() + + # Connect local client and send state update + ws = hub.connect("test_local_client") + hub.send_update_state(ws, {"set_angles": TEST_ANGLES_1}) + + # Wait a bit for propagation + time.sleep(0.5) + + # Check that remote hub received the state update + assert mock_client.has_received_state_update( + "set_angles", TEST_ANGLES_1 + ), "State update not forwarded to remote hub" + + ws.close() + + finally: + mock_client.stop() + + def test_remote_to_local_state_propagation(self): + """Test that remote state updates are forwarded to local clients.""" + mock_client = MockOutboundClient(port=5200) + mock_client.start() + + try: + # Give central_hub time to connect + time.sleep(1) + + # Wait for connection + assert mock_client.wait_for_connection(timeout=5.0) + + # Connect local client and subscribe + ws = hub.connect("test_local_client") + hub.send_subscribe(ws, ["current_angles"]) + + # Give subscription time to register + time.sleep(1.0) + + # Remote hub sends state update + mock_client.send_state_update({"current_angles": TEST_ANGLES_2}) + + # Give more time for propagation + time.sleep(1.0) + + # Local client should receive the update + assert hub.has_received_state_update( + ws, "current_angles", TEST_ANGLES_2 + ), "State update from remote hub not received by local client" + + ws.close() + + finally: + mock_client.stop() + + def test_bidirectional_state_sync(self): + """Test bidirectional state synchronization between local and remote.""" + mock_client = MockOutboundClient(port=5200) + mock_client.start() + + try: + # Give central_hub time to connect + time.sleep(1) + + # Wait for connection + assert mock_client.wait_for_connection(timeout=5.0) + mock_client.clear_received_messages() + + # Setup two local clients + ws1 = hub.connect("test_local_1") + hub.send_subscribe(ws1, ["set_angles", "current_angles"]) + + ws2 = hub.connect("test_local_2") + hub.send_subscribe(ws2, ["current_angles"]) + + time.sleep(0.5) + + # Local client 1 sends update - should reach local client 2 and remote hub + hub.send_update_state(ws1, {"set_angles": TEST_ANGLES_1}) + time.sleep(0.5) + + assert hub.has_received_state_update(ws1, "set_angles", TEST_ANGLES_1) + assert mock_client.has_received_state_update("set_angles", TEST_ANGLES_1) + + # Remote hub sends update - should reach both local clients + mock_client.send_state_update({"current_angles": TEST_ANGLES_2}) + time.sleep(1.0) + + assert hub.has_received_state_update(ws1, "current_angles", TEST_ANGLES_2) + assert hub.has_received_state_update(ws2, "current_angles", TEST_ANGLES_2) + + # Local client 2 sends update - should reach client 1 and remote hub + mock_client.clear_received_messages() + hub.send_update_state(ws2, {"current_angles": TEST_ANGLES_3}) + time.sleep(0.5) + + assert hub.has_received_state_update(ws1, "current_angles", TEST_ANGLES_3) + assert mock_client.has_received_state_update("current_angles", TEST_ANGLES_3) + + ws1.close() + ws2.close() + + finally: + mock_client.stop() From fd3ac2dded043166e508602514b5fde435988fa8 Mon Sep 17 00:00:00 2001 From: Bee Date: Mon, 13 Oct 2025 09:27:48 -0700 Subject: [PATCH 5/5] add stop() class method to OutboundClients --- src/basic_bot/commons/outbound_clients.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/basic_bot/commons/outbound_clients.py b/src/basic_bot/commons/outbound_clients.py index 3e324db..012df9e 100644 --- a/src/basic_bot/commons/outbound_clients.py +++ b/src/basic_bot/commons/outbound_clients.py @@ -29,6 +29,7 @@ def __init__( self.outbound_clients = config.get("outbound_clients", []) self.connections: Dict[str, WebSocketClientProtocol] = {} self.on_message_received = on_message_received + self.is_stopping = False async def connect_all(self) -> None: """ @@ -50,6 +51,15 @@ async def connect_all(self) -> None: asyncio.create_task(self._connect_and_listen(name, uri, identity, token)) + def stop(self) -> None: + """ + Stops all outbound client connections. + """ + self.is_stopping = True + for name, websocket in self.connections.items(): + asyncio.create_task(websocket.close()) + self.connections.clear() + async def _connect_and_listen( self, name: str, uri: str, identity: str, token: Optional[str] ) -> None: @@ -62,7 +72,7 @@ async def _connect_and_listen( identity (str): The identity string to send upon connection. token (Optional[str]): An optional shared secret token for authentication. """ - while True: + while not self.is_stopping: try: log.info(f"Connecting to outbound client {name} at {uri}") async with websockets.client.connect(uri) as websocket: # type: ignore