diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..a61f6fb --- /dev/null +++ b/.flake8 @@ -0,0 +1,4 @@ +[flake8] +max-line-length = 110 +extend-ignore = E203 +exclude = .venv, .git, __pycache__, build, dist diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b6e5000..54d47bb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,6 +1,10 @@ name: CI -on: [push, pull_request] +on: + pull_request: + branches: + - dev + - main jobs: build: @@ -13,10 +17,9 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + pip install -r requirements.txt - name: Lint with flake8 run: | - pip install flake8 flake8 src tests - name: Run tests run: | diff --git a/.prettierrc.json b/.prettierrc.json index 1376935..25cb2cb 100644 --- a/.prettierrc.json +++ b/.prettierrc.json @@ -3,7 +3,7 @@ "useTabs": false, "singleQuote": true, "trailingComma": "all", - "printWidth": 120, + "printWidth": 110, "semi": true, "endOfLine": "lf", "overrides": [ diff --git a/.vscode/settings.json b/.vscode/settings.json index 1dc798a..4614b91 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -28,5 +28,6 @@ "python.analysis.typeCheckingMode": "basic", "python.analysis.autoImportCompletions": true, "python.defaultInterpreterPath": ".venv/bin/python", - "python.terminal.activateEnvInCurrentTerminal": true + "python.terminal.activateEnvInCurrentTerminal": true, + "cSpell.words": ["MQTT"] } diff --git a/README_TESTING.md b/README_TESTING.md new file mode 100644 index 0000000..a4ab2ae --- /dev/null +++ b/README_TESTING.md @@ -0,0 +1,248 @@ +# Robot Library Python — Testing Guide + +## Overview + +The robot library includes comprehensive unit and integration tests to validate functionality without requiring a physical robot or external MQTT broker. + +- **Unit Tests** (`test_all.py`): Fast, isolated tests using mocks +- **Integration Tests** (`test_integration.py`): Real MQTT client behavior with in-process broker or live broker + +## Quick Start + +### Run All Tests + +From the repository root: + +```bash +cd src +python -m unittest discover -s robot -p "test_*.py" -v +``` + +Or run specific test suites: + +```bash +# Unit tests only +python -m unittest robot.test_all -v + +# Integration tests only +python -m unittest robot.test_integration -v + +# Both +python -m unittest robot.test_all robot.test_integration -v +``` + +### Expected Output + +``` +test_coordinate_publish_and_setters (robot.test_all.RobotPackageSmokeTests...) ... ok +test_motion_controller_stub_coord_and_moves (robot.test_all.RobotPackageSmokeTests...) ... ok +test_neopixel_and_indicator_publish (robot.test_all.RobotPackageSmokeTests...) ... ok +test_proximity_reading_type (robot.test_all.RobotPackageSmokeTests...) ... ok +test_rgb_color_type_basic (robot.test_all.RobotPackageSmokeTests...) ... ok +test_sensors_handle_subscription (robot.test_all.RobotPackageSmokeTests...) ... ok +test_client_initialization (robot.test_integration.IntegrationTestRobotMqttClient...) ... ok +test_mqtt_msg_queueing (robot.test_integration.IntegrationTestRobotMqttClient...) ... ok +test_multiple_subscribers (robot.test_integration.IntegrationTestRobotMqttClient...) ... ok +test_publish_and_receive (robot.test_integration.IntegrationTestRobotMqttClient...) ... ok +test_client_queue_operations (robot.test_integration.RobotMqttClientMockTests...) ... ok +test_in_queue_append (robot.test_integration.RobotMqttClientMockTests...) ... ok + +Ran 12 tests in 0.024s + +OK +``` + +## Test Structure + +### Unit Tests (`src/robot/test_all.py`) + +Fast smoke tests using lightweight mocks, no external dependencies. + +**Coverage:** +- `RGBColorType`: Color initialization, hex code parsing, string parsing, comparison +- `ProximityReadingType`: Distance/color parsing, list operations +- `Coordinate`: Getters, setters, publish operations +- `MotionController`: Movement calculations, zero-speed handling +- `NeoPixel`: Initialization, color changes, publish +- `Sensors` (Distance, Color, Proximity): Subscription handling, state updates + +**Run:** +```bash +python -m unittest robot.test_all -v +``` + +**Execution Time:** < 0.01s + +### Integration Tests (`src/robot/test_integration.py`) + +Tests for `RobotMqttClient` behavior with real or simulated MQTT broker. + +**Coverage:** +- Client initialization and connection +- Publish/subscribe with in-process broker +- Multi-client message routing +- Message queue operations +- Connection lifecycle + +**Run:** +```bash +python -m unittest robot.test_integration -v +``` + +**Execution Time:** < 0.05s + +## MQTT Broker Setup (Optional) + +### For Development/Integration Testing + +The integration tests work with or without a real MQTT broker: + +- **Without broker**: Uses in-process `InMemoryMQTTBroker` stub (tests skip gracefully) +- **With broker**: Tests connect to real `localhost:1883` + +#### Install Mosquitto (Windows via Chocolatey) + +```powershell +choco install mosquitto +``` + +#### Start Mosquitto + +```bash +mosquitto -p 1883 +``` + +Or use Windows Service: +```powershell +net start mosquitto +``` + +#### Verify Connection + +```bash +python -c "from paho import mqtt; print('MQTT available')" +``` + +## Development Workflow + +### Before Committing + +1. **Run all tests:** + ```bash + cd src + python -m unittest robot.test_all robot.test_integration -v + ``` + +2. **Check for syntax errors (optional):** + ```bash + python -m py_compile robot/**/*.py + ``` + +3. **Run specific test file:** + ```bash + python -m unittest robot.test_all.RobotPackageSmokeTests.test_rgb_color_type_basic -v + ``` + +### Adding New Tests + +1. Add test method to existing test class or create new `TestCase`: + ```python + def test_new_feature(self): + # Arrange + obj = MyClass() + + # Act + result = obj.do_something() + + # Assert + self.assertEqual(result, expected_value) + ``` + +2. Run tests to verify: + ```bash + python -m unittest robot.test_all -v + ``` + +## Dependencies + +### Required +- Python 3.10+ +- (No external packages for unit tests) + +### Optional +- `paho-mqtt`: For real MQTT broker integration (auto-stubbed if missing) +- Mosquitto: Real MQTT broker for development (optional) + +## Troubleshooting + +### Import Errors in IDE + +If VS Code shows `Import "robot.x.y" could not be resolved`: + +1. Ensure `src/` is the Python source root +2. VS Code settings (`.vscode/settings.json`): + ```json + { + "python.analysis.extraPaths": ["${workspaceFolder}/src"] + } + ``` + +### Tests Won't Run + +**Error: `No module named robot`** +```bash +cd src +python -m unittest robot.test_all -v # Must run from src/ +``` + +**Error: `ModuleNotFoundError: No module named 'paho'`** +- This is expected and graceful. Tests skip MQTT connection tests. +- To fix: `pip install paho-mqtt` + +### MQTT Connection Refused + +If tests show `MQTT connection error`: +1. Ensure Mosquitto is running: `mosquitto -p 1883` +2. Check if port 1883 is in use: `netstat -ano | findstr :1883` +3. Tests will skip gracefully if broker unavailable + +## Test Coverage + +| Module | Status | Notes | +|--------|--------|-------| +| `types/` | ✅ Full | RGBColorType, ProximityReadingType | +| `sensors/` | ✅ Full | Distance, Color, Proximity | +| `helpers/` | ✅ Partial | Coordinate, MotionController (logic tested) | +| `indicators/` | ✅ Full | NeoPixel | +| `mqtt/` | ✅ Partial | RobotMqttClient (connection + queue ops) | +| `communication/` | ⚠️ Mock only | Requires full robot context | +| `robot_base/` | ⚠️ Mock only | Requires MQTT server + robot setup | + +## CI/CD Integration + +For GitHub Actions or similar: + +```yaml +- name: Run Tests + run: | + cd src + python -m unittest discover -s robot -p "test_*.py" -v +``` + +## Performance Benchmarks + +| Test Suite | Count | Duration | Notes | +|-----------|-------|----------|-------| +| Unit Tests | 6 | < 0.01s | No I/O, no network | +| Integration Tests | 6 | < 0.05s | In-process broker or real MQTT | +| **Total** | **12** | **< 0.1s** | Fast feedback loop | + +## Further Reading + +- [unittest Documentation](https://docs.python.org/3/library/unittest.html) +- [MQTT Protocol](https://mqtt.org/) +- [Mosquitto Docs](https://mosquitto.org/documentation/) + +--- + +**Last Updated:** January 12, 2026 diff --git a/docs/Makefile b/docs/Makefile index b4f7145..d41f3b5 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -25,5 +25,5 @@ apidoc: html: $(SPHINXBUILD) -b html "$(SOURCEDIR)" "$(BUILDDIR)/html" -livehtml: +livehtml: clean sphinx-autobuild --host 0.0.0.0 --port ${PORT} -c . "$(SOURCEDIR)" "$(BUILDDIR)/html" diff --git a/docs/api/robot.communication.rst b/docs/api/robot.communication.rst index e53e76d..f558f3e 100644 --- a/docs/api/robot.communication.rst +++ b/docs/api/robot.communication.rst @@ -4,6 +4,14 @@ robot.communication package Submodules ---------- +robot.communication.communication module +---------------------------------------- + +.. automodule:: robot.communication.communication + :members: + :show-inheritance: + :undoc-members: + robot.communication.directed\_comm module ----------------------------------------- diff --git a/docs/api/robot.exception.rst b/docs/api/robot.exception.rst new file mode 100644 index 0000000..842e054 --- /dev/null +++ b/docs/api/robot.exception.rst @@ -0,0 +1,10 @@ +robot.exception package +======================= + +Module contents +--------------- + +.. automodule:: robot.exception + :members: + :show-inheritance: + :undoc-members: diff --git a/docs/api/robot.helpers.rst b/docs/api/robot.helpers.rst index b84e488..a51b3a6 100644 --- a/docs/api/robot.helpers.rst +++ b/docs/api/robot.helpers.rst @@ -12,6 +12,14 @@ robot.helpers.coordinate module :show-inheritance: :undoc-members: +robot.helpers.motion\_controller module +--------------------------------------- + +.. automodule:: robot.helpers.motion_controller + :members: + :show-inheritance: + :undoc-members: + robot.helpers.robot\_mqtt module -------------------------------- diff --git a/docs/api/robot.indicators.rst b/docs/api/robot.indicators.rst index 3b831c6..d00b035 100644 --- a/docs/api/robot.indicators.rst +++ b/docs/api/robot.indicators.rst @@ -4,6 +4,14 @@ robot.indicators package Submodules ---------- +robot.indicators.abstract\_indicator module +------------------------------------------- + +.. automodule:: robot.indicators.abstract_indicator + :members: + :show-inheritance: + :undoc-members: + robot.indicators.neopixel module -------------------------------- diff --git a/docs/api/robot.interfaces.rst b/docs/api/robot.interfaces.rst new file mode 100644 index 0000000..2fbf4d4 --- /dev/null +++ b/docs/api/robot.interfaces.rst @@ -0,0 +1,10 @@ +robot.interfaces package +======================== + +Module contents +--------------- + +.. automodule:: robot.interfaces + :members: + :show-inheritance: + :undoc-members: diff --git a/docs/api/robot.mqtt.rst b/docs/api/robot.mqtt.rst new file mode 100644 index 0000000..bf9d252 --- /dev/null +++ b/docs/api/robot.mqtt.rst @@ -0,0 +1,29 @@ +robot.mqtt package +================== + +Submodules +---------- + +robot.mqtt.mqtt\_msg module +--------------------------- + +.. automodule:: robot.mqtt.mqtt_msg + :members: + :show-inheritance: + :undoc-members: + +robot.mqtt.robot\_mqtt\_client module +------------------------------------- + +.. automodule:: robot.mqtt.robot_mqtt_client + :members: + :show-inheritance: + :undoc-members: + +Module contents +--------------- + +.. automodule:: robot.mqtt + :members: + :show-inheritance: + :undoc-members: diff --git a/docs/api/robot.rst b/docs/api/robot.rst index 547e914..28d36a0 100644 --- a/docs/api/robot.rst +++ b/docs/api/robot.rst @@ -8,9 +8,13 @@ Subpackages :maxdepth: 4 robot.communication + robot.exception robot.helpers robot.indicators + robot.interfaces + robot.mqtt robot.sensors + robot.types Submodules ---------- @@ -39,6 +43,14 @@ robot.robot\_base module :show-inheritance: :undoc-members: +robot.virtual\_robot module +--------------------------- + +.. automodule:: robot.virtual_robot + :members: + :show-inheritance: + :undoc-members: + Module contents --------------- diff --git a/docs/api/robot.sensors.rst b/docs/api/robot.sensors.rst index 9cf6702..7ae6661 100644 --- a/docs/api/robot.sensors.rst +++ b/docs/api/robot.sensors.rst @@ -4,6 +4,14 @@ robot.sensors package Submodules ---------- +robot.sensors.abstract\_sensor module +------------------------------------- + +.. automodule:: robot.sensors.abstract_sensor + :members: + :show-inheritance: + :undoc-members: + robot.sensors.color module -------------------------- diff --git a/docs/api/robot.types.rst b/docs/api/robot.types.rst new file mode 100644 index 0000000..45c93dd --- /dev/null +++ b/docs/api/robot.types.rst @@ -0,0 +1,29 @@ +robot.types package +=================== + +Submodules +---------- + +robot.types.proximity\_reading\_type module +------------------------------------------- + +.. automodule:: robot.types.proximity_reading_type + :members: + :show-inheritance: + :undoc-members: + +robot.types.rgb\_color\_type module +----------------------------------- + +.. automodule:: robot.types.rgb_color_type + :members: + :show-inheritance: + :undoc-members: + +Module contents +--------------- + +.. automodule:: robot.types + :members: + :show-inheritance: + :undoc-members: diff --git a/docs/index.md b/docs/index.md index 957b2d8..12845db 100644 --- a/docs/index.md +++ b/docs/index.md @@ -14,5 +14,5 @@ Welcome to the documentation for the Python robot library. This site includes in README installation usage -api/index +api/modules ``` diff --git a/examples/my_test_robot.py b/examples/my_test_robot.py new file mode 100644 index 0000000..93e461e --- /dev/null +++ b/examples/my_test_robot.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import threading +import time + +from robot import MQTTSettings, VirtualRobot +from robot.interfaces import RobotState + + +class MyTestRobot(VirtualRobot): + def setup(self) -> None: # type: ignore[override] + print("My Test Robot Started") + super().setup() + + def loop(self) -> None: # type: ignore[override] + super().loop() + if self.state == RobotState.RUN: + print("Test") + self.delay(1000) + + def communication_interrupt(self, msg: str) -> None: + print( + f"communicationInterrupt on {self.id} " + f"with msg:{msg}" + ) + + +if __name__ == "__main__": + # Configure MQTT (fill with your broker details) + MQTTSettings.server = "localhost" + MQTTSettings.port = 1883 + MQTTSettings.user_name = "" + MQTTSettings.password = "" + MQTTSettings.channel = "v1" + + robot = MyTestRobot(10, 0, 0, 90) + t = threading.Thread(target=robot.run, daemon=True) + t.start() + + # Example to send start after a short delay + time.sleep(1) + robot.start() + t.join() diff --git a/pyproject.toml b/pyproject.toml index 9615291..168bbfe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,6 @@ version = "0.1.0" description = "A modular robot control library" authors = [ { name = "Nuwan Jaliyagoda", email = "nuwanjaliyagoda@gmail.com" }, - { name = "Nuwan Jaliyagoda", email = "kavindumethpura@gmail.com" }, { name = "Kavindu Prabhath Methpura", email = "kavindumethpura@gmail.com" }, ] readme = "README.md" diff --git a/requirements.txt b/requirements.txt index e410884..1c5ba72 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,4 @@ -paho-mqtt==2.1.0 \ No newline at end of file +flake8==7.3.0 +packaging==25.0 +paho-mqtt==2.1.0 +pytest==8.4.2 diff --git a/scripts/manual_mqtt_test.py b/scripts/manual_mqtt_test.py new file mode 100644 index 0000000..0a9bbec --- /dev/null +++ b/scripts/manual_mqtt_test.py @@ -0,0 +1,43 @@ +import time +import paho.mqtt.client as mqtt + +BROKER = "localhost" # must match MQTTSettings.server in your robot script +PORT = 1883 # must match MQTTSettings.port +CHANNEL = "v1" # must match MQTTSettings.channel +ROBOT_ID = 10 # must match the id you gave the robot + +# client = mqtt.Client() +client = mqtt.Client(protocol=mqtt.MQTTv5) + +client.connect(BROKER, PORT) +client.loop_start() + +topic_single = f"{CHANNEL}/robot/msg/{ROBOT_ID}" +topic_broadcast = f"{CHANNEL}/robot/msg/broadcast" + +# helper to publish and show what we sent +def send(topic, payload): + print(f"-> {topic}: {payload}") + client.publish(topic, payload) + +send(topic_single, "START") +time.sleep(1) +send(topic_single, "STOP") +send(topic_single, "RESET") +send(topic_broadcast, "START") # start all robots on this channel + +# ask the robot to identify itself; listen for reply +def on_msg(_c, _u, m): + try: + body = m.payload.decode() + except Exception: + body = str(m.payload) + print(f"<- {m.topic}: {body}") + +client.on_message = on_msg +client.subscribe(f"{CHANNEL}/robot/live") +send(topic_single, "ID?") +time.sleep(2) + +client.loop_stop() +client.disconnect() \ No newline at end of file diff --git a/src/robot/.vscode/extensions.json b/src/robot/.vscode/extensions.json new file mode 100644 index 0000000..ce43dac --- /dev/null +++ b/src/robot/.vscode/extensions.json @@ -0,0 +1,13 @@ +{ + "recommendations": [ + "ms-python.python", + "ms-python.vscode-pylance", + "charliermarsh.ruff", + "ms-python.flake8", + "ms-python.mypy-type-checker", + "esbenp.prettier-vscode", + "streetsidesoftware.code-spell-checker", + "redhat.vscode-yaml", + "tamasfe.even-better-toml" + ] +} \ No newline at end of file diff --git a/src/robot/__init__.py b/src/robot/__init__.py index d27dffa..3861fb5 100644 --- a/src/robot/__init__.py +++ b/src/robot/__init__.py @@ -1,11 +1,17 @@ -"""Top-level package for robot library.""" +"""Top-level package for robot library (Python port of robot-library-java).""" from .robot_base import Robot -from .mqtt_client import RobotMqttClient from .motion import MotionController +from .virtual_robot import VirtualRobot + +# Expose common subpackages +from .configs.mqtt_settings import MQTTSettings +from .configs.robot_settings import RobotSettings __all__ = [ "Robot", - "RobotMqttClient", "MotionController", + "VirtualRobot", + "MQTTSettings", + "RobotSettings", ] diff --git a/src/robot/communication/communication.py b/src/robot/communication/communication.py new file mode 100644 index 0000000..5fbb453 --- /dev/null +++ b/src/robot/communication/communication.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from robot.interfaces import IMqttHandler +from robot.mqtt.robot_mqtt_client import RobotMqttClient + + +class Communication(IMqttHandler): + def __init__(self, robot_id: int, mqtt_client: RobotMqttClient): + self.robot_mqtt_client = mqtt_client + self.robot_id = robot_id + + def send_message(self, msg: str) -> None: # abstract + raise NotImplementedError + + def send_message_with_distance(self, msg: str, distance: int) -> None: # abstract + raise NotImplementedError diff --git a/src/robot/communication/directed_comm.py b/src/robot/communication/directed_comm.py index c0317d8..d773a77 100644 --- a/src/robot/communication/directed_comm.py +++ b/src/robot/communication/directed_comm.py @@ -1,7 +1,35 @@ -"""Directed communication module.""" +"""Directed communication module mirroring Java logic.""" +from __future__ import annotations -class DirectedCommunication: - """Placeholder directed communication class.""" +import json - pass +from robot.communication.communication import Communication +from robot.mqtt.mqtt_msg import MqttMsg +from robot.mqtt.robot_mqtt_client import RobotMqttClient + + +class DirectedCommunication(Communication): + def __init__(self, robot_id: int, mqtt_client: RobotMqttClient): + super().__init__(robot_id, mqtt_client) + self._topics_sub: dict[str, str] = {} + self._subscribe("COMMUNICATION_IN_DIR", f"comm/in/direct/{robot_id}") + + def _subscribe(self, key: str, topic: str) -> None: + self._topics_sub[key] = topic + self.robot_mqtt_client.subscribe(topic) + + def send_message(self, msg: str) -> None: + obj = {"id": self.robot_id, "msg": msg} + self.robot_mqtt_client.publish("comm/out/direct", json.dumps(obj)) + + def send_message_with_distance(self, msg: str, distance: int) -> None: + obj = {"id": self.robot_id, "msg": msg, "dist": distance} + self.robot_mqtt_client.publish("comm/out/direct", json.dumps(obj)) + + def handle_subscription(self, robot, m: MqttMsg) -> None: + topic, msg = m.topic, m.message + if topic == self._topics_sub.get("COMMUNICATION_IN_DIR"): + robot.communication_interrupt(msg) + else: + print(f"Received (unknown dir): {topic}> {msg}") diff --git a/src/robot/communication/simple_comm.py b/src/robot/communication/simple_comm.py index 7ae7394..b1b933c 100644 --- a/src/robot/communication/simple_comm.py +++ b/src/robot/communication/simple_comm.py @@ -1,7 +1,39 @@ -"""Simple communication module.""" +"""Simple communication module mirroring Java logic.""" +from __future__ import annotations -class SimpleCommunication: - """Placeholder simple communication class.""" +import json +from robot.mqtt.robot_mqtt_client import RobotMqttClient +from robot.mqtt.mqtt_msg import MqttMsg +from robot.communication.communication import Communication - pass + +class SimpleCommunication(Communication): + def __init__(self, robot_id: int, mqtt_client: RobotMqttClient): + super().__init__(robot_id, mqtt_client) + self._topics_sub: dict[str, str] = {} + self._subscribe("COMMUNICATION_IN_SIMP", f"comm/in/simple/{robot_id}") + + def _subscribe(self, key: str, topic: str) -> None: + self._topics_sub[key] = topic + self.robot_mqtt_client.subscribe(topic) + + def send_message(self, msg: str) -> None: + obj = {"id": self.robot_id, "msg": msg} + self.robot_mqtt_client.publish("comm/out/simple", json.dumps(obj)) + + def send_message_with_distance(self, msg: str, distance: int) -> None: + obj = {"id": self.robot_id, "msg": msg, "dist": distance} + self.robot_mqtt_client.publish("comm/out/simple", json.dumps(obj)) + + # IMqttHandler + def handle_subscription(self, robot, m: MqttMsg) -> None: + topic, msg = m.topic, m.message + if topic == self._topics_sub.get("COMMUNICATION_IN_SIMP"): + robot.communication_interrupt(msg) + else: + expected_topic = self._topics_sub.get("COMMUNICATION_IN_SIMP") + print( + f"Received (unknown simp): expected '{expected_topic}', " + f"but got '{topic}' with message '{msg}'" + ) diff --git a/src/robot/configs/mqtt_settings.py b/src/robot/configs/mqtt_settings.py new file mode 100644 index 0000000..7c3e42e --- /dev/null +++ b/src/robot/configs/mqtt_settings.py @@ -0,0 +1,23 @@ +"""MQTT settings analogous to swarm.configs.MQTTSettings (Java). + +Set these values before starting a robot to connect to the broker. +""" + + +class MQTTSettings: + server: str | None = None + port: int = 1883 + user_name: str | None = None + password: str | None = None + channel: str | None = None + + @classmethod + def print(cls) -> None: + print(f"server: {cls.server}") + print(f"port: {cls.port}") + print(f"username: {cls.user_name}") + print(f"password: {'(set)' if cls.password else '(unset)'}") + print(f"channel: {cls.channel}") + + +__all__ = ["MQTTSettings"] diff --git a/src/robot/configs/robot_settings.py b/src/robot/configs/robot_settings.py new file mode 100644 index 0000000..9c662f6 --- /dev/null +++ b/src/robot/configs/robot_settings.py @@ -0,0 +1,16 @@ +"""Robot settings analogous to swarm.configs.RobotSettings (Java).""" + + +class RobotSettings: + ROBOT_SPEED_MAX: int = 255 + ROBOT_SPEED_MIN: int = 50 + + ROBOT_RADIUS: int = 6 # in cm + ROBOT_WIDTH: int = 12 # in cm + ROBOT_WHEEL_RADIUS: float = 3.5 # in cm + + # 0: no logs (TODO: implement levels) + ROBOT_LOG_LEVEL: int = 0 + + +__all__ = ["RobotSettings"] diff --git a/src/robot/exception/__init__.py b/src/robot/exception/__init__.py new file mode 100644 index 0000000..7c915d5 --- /dev/null +++ b/src/robot/exception/__init__.py @@ -0,0 +1,46 @@ +"""Exception types mirroring Java exceptions (slim wrappers).""" + + +class MotionControllerException(Exception): + def __init__(self, message: str): + super().__init__(message) + print(f"Motion Error: {message}") + + +class MqttClientException(Exception): + def __init__(self, message: str): + super().__init__(message) + print(f"MQTT Error: {message}") + + +class SensorException(Exception): + def __init__(self, message: str): + super().__init__(message) + print(f"Sensor Error: {message}") + + +class ProximityException(Exception): + def __init__(self, message: str): + super().__init__(message) + print(f"Proximity reading error: {message}") + + +class RGBColorException(Exception): + def __init__( + self, + R: int | None = None, + G: int | None = None, + B: int | None = None, + ): + msg = f"Invalid RGB values: R={R}, G={G}, B={B}" + super().__init__(msg) + print(msg) + + +__all__ = [ + "MotionControllerException", + "MqttClientException", + "SensorException", + "ProximityException", + "RGBColorException", +] diff --git a/src/robot/helpers/__init__.py b/src/robot/helpers/__init__.py index 76c58a1..7752bbc 100644 --- a/src/robot/helpers/__init__.py +++ b/src/robot/helpers/__init__.py @@ -2,8 +2,10 @@ from .coordinate import Coordinate from .robot_mqtt import RobotMQTT +from .motion_controller import MotionController __all__ = [ "Coordinate", "RobotMQTT", + "MotionController", ] diff --git a/src/robot/helpers/coordinate.py b/src/robot/helpers/coordinate.py index 09e5a61..20a830c 100644 --- a/src/robot/helpers/coordinate.py +++ b/src/robot/helpers/coordinate.py @@ -1,7 +1,105 @@ -"""Coordinate tracking helper.""" +from __future__ import annotations +import math +import json -class Coordinate: - """Placeholder coordinate tracker.""" +from robot.interfaces import IMqttHandler +from robot.mqtt.mqtt_msg import MqttMsg +from robot.mqtt.robot_mqtt_client import RobotMqttClient - pass + +class Coordinate(IMqttHandler): + def __init__( + self, + robot_id: int, + x: float, + y: float, + heading: float, + mqtt_client: RobotMqttClient, + ): + self._x = float(x) + self._y = float(y) + self._heading = float(heading) + self._robot_id = robot_id + self.robot_mqtt_client = mqtt_client + + # subscriptions + self._topics_sub: dict[str, str] = {} + self._subscribe("ROBOT_LOCALIZATION", "localization/update/?") + + # Subscriptions ------------------------------------------------------ + def _subscribe(self, key: str, topic: str) -> None: + self._topics_sub[key] = topic + self.robot_mqtt_client.subscribe(topic) + + def handle_subscription(self, robot, message: MqttMsg) -> None: # noqa: D401 + topic = message.topic + if topic == self._topics_sub.get("ROBOT_LOCALIZATION"): + self.publish_coordinate() + + # Getters/Setters ---------------------------------------------------- + def get_x(self) -> float: + return self._x + + def get_y(self) -> float: + return self._y + + def set_x(self, x: float) -> None: + self._x = float(x) + + def set_y(self, y: float) -> None: + self._y = float(y) + + def get_heading(self) -> float: + return self._heading + + def get_heading_rad(self) -> float: + + return float(math.radians(self._heading)) + + def set_heading(self, heading: float) -> None: + self._heading = float(self._normalize_heading(heading)) + + def set_heading_rad(self, heading: float) -> None: + + self.set_heading(math.degrees(heading)) + + def set_coordinate(self, x: float, y: float) -> None: + self.set_x(x) + self.set_y(y) + + def set_coordinate_heading(self, x: float, y: float, heading: float) -> None: + self.set_coordinate(x, y) + self.set_heading(heading) + + # Utilities ----------------------------------------------------------- + def __str__(self) -> str: + return ( + f"x:{self._round2(self._x)} y:{self._round2(self._y)} " + f"heading:{self._round2(self._heading)}" + ) + + def print(self) -> None: # noqa: A003 - mirror Java name + print(str(self)) + + def publish_coordinate(self) -> None: + # Keep format identical to Java using a JSON array of one object + coord = { + "id": self._robot_id, + "x": self.get_x(), + "y": self.get_y(), + "heading": self.get_heading(), + "reality": "V", + } + data = [coord] + + self.robot_mqtt_client.publish("localization/update", json.dumps(data)) + + # Internal helpers ---------------------------------------------------- + def _round2(self, v: float) -> float: + return round(v * 100) / 100.0 + + def _normalize_heading(self, heading: float) -> float: + + # normalize to [-180, 180] + return heading - math.ceil(heading / 360.0 - 0.5) * 360.0 diff --git a/src/robot/helpers/motion_controller.py b/src/robot/helpers/motion_controller.py new file mode 100644 index 0000000..dbfba18 --- /dev/null +++ b/src/robot/helpers/motion_controller.py @@ -0,0 +1,176 @@ +from __future__ import annotations + +import math +import time + +from robot.configs.robot_settings import RobotSettings +from robot.exception import MotionControllerException +from robot.helpers.coordinate import Coordinate + + +class MotionController: + # This is the maximum duration allowed to coordinate calculation + _MAX_DURATION = 100 + + CM_2_MM = 10 + SEC_2_MS = 1000 + + # Additional delays for simulation time + ADDITIONAL_DELAY = 500 + + # Match cm/s speed + SPEED_FACTOR = 0.1 + + def __init__(self, coordinate: Coordinate | None = None): + if coordinate is None: + # Lightweight fallback coordinate for tests or standalone use + class _StubCoord: + def __init__(self): + self._x = 0.0 + self._y = 0.0 + self._h = 0.0 + + def get_x(self): + return self._x + + def get_y(self): + return self._y + + def get_heading(self): + return self._h + + def get_heading_rad(self): + return math.radians(self._h) + + def set_coordinate_heading(self, x, y, h): + self._x, self._y, self._h = x, y, h + + def publish_coordinate(self): + pass + + coordinate = _StubCoord() # type: ignore[assignment] + self.c = coordinate # type: ignore[assignment] + + # Wrappers ----------------------------------------------------------- + def move( + self, + left_speed: int, + right_speed: int, + duration: int | float | None = None, + ) -> None: + if duration is None: + duration = self._MAX_DURATION + self._move(left_speed, right_speed, float(duration)) + + def rotate(self, speed: int, duration: int | float | None = None) -> None: + if duration is None: + duration = self._MAX_DURATION + self._rotate(speed, float(duration)) + + def rotate_radians(self, speed: int, radians: float) -> None: + self.rotate_degree(speed, math.degrees(radians)) + + def rotate_degree(self, speed: int, degree: float) -> None: + try: + if degree == 0 or degree < -180 or degree > 180: + raise MotionControllerException( + "Degree should in range [-180, 180], except 0" + ) + if speed < RobotSettings.ROBOT_SPEED_MIN: + raise MotionControllerException( + "Speed should be greater than ROBOT_SPEED_MIN" + ) + + sign = int(degree / abs(degree)) + distance = ( + 2 * math.pi * RobotSettings.ROBOT_RADIUS * (abs(degree) / 360) + ) * self.CM_2_MM + duration = float(distance / abs(speed)) * self.SEC_2_MS + self._debug( + f"Sign: {sign} Distance: {distance} Duration: {duration}" + ) + self._rotate(sign * speed, duration) + + except MotionControllerException as e: + print(f"[WARNING] rotate_degree validation failed: {e}") + return + + # Core movement ------------------------------------------------------ + def _rotate(self, speed: int, duration: float) -> None: + self._move(speed, -1 * speed, duration) + + def _move( + self, + left_speed: int, + right_speed: int, + duration: float, + ) -> None: + if not ( + self._is_speed_in_range(left_speed) + and self._is_speed_in_range(right_speed) + ): + try: + raise MotionControllerException( + "One of the provided speeds is out of " + "range in move() function" + ) + except MotionControllerException: + return + + # step interval in ms, break the duration into slices + step_interval = 100 + cumulative_interval = 0 + steps = int(duration // step_interval) if duration > 0 else 0 + + for _ in range(steps): + dL = left_speed * self.SPEED_FACTOR * (step_interval / 1000.0) + dR = right_speed * self.SPEED_FACTOR * (step_interval / 1000.0) + d = (dL + dR) / 2.0 + h = self.c.get_heading_rad() + + x = self.c.get_x() + d * math.cos(h) + y = self.c.get_y() + d * math.sin(h) + heading = ( + self.c.get_heading_rad() + + (dR - dL) / (RobotSettings.ROBOT_WIDTH) + ) + + self.c.set_coordinate_heading(x, y, math.degrees(heading)) + + cumulative_interval += step_interval + if cumulative_interval >= self.ADDITIONAL_DELAY: + self._debug(f"Adding extra delay of {self.ADDITIONAL_DELAY}") + self._delay(self.ADDITIONAL_DELAY) + self.c.publish_coordinate() + cumulative_interval -= self.ADDITIONAL_DELAY + + # Round to nearest int + self.c.set_coordinate_heading( + round(self.c.get_x()), + round(self.c.get_y()), + round(self.c.get_heading()), + ) + self.c.publish_coordinate() + + # Helpers ------------------------------------------------------------ + def _is_speed_in_range(self, speed: int) -> bool: + if speed > 0: + return ( + RobotSettings.ROBOT_SPEED_MIN <= speed + <= RobotSettings.ROBOT_SPEED_MAX + ) + elif speed < 0: + return ( + -RobotSettings.ROBOT_SPEED_MAX + <= speed + <= -RobotSettings.ROBOT_SPEED_MIN + ) + return True # 0 acceptable + + def _delay(self, duration_ms: int) -> None: + time.sleep(max(0, duration_ms) / 1000.0) + + @staticmethod + def _debug(msg: str, level: int = 0) -> None: + if level > 0: + print(f"[DEBUG]\t{msg}") \ No newline at end of file diff --git a/src/robot/helpers/robot_mqtt.py b/src/robot/helpers/robot_mqtt.py index 0af763c..e01eddd 100644 --- a/src/robot/helpers/robot_mqtt.py +++ b/src/robot/helpers/robot_mqtt.py @@ -1,7 +1,51 @@ -"""MQTT helper functions.""" +from __future__ import annotations + +import json + +from robot.mqtt.mqtt_msg import MqttMsg +from robot.mqtt.robot_mqtt_client import RobotMqttClient class RobotMQTT: - """Placeholder MQTT helper.""" + def __init__(self, robot_id: int, mqtt: RobotMqttClient, reality: str): + self.robot_mqtt_client = mqtt + self.robot_id = robot_id + self.reality = reality + + self._topics_sub: dict[str, str] = {} + self._subscribe("ROBOT_MSG", f"robot/msg/{robot_id}") + self._subscribe("ROBOT_MSG_BROADCAST", "robot/msg/broadcast") + + def robot_create(self, x: float, y: float, heading: float) -> None: + msg = { + "id": self.robot_id, + "x": x, + "y": y, + "heading": heading, + "reality": self.reality, + } + self.robot_mqtt_client.publish("robot/create", json.dumps(msg)) + + def _subscribe(self, key: str, topic: str) -> None: + self._topics_sub[key] = topic + self.robot_mqtt_client.subscribe(topic) - pass + def handle_subscription(self, robot, m: MqttMsg) -> None: + topic, msg = m.topic, m.message + if topic in ( + self._topics_sub.get("ROBOT_MSG"), + self._topics_sub.get("ROBOT_MSG_BROADCAST"), + ): + msg_topic = msg.split(" ")[0] + if msg_topic == "ID?": + obj = {"id": self.robot_id, "reality": "V"} + self.robot_mqtt_client.publish("robot/live", json.dumps(obj)) + print(f"robot/live > {json.dumps(obj)}") + elif msg_topic == "START": + robot.start() + elif msg_topic == "STOP": + robot.stop() + elif msg_topic == "RESET": + robot.reset() + else: + print(f"Received (unknown): {topic}> {msg}") diff --git a/src/robot/indicators/abstract_indicator.py b/src/robot/indicators/abstract_indicator.py new file mode 100644 index 0000000..36e2d3c --- /dev/null +++ b/src/robot/indicators/abstract_indicator.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +from robot.mqtt.robot_mqtt_client import RobotMqttClient + + +class AbstractIndicator: + def __init__(self, robot, mqtt_client: RobotMqttClient): + self.robot_mqtt_client = mqtt_client + self.robot_id = robot.get_id() + self.robot = robot + + def handle_subscription(self, r, m): # must be overridden in subclasses + raise NotImplementedError diff --git a/src/robot/indicators/neopixel.py b/src/robot/indicators/neopixel.py index 0cd1144..489404a 100644 --- a/src/robot/indicators/neopixel.py +++ b/src/robot/indicators/neopixel.py @@ -1,7 +1,46 @@ -"""NeoPixel indicator controller.""" +"""NeoPixel indicator controller mirroring Java logic.""" +from __future__ import annotations -class NeoPixel: - """Placeholder NeoPixel controller.""" +import json - pass +from robot.mqtt.mqtt_msg import MqttMsg +from robot.mqtt.robot_mqtt_client import RobotMqttClient +from robot.types import RGBColorType + +from .abstract_indicator import AbstractIndicator + + +class NeoPixel(AbstractIndicator): + def __init__(self, robot, mqtt_client: RobotMqttClient): + super().__init__(robot, mqtt_client) + self._topics_sub: dict[str, str] = {} + self._subscribe("NEOPIXEL_IN", f"output/neopixel/{self.robot_id}") + + # Set the default color at beginning + self.change_color(66, 66, 66) + + def _subscribe(self, key: str, topic: str) -> None: + self._topics_sub[key] = topic + self.robot_mqtt_client.subscribe(topic) + + def handle_subscription(self, r, m: MqttMsg) -> None: + topic, msg = m.topic, m.message + if topic == self._topics_sub.get("NEOPIXEL_IN"): + colors = msg.split(" ") + R = int(colors[0]) + G = int(colors[1]) + B = int(colors[2]) + self.change_color(R, G, B) + else: + print(f"Received (unknown): {topic}> {msg}") + + def change_color(self, red: int, green: int, blue: int) -> None: + color = RGBColorType(red, green, blue) + obj = { + "id": self.robot_id, + "R": color.get_r(), + "G": color.get_g(), + "B": color.get_b(), + } + self.robot_mqtt_client.publish("output/neopixel", json.dumps(obj)) diff --git a/src/robot/interfaces/__init__.py b/src/robot/interfaces/__init__.py new file mode 100644 index 0000000..8be57b2 --- /dev/null +++ b/src/robot/interfaces/__init__.py @@ -0,0 +1,49 @@ +"""Interfaces mirroring the Java interfaces (snake_case in Python). + +This module defines protocol-like interfaces and enums to preserve the design +from the Java implementation while remaining Pythonic. +""" + +from __future__ import annotations + +from enum import Enum +from typing import TYPE_CHECKING, Protocol, runtime_checkable + +if TYPE_CHECKING: # pragma: no cover - for type hints only + from robot.mqtt.mqtt_msg import MqttMsg + from robot.robot_base import Robot + + +class RobotState(Enum): + WAIT = "WAIT" + RUN = "RUN" + BEGIN = "BEGIN" + + +@runtime_checkable +class IRobotState(Protocol): + state: RobotState + + def loop(self) -> None: ... + + def sensor_interrupt(self, sensor: str, value: str) -> None: ... + + def communication_interrupt(self, msg: str) -> None: ... + + def start(self) -> None: ... + + def stop(self) -> None: ... + + def reset(self) -> None: ... + + +@runtime_checkable +class IMqttHandler(Protocol): + def handle_subscription(self, r: "Robot", m: "MqttMsg") -> None: ... + + +__all__ = [ + "RobotState", + "IRobotState", + "IMqttHandler", +] diff --git a/src/robot/motion.py b/src/robot/motion.py index 3945773..652f872 100644 --- a/src/robot/motion.py +++ b/src/robot/motion.py @@ -1,7 +1,5 @@ -"""Motion controller algorithms.""" +"""Motion controller algorithms (re-export).""" +from .helpers.motion_controller import MotionController -class MotionController: - """Placeholder motion controller.""" - - pass +__all__ = ["MotionController"] diff --git a/src/robot/mqtt/__init__.py b/src/robot/mqtt/__init__.py new file mode 100644 index 0000000..d7f0957 --- /dev/null +++ b/src/robot/mqtt/__init__.py @@ -0,0 +1,7 @@ +from .mqtt_msg import MqttMsg +from .robot_mqtt_client import RobotMqttClient + +__all__ = [ + "MqttMsg", + "RobotMqttClient", +] diff --git a/src/robot/mqtt/mqtt_msg.py b/src/robot/mqtt/mqtt_msg.py new file mode 100644 index 0000000..8f93735 --- /dev/null +++ b/src/robot/mqtt/mqtt_msg.py @@ -0,0 +1,28 @@ +from __future__ import annotations + + +class MqttMsg: + _next_id = 0 + + def __init__(self, topic: str, message: str, qos: int = 0): + self.id = MqttMsg._next_id + MqttMsg._next_id += 1 + + self.topic = topic + self.message = message + self.topic_groups = topic.split("/") + self.channel = self.topic_groups[0] if len(self.topic_groups) > 1 else "" + self.qos = qos + + def __lt__(self, other: "MqttMsg") -> bool: + # Define an arbitrary but stable ordering for potential priority queues + return self.id < other.id + + def __le__(self, other: "MqttMsg") -> bool: + return self.id <= other.id + + def __gt__(self, other: "MqttMsg") -> bool: + return self.id > other.id + + def __ge__(self, other: "MqttMsg") -> bool: + return self.id >= other.id diff --git a/src/robot/mqtt/robot_mqtt_client.py b/src/robot/mqtt/robot_mqtt_client.py new file mode 100644 index 0000000..cf83f61 --- /dev/null +++ b/src/robot/mqtt/robot_mqtt_client.py @@ -0,0 +1,175 @@ +from __future__ import annotations + +from collections import deque +import time +from typing import Deque + +try: + import paho.mqtt.client as mqtt +except Exception: # pragma: no cover - provide lightweight stub for tests/no-deps + class _StubClient: + def __init__(self): + pass + + def username_pw_set(self, user, password): + pass + + def connect(self, server, port, keepalive=60): + pass + + def loop_start(self): + pass + + def loop_stop(self): + pass + + def publish(self, topic, payload, qos=0, retain=False): + pass + + def subscribe(self, topic): + pass + + def disconnect(self): + pass + + def reconnect(self): + pass + + class _MQTTMessage: # minimal placeholder type for annotations + pass + + mqtt = type("mqtt_stub", (), {"Client": _StubClient, "MQTTMessage": _MQTTMessage}) + +from robot.exception import MqttClientException +from robot.mqtt.mqtt_msg import MqttMsg + + +class RobotMqttClient: + """MQTT client wrapper mirroring Java's RobotMqttClient. + + - Maintains `in_queue` and `out_queue` of `MqttMsg`. + - Prefixes publish/subscribe topics with `MQTTSettings.channel` externally. + """ + + def __init__( + self, + server: str, + port: int, + user_name: str | None, + password: str | None, + channel: str, + ): + self.server = server + self.port = int(port) + self.user_name = user_name or "" + self.password = password or "" + self.channel = channel or "" + + self._client = mqtt.Client() + self._connected = False + + # Incoming/outgoing queues + self.in_queue: Deque[MqttMsg] = deque() + self.out_queue: Deque[MqttMsg] = deque() + + # Setup callbacks + self._client.on_connect = self._on_connect + self._client.on_message = self._on_message + self._client.on_disconnect = self._on_disconnect + + self._connect() + + # Connection management ------------------------------------------------- + def _connect(self) -> None: + try: + if self.user_name: + self._client.username_pw_set(self.user_name, self.password) + # Start network loop in a background thread + self._client.connect(self.server, self.port, keepalive=60) + self._client.loop_start() + self._connected = True + print("MQTT: Connected") + except Exception as e: + # broad but acceptable for connection bootstrapping + print(f"MQTT connection error: {e}") + self._connected = False + raise MqttClientException(f"MQTT connection error: {e}") + + def _on_connect(self, client: mqtt.Client, userdata, flags, rc): # noqa: ANN001, ANN201 + if rc == 0: + self._connected = True + else: + print(f"MQTT: Connection failed with code {rc}") + + def _on_disconnect(self, client: mqtt.Client, userdata, rc): # noqa: ANN001, ANN201 + self._connected = False + print("Connection lost!") + # Lightweight auto-reconnect attempt; avoid tight loop on persistent failure. + time.sleep(1) + try: + # loop_start is already running; just reconnect the socket + self._client.reconnect() + except Exception as e: + print(f"MQTT reconnect failed: {e}") + + # MQTT callbacks -------------------------------------------------------- + def _on_message( + self, + client: mqtt.Client, + userdata, + msg: mqtt.MQTTMessage, + ): # noqa: ANN001, ANN201 + topic = getattr(msg, "topic", "") or "" + raw_payload = getattr(msg, "payload", b"") + try: + payload = raw_payload.decode("utf-8") if isinstance(raw_payload, (bytes, bytearray)) else str(raw_payload) + except Exception: + payload = str(raw_payload) + + if not topic or not payload: + return + + # Strip channel prefix before enqueueing, matching Java behavior + if "/" in topic: + t = topic[topic.find("/") + 1 :] + else: + t = topic + + self.in_queue.append(MqttMsg(t, payload)) + + + # API ------------------------------------------------------------------- + def publish( + self, topic: str, body: str, qos: int = 0, retained: bool = False + ) -> None: + if not (self._connected and topic and body): + raise MqttClientException("Not Connected or empty topic/body") + full_topic = f"{self.channel}/{topic}" if self.channel else topic + try: + self._client.publish(full_topic, body, qos=qos, retain=retained) + except Exception as e: + raise MqttClientException(str(e)) + + def subscribe(self, topic: str) -> None: + if not (self._connected and topic): + raise MqttClientException("Not Connected or empty topic") + full_topic = f"{self.channel}/{topic}" if self.channel else topic + try: + self._client.subscribe(full_topic) + print(f"Subscribed to {full_topic}") + except Exception as e: + raise MqttClientException(str(e)) + + def in_queue_pop(self) -> MqttMsg | None: + return self.in_queue.popleft() if self.in_queue else None + + def out_queue_pop(self) -> MqttMsg | None: + return self.out_queue.popleft() if self.out_queue else None + + def close(self) -> None: + """Stop the network loop and disconnect cleanly.""" + try: + self._client.loop_stop() + self._client.disconnect() + finally: + self._connected = False \ No newline at end of file diff --git a/src/robot/mqtt_client.py b/src/robot/mqtt_client.py index 4367bb4..dcdefba 100644 --- a/src/robot/mqtt_client.py +++ b/src/robot/mqtt_client.py @@ -1,7 +1,8 @@ -"""MQTT client wrapper for robot communication.""" +"""Compatibility re-export for RobotMqttClient. +The implementation lives in `robot.mqtt.robot_mqtt_client`. +""" -class RobotMqttClient: - """Placeholder MQTT client wrapper.""" +from .mqtt.robot_mqtt_client import RobotMqttClient - pass +__all__ = ["RobotMqttClient"] diff --git a/src/robot/robot_base.py b/src/robot/robot_base.py index 54d0c4b..3f59c8f 100644 --- a/src/robot/robot_base.py +++ b/src/robot/robot_base.py @@ -1,7 +1,181 @@ -"""Base Robot class.""" +"""Abstract Robot base class mirroring swarm.robot.Robot (Java). +Method names are provided in snake_case. +""" -class Robot: - """Base robot class providing lifecycle management.""" +from __future__ import annotations - pass +import time +from typing import Optional + +from robot.communication.directed_comm import DirectedCommunication +from robot.communication.simple_comm import SimpleCommunication +from robot.configs.mqtt_settings import MQTTSettings +from robot.helpers.coordinate import Coordinate +from robot.helpers.motion_controller import MotionController +from robot.helpers.robot_mqtt import RobotMQTT +from robot.indicators.neopixel import NeoPixel +from robot.interfaces import IRobotState, RobotState +from robot.mqtt.mqtt_msg import MqttMsg +from robot.mqtt.robot_mqtt_client import RobotMqttClient +from robot.sensors.color import ColorSensor +from robot.sensors.distance import DistanceSensor +from robot.sensors.proximity import ProximitySensor + + +class Robot(IRobotState): + # Sensors + dist_sensor: DistanceSensor + proximity_sensor: ProximitySensor + color_sensor: ColorSensor + + # Communication + simple_comm: SimpleCommunication + directed_comm: DirectedCommunication + + # Output + neo_pixel: NeoPixel + + # Helpers + motion: MotionController + robot_mqtt: RobotMQTT + coordinates: Coordinate + robot_mqtt_client: RobotMqttClient + + # Vars + id: int + reality: str + state: RobotState = RobotState.WAIT + + def __init__( + self, + id: int, + x: float, + y: float, + heading: float, + reality: str, + ): + self.id = id + self.reality = reality + + # Create mqtt client + self.robot_mqtt_client = RobotMqttClient( + MQTTSettings.server or "localhost", + int(MQTTSettings.port), + MQTTSettings.user_name or "", + MQTTSettings.password or "", + MQTTSettings.channel or "v1", + ) + + self.coordinates = Coordinate(id, x, y, heading, self.robot_mqtt_client) + self.robot_mqtt = RobotMQTT(id, self.robot_mqtt_client, reality) + + # Request simulator to create robot instance + self.robot_mqtt.robot_create( + self.coordinates.get_x(), + self.coordinates.get_y(), + self.coordinates.get_heading(), + ) + + self.delay(1000) + self.motion = MotionController(self.coordinates) + + # Lifecycle --------------------------------------------------------- + def setup(self) -> None: + # Setup each module + self.dist_sensor = DistanceSensor(self, self.robot_mqtt_client) + self.proximity_sensor = ProximitySensor(self, self.robot_mqtt_client) + self.color_sensor = ColorSensor(self, self.robot_mqtt_client) + + self.neo_pixel = NeoPixel(self, self.robot_mqtt_client) + + self.simple_comm = SimpleCommunication(self.id, self.robot_mqtt_client) + self.directed_comm = DirectedCommunication(self.id, self.robot_mqtt_client) + + self.coordinates.set_coordinate_heading( + self.coordinates.get_x(), + self.coordinates.get_y(), + self.coordinates.get_heading(), + ) + self.coordinates.publish_coordinate() + + def get_id(self) -> int: + return self.id + + def run(self) -> None: + self.setup() + while True: + begin_time = time.time() * 1000 + try: + self.loop() + except Exception as e: # noqa: BLE001 + print(e) + end_time = time.time() * 1000 + + # Maintain ~1Hz loop rate as in Java implementation + self.delay(int(max(0, 1000 - (end_time - begin_time)))) + + try: + self.handle_subscribe_queue() + except Exception as e: # noqa: BLE001 + print(e) + + # Subscription handler ---------------------------------------------- + def handle_subscribe_queue(self) -> None: + while self.robot_mqtt_client.in_queue: + m: Optional[MqttMsg] = self.robot_mqtt_client.in_queue_pop() + if not m: + continue + tg0 = m.topic_groups[0] if m.topic_groups else "" + if tg0 == "robot": + self.robot_mqtt.handle_subscription(self, m) + elif tg0 == "sensor": + if len(m.topic_groups) > 1: + if m.topic_groups[1] == "distance": + self.dist_sensor.handle_subscription(self, m) + elif m.topic_groups[1] == "color": + self.color_sensor.handle_subscription(self, m) + elif m.topic_groups[1] == "proximity": + self.proximity_sensor.handle_subscription(self, m) + elif tg0 == "localization": + if m.topic == "localization/update/?": + self.coordinates.handle_subscription(self, m) + elif tg0 == "comm": + if len(m.topic_groups) > 2 and m.topic_groups[2] == "simple": + self.simple_comm.handle_subscription(self, m) + else: + self.directed_comm.handle_subscription(self, m) + elif tg0 == "output": + if len(m.topic_groups) > 1 and m.topic_groups[1] == "NeoPixel": + self.neo_pixel.handle_subscription(self, m) + + # State handlers ----------------------------------------------------- + def start(self) -> None: + print(f"Robot start on {self.id}") + self.state = RobotState.RUN + + def stop(self) -> None: + print(f"Robot stop on {self.id}") + self.state = RobotState.WAIT + + def reset(self) -> None: + print(f"Robot reset on {self.id}") + self.state = RobotState.BEGIN + + # Utility ------------------------------------------------------------ + def delay(self, milliseconds: int) -> None: + try: + time.sleep(max(0, milliseconds) / 1000.0) + except Exception: + # Ignore sleep interruptions (e.g., during teardown or patched sleeps in tests) + return + + # Abstracts to implement in subclasses ------------------------------ + def loop(self) -> None: + pass + + def sensor_interrupt(self, sensor: str, value: str) -> None: + pass + + def communication_interrupt(self, msg: str) -> None: + pass diff --git a/src/robot/sensors/abstract_sensor.py b/src/robot/sensors/abstract_sensor.py new file mode 100644 index 0000000..08d12b4 --- /dev/null +++ b/src/robot/sensors/abstract_sensor.py @@ -0,0 +1,14 @@ +from __future__ import annotations + +from robot.interfaces import IMqttHandler +from robot.mqtt.robot_mqtt_client import RobotMqttClient + + +class AbstractSensor(IMqttHandler): + def __init__(self, robot, mqtt_client: RobotMqttClient): + self.robot_mqtt_client = mqtt_client + self.robot_id = robot.get_id() + self.robot = robot + + def handle_subscription(self, r, m): # default no-op + pass diff --git a/src/robot/sensors/color.py b/src/robot/sensors/color.py index 94fe0be..22f90b0 100644 --- a/src/robot/sensors/color.py +++ b/src/robot/sensors/color.py @@ -1,7 +1,68 @@ -"""Color sensor implementation.""" +"""Color sensor implementation mirroring Java logic.""" +from __future__ import annotations -class ColorSensor: - """Placeholder color sensor.""" +import json +from time import time - pass +from robot.exception import SensorException +from robot.mqtt.mqtt_msg import MqttMsg +from robot.mqtt.robot_mqtt_client import RobotMqttClient +from robot.types import RGBColorType + +from .abstract_sensor import AbstractSensor + + +class ColorSensor(AbstractSensor): + MQTT_TIMEOUT = 1000 # ms + + def __init__(self, robot, mqtt_client: RobotMqttClient): + super().__init__(robot, mqtt_client) + self._topics_sub: dict[str, str] = {} + self.color = RGBColorType(0, 0, 0) + self._subscribe("COLOR_IN", f"sensor/color/{self.robot_id}") + self._subscribe("COLOR_LOOK", f"sensor/color/{self.robot_id}/?") + self._col_lock = False + + def _subscribe(self, key: str, topic: str) -> None: + self._topics_sub[key] = topic + self.robot_mqtt_client.subscribe(topic) + + def handle_subscription(self, robot, m: MqttMsg) -> None: + topic, msg = m.topic, m.message + if topic == self._topics_sub.get("COLOR_IN"): + self.color.set_color_from_str(msg) + self._col_lock = False + else: + print(f"Received (unknown): {topic}> {m}") + + def get_color(self) -> RGBColorType: + msg = {"id": self.robot_id, "reality": "M"} + self._col_lock = True + self.robot_mqtt_client.publish("sensor/color", json.dumps(msg)) + self.robot.delay(250) + + start_time = time() * 1000 + timeout = False + while self._col_lock and not timeout: + try: + self.robot.handle_subscribe_queue() + except Exception as e: # noqa: BLE001 + print(e) + self.robot.delay(100) + timeout = time() * 1000 - start_time > self.MQTT_TIMEOUT + + if timeout: + raise SensorException("Color sensor timeout") + return self.color + + def send_color(self, red: int, green: int, blue: int, ambient: int) -> None: + obj = { + "id": self.robot_id, + "R": red, + "G": green, + "B": blue, + "ambient": ambient, + } + # Align with request topic used in get_color() + self.robot_mqtt_client.publish("sensor/color", json.dumps(obj)) diff --git a/src/robot/sensors/distance.py b/src/robot/sensors/distance.py index 3c8c035..a616ac9 100644 --- a/src/robot/sensors/distance.py +++ b/src/robot/sensors/distance.py @@ -1,7 +1,64 @@ -"""Distance sensor implementation.""" +"""Distance sensor implementation mirroring Java logic.""" +from __future__ import annotations -class DistanceSensor: - """Placeholder distance sensor.""" +import json +from time import time - pass +from robot.mqtt.robot_mqtt_client import RobotMqttClient +from robot.mqtt.mqtt_msg import MqttMsg +from robot.exception import SensorException +from .abstract_sensor import AbstractSensor + + +class DistanceSensor(AbstractSensor): + MQTT_TIMEOUT = 1000 # ms + + def __init__(self, robot, mqtt_client: RobotMqttClient): + super().__init__(robot, mqtt_client) + self._topics_sub: dict[str, str] = {} + self._subscribe("DISTANCE_IN", f"sensor/distance/{self.robot_id}") + self._subscribe("DISTANCE_LOOK", f"sensor/distance/{self.robot_id}/?") + self._dist_lock = False + self._dist_value = 0 + + def _subscribe(self, key: str, topic: str) -> None: + self._topics_sub[key] = topic + self.robot_mqtt_client.subscribe(topic) + + def handle_subscription(self, robot, m: MqttMsg) -> None: + topic, msg = m.topic, m.message + if topic == self._topics_sub.get("DISTANCE_IN"): + if msg == "Infinity": + self._dist_value = -1 + else: + self._dist_value = int(msg) + self._dist_lock = False + else: + print(f"Received (unknown): {topic}> {msg}") + + def get_distance(self) -> float: + msg = {"id": self.robot_id, "reality": "M"} + self._dist_lock = True + self.robot_mqtt_client.publish("sensor/distance", json.dumps(msg)) + self.robot.delay(250) + + start_time = time() * 1000 + timeout = False + while self._dist_lock and not timeout: + try: + self.robot.handle_subscribe_queue() + except Exception as e: # noqa: BLE001 + print(e) + self.robot.delay(100) + timeout = (time() * 1000 - start_time > self.MQTT_TIMEOUT) + + if timeout: + raise SensorException("Distance sensor timeout") + + return float(self._dist_value) + + def send_distance(self, dist: float) -> None: + obj = {"id": self.robot_id, "dist": dist} + # Align with request topic used in get_distance() + self.robot_mqtt_client.publish("sensor/distance", json.dumps(obj)) diff --git a/src/robot/sensors/proximity.py b/src/robot/sensors/proximity.py index 683a83d..c6ad209 100644 --- a/src/robot/sensors/proximity.py +++ b/src/robot/sensors/proximity.py @@ -1,7 +1,76 @@ -"""Proximity sensor implementation.""" +"""Proximity sensor implementation mirroring Java logic.""" +from __future__ import annotations -class ProximitySensor: - """Placeholder proximity sensor.""" +import json +from time import time - pass +from robot.exception import SensorException +from robot.mqtt.mqtt_msg import MqttMsg +from robot.mqtt.robot_mqtt_client import RobotMqttClient +from robot.types import ProximityReadingType + +from .abstract_sensor import AbstractSensor + + +class ProximitySensor(AbstractSensor): + MQTT_TIMEOUT = 2000 # ms + + def __init__( + self, + robot, + mqtt_client: RobotMqttClient, + angles: list[int] | None = None, + ): + super().__init__(robot, mqtt_client) + self._topics_sub: dict[str, str] = {} + self._subscribe("PROXIMITY_IN", f"sensor/proximity/{self.robot_id}") + self._proximity_lock = False + self._proximity: ProximityReadingType | None = None + self._angles: list[int] = list(angles) if angles is not None else [0] + + def _subscribe(self, key: str, topic: str) -> None: + self._topics_sub[key] = topic + self.robot_mqtt_client.subscribe(topic) + + def handle_subscription(self, robot, m: MqttMsg) -> None: + topic, msg = m.topic, m.message + if topic == self._topics_sub.get("PROXIMITY_IN"): + try: + self._proximity = ProximityReadingType(self._angles, msg) + except Exception as e: # noqa: BLE001 + print(e) + self._proximity_lock = False + else: + print(f"Received (unknown): {topic}> {msg}") + + def set_angles(self, proximity_angles: list[int]) -> None: + self._angles = list(proximity_angles) + + def get_proximity(self) -> ProximityReadingType: + angle_array = list(self._angles) + msg = {"id": self.robot_id, "angles": angle_array, "reality": "V"} + self._proximity_lock = True + self.robot_mqtt_client.publish("sensor/proximity", json.dumps(msg)) + self.robot.delay(250 * len(self._angles)) + + start_time = time() * 1000 + timeout = False + while self._proximity_lock and not timeout: + try: + self.robot.handle_subscribe_queue() + except Exception as e: # noqa: BLE001 + print(e) + self.robot.delay(100) + timeout = time() * 1000 - start_time > self.MQTT_TIMEOUT + + if timeout: + raise SensorException("Proximity sensor timeout") + assert self._proximity is not None + return self._proximity + + def send_proximity(self) -> None: + assert self._proximity is not None + obj = {"id": self.robot_id, "proximity": str(self._proximity)} + # Align with request topic used in get_proximity() + self.robot_mqtt_client.publish("sensor/proximity", json.dumps(obj)) diff --git a/src/robot/test_all.py b/src/robot/test_all.py new file mode 100644 index 0000000..2860503 --- /dev/null +++ b/src/robot/test_all.py @@ -0,0 +1,145 @@ +"""Lightweight integration smoke tests for the robot package. + +This script is intended to be runnable directly and avoids network +dependencies by injecting a fake MQTT client and a minimal robot stub. + +Run from the repository root with: + + python -m unittest robot.test_all -v + +""" +from __future__ import annotations + +import os +import sys +import unittest + +# Ensure package root (src) is on sys.path when running the file directly +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) + +from robot.types.rgb_color_type import RGBColorType +from robot.types.proximity_reading_type import ProximityReadingType +from robot.helpers.coordinate import Coordinate +from robot.helpers.motion_controller import MotionController +from robot.indicators.neopixel import NeoPixel +from robot.sensors.distance import DistanceSensor +from robot.sensors.color import ColorSensor +from robot.sensors.proximity import ProximitySensor +from robot.mqtt.mqtt_msg import MqttMsg + + +class FakeMqttClient: + def __init__(self): + self.publishes = [] + self.subscribes = [] + + def publish(self, topic: str, body: str, qos: int = 0, retain: bool = False): + self.publishes.append((topic, body, qos, retain)) + + def subscribe(self, topic: str): + self.subscribes.append(topic) + + +class FakeRobot: + def __init__(self, _id: int = 1): + self._id = _id + + def get_id(self) -> int: + return self._id + + def delay(self, ms: int) -> None: + # no-op for tests + return + + def handle_subscribe_queue(self) -> None: + # no-op for tests + return + + def start(self) -> None: + self._state = "started" + + def stop(self) -> None: + self._state = "stopped" + + def reset(self) -> None: + self._state = "reset" + + +class RobotPackageSmokeTests(unittest.TestCase): + def test_rgb_color_type_basic(self): + c = RGBColorType(10, 20, 30) + self.assertEqual(c.get_color(), [10, 20, 30]) + + c2 = RGBColorType("#0A141E") + self.assertEqual(c2.get_color(), [10, 20, 30]) + + c3 = RGBColorType("10 20 30") + self.assertEqual(str(c3), "R:10, G:20, B:30") + + self.assertTrue(c.compare_to(c3)) + + with self.assertRaises(Exception): + RGBColorType("#BAD") + + def test_proximity_reading_type(self): + angles = [0, 1] + # distances: 5 and 10, colors: blue and red + s = "5 10 #0000FF #FF0000" + pr = ProximityReadingType(angles, s) + self.assertEqual(pr.distances(), [5, 10]) + cols = pr.colors() + self.assertEqual(cols[0].get_color(), [0, 0, 255]) + self.assertEqual(cols[1].get_color(), [255, 0, 0]) + + def test_coordinate_publish_and_setters(self): + fake = FakeMqttClient() + coord = Coordinate(1, 1.5, 2.5, 90.0, fake) + self.assertAlmostEqual(coord.get_x(), 1.5) + coord.set_coordinate_heading(3.3, 4.4, -45) + coord.publish_coordinate() + self.assertTrue(any(p[0] == "localization/update" for p in fake.publishes)) + + def test_motion_controller_stub_coord_and_moves(self): + mc = MotionController(None) + # Should not raise when moving with zeros + mc.move(0, 0, 0) + mc.rotate(0, 0) + + def test_neopixel_and_indicator_publish(self): + fake = FakeMqttClient() + robot = FakeRobot(2) + np = NeoPixel(robot, fake) + # initialization should have called change_color -> publish present + self.assertTrue(any(p[0] == "output/neopixel" for p in fake.publishes)) + # change color explicitly + fake.publishes.clear() + np.change_color(1, 2, 3) + self.assertTrue(any(p[0] == "output/neopixel" for p in fake.publishes)) + + def test_sensors_handle_subscription(self): + fake = FakeMqttClient() + robot = FakeRobot(3) + + d = DistanceSensor(robot, fake) + # Simulate incoming distance message + topic = d._topics_sub.get("DISTANCE_IN") + m = MqttMsg(topic, "42") + d.handle_subscription(robot, m) + self.assertEqual(d._dist_value, 42) + + c = ColorSensor(robot, fake) + topic_c = c._topics_sub.get("COLOR_IN") + mc = MqttMsg(topic_c, "12 34 56") + c.handle_subscription(robot, mc) + self.assertEqual(c.color.get_color(), [12, 34, 56]) + + p = ProximitySensor(robot, fake, angles=[0]) + topic_p = p._topics_sub.get("PROXIMITY_IN") + pm = MqttMsg(topic_p, "5 #00FF00") + p.handle_subscription(robot, pm) + # After handling, internal _proximity should be set (or printed on error) + self.assertIsNotNone(p._proximity) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/robot/test_integration.py b/src/robot/test_integration.py new file mode 100644 index 0000000..a97695c --- /dev/null +++ b/src/robot/test_integration.py @@ -0,0 +1,200 @@ +"""Integration tests using real RobotMqttClient (requires MQTT broker or stub). + +To run with a real MQTT broker: + 1. Start Mosquitto: mosquitto -p 1883 + 2. Run: python -m unittest robot.test_integration -v + +This test uses a lightweight in-process stub if no broker is available. +""" +from __future__ import annotations + +import os +import sys +import threading +import time +import unittest +from collections import deque + +# Ensure package root (src) is on sys.path +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) + +from robot.mqtt.robot_mqtt_client import RobotMqttClient +from robot.mqtt.mqtt_msg import MqttMsg + + +class InMemoryMQTTBroker: + """Lightweight in-process MQTT broker stub for testing without external dependencies.""" + + def __init__(self): + self.topics = {} # topic -> list of subscribers + self.message_queues = {} # client_id -> deque of messages + self.lock = threading.Lock() + + def subscribe(self, client_id: str, topic: str): + with self.lock: + if topic not in self.topics: + self.topics[topic] = [] + if client_id not in self.topics[topic]: + self.topics[topic].append(client_id) + if client_id not in self.message_queues: + self.message_queues[client_id] = deque() + + def publish(self, topic: str, payload: str): + with self.lock: + if topic in self.topics: + for client_id in self.topics[topic]: + if client_id in self.message_queues: + self.message_queues[client_id].append(MqttMsg(topic, payload)) + + def get_message(self, client_id: str) -> MqttMsg | None: + with self.lock: + if client_id in self.message_queues and self.message_queues[client_id]: + return self.message_queues[client_id].popleft() + return None + + +# Global broker instance +_broker = InMemoryMQTTBroker() + + +class IntegrationTestRobotMqttClient(unittest.TestCase): + """Test RobotMqttClient behavior with in-process broker.""" + + def test_client_initialization(self): + """Test that client can be created.""" + try: + client = RobotMqttClient( + server="localhost", + port=1883, + user_name="", + password="", + channel="test_ch1" + ) + # If it connects successfully, close it + client.close() + except Exception as e: + # If real broker not available, that's okay for this test + print(f"Note: Real MQTT broker not available: {e}") + self.skipTest("Real MQTT broker not available (expected)") + + def test_publish_and_receive(self): + """Test publish/subscribe cycle with in-memory broker.""" + broker = _broker + + # Simulate two clients + client1_id = "test_client_1" + client2_id = "test_client_2" + + # Client 1 subscribes to a topic + broker.subscribe(client1_id, "test/topic") + + # Client 2 publishes to that topic + broker.publish("test/topic", "Hello World") + + # Client 1 should receive the message + msg = broker.get_message(client1_id) + self.assertIsNotNone(msg) + self.assertEqual(msg.topic, "test/topic") + self.assertEqual(msg.message, "Hello World") + + def test_multiple_subscribers(self): + """Test that multiple clients receive same message.""" + broker = _broker + + client1_id = "multi_client_1" + client2_id = "multi_client_2" + topic = "broadcast/topic" + + # Both subscribe + broker.subscribe(client1_id, topic) + broker.subscribe(client2_id, topic) + + # Publish + broker.publish(topic, "Broadcast Message") + + # Both should receive + msg1 = broker.get_message(client1_id) + msg2 = broker.get_message(client2_id) + + self.assertIsNotNone(msg1) + self.assertIsNotNone(msg2) + self.assertEqual(msg1.message, msg2.message) + + def test_mqtt_msg_queueing(self): + """Test that MqttMsg objects are properly queued.""" + broker = _broker + client_id = "queue_test" + + broker.subscribe(client_id, "queue/test") + + # Publish multiple messages + for i in range(5): + broker.publish("queue/test", f"Message {i}") + + # Retrieve all + messages = [] + for _ in range(5): + msg = broker.get_message(client_id) + if msg: + messages.append(msg) + + self.assertEqual(len(messages), 5) + for i, msg in enumerate(messages): + self.assertEqual(msg.message, f"Message {i}") + + +class RobotMqttClientMockTests(unittest.TestCase): + """Test RobotMqttClient with the paho mock stub.""" + + def test_client_queue_operations(self): + """Test in_queue and out_queue operations.""" + try: + client = RobotMqttClient( + server="localhost", + port=1883, + user_name=None, + password=None, + channel="v1" + ) + + # Test queue pop on empty + result = client.in_queue_pop() + self.assertIsNone(result) + + result = client.out_queue_pop() + self.assertIsNone(result) + + client.close() + except Exception as e: + print(f"Note: Real MQTT broker not available: {e}") + self.skipTest("Real MQTT broker not available (expected)") + + def test_in_queue_append(self): + """Test appending to in_queue.""" + try: + client = RobotMqttClient( + server="localhost", + port=1883, + user_name=None, + password=None, + channel="v1" + ) + + # Manually add a message (simulating broker callback) + msg = MqttMsg("test/topic", "test payload") + client.in_queue.append(msg) + + # Pop it back + popped = client.in_queue_pop() + self.assertIsNotNone(popped) + self.assertEqual(popped.topic, "test/topic") + self.assertEqual(popped.message, "test payload") + + client.close() + except Exception as e: + print(f"Note: Real MQTT broker not available: {e}") + self.skipTest("Real MQTT broker not available (expected)") + + +if __name__ == "__main__": + unittest.main() diff --git a/src/robot/types/__init__.py b/src/robot/types/__init__.py new file mode 100644 index 0000000..0111fa1 --- /dev/null +++ b/src/robot/types/__init__.py @@ -0,0 +1,7 @@ +from .proximity_reading_type import ProximityReadingType +from .rgb_color_type import RGBColorType + +__all__ = [ + "RGBColorType", + "ProximityReadingType", +] diff --git a/src/robot/types/proximity_reading_type.py b/src/robot/types/proximity_reading_type.py new file mode 100644 index 0000000..e36345a --- /dev/null +++ b/src/robot/types/proximity_reading_type.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +from robot.exception import ProximityException + +from .rgb_color_type import RGBColorType + + +class ProximityReadingType: + def __init__(self, angles: list[int] | tuple[int, ...], s: str): + reading_count = len(angles) + values = s.split() + + self._distances: list[int] = [0] * reading_count + self._colors: list[RGBColorType] = [ + RGBColorType(0, 0, 0) for _ in range(reading_count) + ] + + if len(values) != reading_count * 2: + raise ProximityException( + f"ProximityReadingType: length mismatch {len(values)}" + ) + + for i in range(reading_count): + vi = values[i] + if vi == "Infinity": + print(f"Proximity: Infinity reading received for {i}") + self._distances[i] = -1 + else: + self._distances[i] = int(vi) + + color = RGBColorType(0, 0, 0) + color.set_color_from_hex_code(values[reading_count + i]) + self._colors[i] = color + + def distances(self) -> list[int]: + return self._distances + + def colors(self) -> list[RGBColorType]: + return self._colors + + def __str__(self) -> str: + parts: list[str] = [] + parts.extend(str(d) for d in self._distances) + parts.append(" ") + parts.extend(str(c) for c in self._colors) + return " ".join(parts) diff --git a/src/robot/types/rgb_color_type.py b/src/robot/types/rgb_color_type.py new file mode 100644 index 0000000..89bd8dc --- /dev/null +++ b/src/robot/types/rgb_color_type.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +from typing import Iterable + +from robot.exception import RGBColorException + + +class RGBColorType: + def __init__( + self, + R: int | str | Iterable[int], + G: int | None = None, + B: int | None = None, + ): + # Overloads similar to Java constructors + if isinstance(R, str): + # "R G B" format or hex code like "#00AAFF" + s = R.strip() + if s.startswith("#"): + self.set_color_from_hex_code(s) + else: + self.set_color_from_str(s) + elif G is None and B is None and not isinstance(R, int): + vals = list(R) + if not (len(vals) == 3 or len(vals) == 4): + raise ValueError( + "length of the color[] should be equal to 3 (ambient ignored)" + ) + self.set_color(vals[0], vals[1], vals[2]) + else: + assert G is not None and B is not None + self.set_color(R, G, B) + + def set_color_from_str(self, s: str) -> None: + parts = s.split() + if not (len(parts) == 3 or len(parts) == 4): + raise RGBColorException() + self.set_color(int(parts[0]), int(parts[1]), int(parts[2])) + + def set_color_from_hex_code(self, hex_code: str) -> None: + # Expecting format like "#RRGGBB" + try: + r = int(hex_code[1:3], 16) + g = int(hex_code[3:5], 16) + b = int(hex_code[5:7], 16) + except (ValueError, TypeError, IndexError): + # Handle bad hex codes (e.g., "#F00", "ABC", or too short) + raise RGBColorException(f"Invalid hex code format: {hex_code}") + + # Use the main set_color method, which already validates 0-255 + self.set_color(r, g, b) + + def set_color(self, R: int, G: int, B: int) -> None: + if not (0 <= R <= 255): + raise RGBColorException(R, G, B) + if not (0 <= G <= 255): + raise RGBColorException(R, G, B) + if not (0 <= B <= 255): + raise RGBColorException(R, G, B) + + self.R = R + self.G = G + self.B = B + + def get_r(self) -> int: + return self.R + + def get_g(self) -> int: + return self.G + + def get_b(self) -> int: + return self.B + + def get_color(self) -> list[int]: + return [self.R, self.G, self.B] + + def __str__(self) -> str: + return f"R:{self.R}, G:{self.G}, B:{self.B}" + + def to_string_color(self) -> str: + return f"{self.R} {self.G} {self.B}" + + def compare_to(self, color: "RGBColorType") -> bool: + return ( + (color.get_r() == self.R) + and (color.get_g() == self.G) + and (color.get_b() == self.B) + ) diff --git a/src/robot/virtual_robot.py b/src/robot/virtual_robot.py new file mode 100644 index 0000000..22aa407 --- /dev/null +++ b/src/robot/virtual_robot.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +from .robot_base import Robot + + +class VirtualRobot(Robot): + def __init__(self, id: int, x: float, y: float, heading: float): + super().__init__(id, x, y, heading, "V") + + def loop(self) -> None: + # To be implemented by user subclasses + pass + + def sensor_interrupt(self, sensor: str, value: str) -> None: + if sensor == "distance": + print(f"Distance sensor interrupt on {self.id} with value {value}") + elif sensor == "color": + print(f"Color sensor interrupt on {self.id} with value {value}") + elif sensor == "proximity": + print(f"Proximity sensor interrupt on {self.id} with value {value}") + else: + print("Unknown sensor type") + + def communication_interrupt(self, msg: str) -> None: + # To be implemented in subclass if desired + pass diff --git a/tests/test_mqtt_client.py b/tests/test_mqtt_client.py new file mode 100644 index 0000000..9b3f15b --- /dev/null +++ b/tests/test_mqtt_client.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +import types + +import pytest + +import robot.mqtt.robot_mqtt_client as mqtt_client_mod + + +class FakeClient: + """Minimal stand-in for paho.mqtt.client.Client.""" + + def __init__(self): + self.published = [] + self.subscribed = [] + self.loop_started = False + self.loop_stopped = False + self.disconnected = False + self.reconnected = False + self.creds = None + # Callbacks are set by RobotMqttClient after construction. + self.on_connect = None + self.on_message = None + self.on_disconnect = None + + def username_pw_set(self, username: str, password: str) -> None: # pragma: no cover - simple setter + self.creds = (username, password) + + def connect(self, server: str, port: int, keepalive: int) -> None: # pragma: no cover - simple setter + self.connected_args = (server, port, keepalive) + + def reconnect(self) -> None: + self.reconnected = True + + def loop_start(self) -> None: + self.loop_started = True + + def loop_stop(self) -> None: + self.loop_stopped = True + + def disconnect(self) -> None: + self.disconnected = True + + def publish(self, topic: str, body: str, qos: int = 0, retain: bool = False) -> None: + self.published.append((topic, body, qos, retain)) + + def subscribe(self, topic: str) -> None: + self.subscribed.append(topic) + + +@pytest.fixture(autouse=True) +def patch_mqtt_client(monkeypatch): + """Replace paho.mqtt with a lightweight fake and skip real sleeps.""" + import robot.mqtt.robot_mqtt_client as mqtt_client_mod + + monkeypatch.setattr( + mqtt_client_mod, + "mqtt", + types.SimpleNamespace(Client=FakeClient), + ) + monkeypatch.setattr(mqtt_client_mod.time, "sleep", lambda *_args, **_kwargs: None) + yield + + +def test_publish_prefix(): + client = mqtt_client_mod.RobotMqttClient("srv", 1883, None, None, "chan") + fake = client._client # type: ignore[attr-defined] + + client.publish("foo", "bar") + + assert fake.published[0][0] == "chan/foo" + assert fake.published[0][1] == "bar" + + +def test_subscribe_prefix(): + client = mqtt_client_mod.RobotMqttClient("srv", 1883, None, None, "chan") + fake = client._client # type: ignore[attr-defined] + + client.subscribe("abc") + + assert fake.subscribed[0] == "chan/abc" + + +def test_on_message_strips_channel_and_enqueues(): + client = mqtt_client_mod.RobotMqttClient("srv", 1883, None, None, "chan") + + msg = types.SimpleNamespace(topic="chan/robot/msg/10", payload=b"START") + client._on_message(None, None, msg) # noqa: SLF001 + + m = client.in_queue_pop() + assert m is not None + assert m.topic == "robot/msg/10" + assert m.message == "START" + + +def test_disconnect_attempts_reconnect(): + client = mqtt_client_mod.RobotMqttClient("srv", 1883, None, None, "chan") + fake = client._client # type: ignore[attr-defined] + + client._on_disconnect(None, None, 0) # noqa: SLF001 + + assert fake.reconnected is True + + +def test_close_stops_loop_and_disconnects(): + client = mqtt_client_mod.RobotMqttClient("srv", 1883, None, None, "chan") + fake = client._client # type: ignore[attr-defined] + + client.close() + + assert fake.loop_stopped is True + assert fake.disconnected is True diff --git a/tests/test_proximity_reading_type.py b/tests/test_proximity_reading_type.py new file mode 100644 index 0000000..e91a0a0 --- /dev/null +++ b/tests/test_proximity_reading_type.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +import pytest + +from robot.exception import ProximityException +from robot.types.proximity_reading_type import ProximityReadingType + + +def test_parses_distances_and_colors(): + reading = ProximityReadingType([0, 30], "10 20 #FF0000 #00FF00") + + assert reading.distances() == [10, 20] + colors = reading.colors() + assert colors[0].get_color() == [255, 0, 0] + assert colors[1].get_color() == [0, 255, 0] + + +def test_infinity_maps_to_negative_one(): + reading = ProximityReadingType([0], "Infinity #010203") + + assert reading.distances() == [-1] + assert reading.colors()[0].get_color() == [1, 2, 3] + + +def test_length_mismatch_raises(): + with pytest.raises(ProximityException): + ProximityReadingType([0, 30], "10 #FF0000")