diff --git a/README.md b/README.md index b30e566ed..a5833339a 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ VirtualShip is a command line simulator allowing students to plan and conduct a - Surface drifters - Argo float deployments - +Along the way, students will encounter realistic problems that may occur during an oceanographic expedition, requiring them to make decisions to adapt their plans accordingly. For example, delays due to equipment failures, pre-departure logistical issues or safety drills. ## Installation diff --git a/src/virtualship/cli/_run.py b/src/virtualship/cli/_run.py index f07fbab27..20de173fb 100644 --- a/src/virtualship/cli/_run.py +++ b/src/virtualship/cli/_run.py @@ -7,26 +7,26 @@ from pathlib import Path import copernicusmarine -import pyproj from virtualship.expedition.simulate_schedule import ( MeasurementsToSimulate, ScheduleProblem, simulate_schedule, ) -from virtualship.models import Schedule -from virtualship.models.checkpoint import Checkpoint +from virtualship.make_realistic.problems.simulator import ProblemSimulator +from virtualship.models import Checkpoint, Schedule from virtualship.utils import ( CHECKPOINT, + EXPEDITION, + PROBLEMS_ENCOUNTERED_DIR, + PROJECTION, + SELECTED_PROBLEMS, _get_expedition, + _save_checkpoint, expedition_cost, get_instrument_class, ) -# projection used to sail between waypoints -projection = pyproj.Geod(ellps="WGS84") - - # parcels logger (suppress INFO messages to prevent log being flooded) external_logger = logging.getLogger("parcels.tools.loggers") external_logger.setLevel(logging.WARNING) @@ -35,7 +35,9 @@ logging.getLogger("copernicusmarine").setLevel("ERROR") -def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: +def _run( + expedition_dir: str | Path, prob_level: int, from_data: Path | None = None +) -> None: """ Perform an expedition, providing terminal feedback and file output. 
@@ -50,8 +52,8 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: print("╚═════════════════════════════════════════════════╝") if from_data is None: - # TODO: caution, if collaborative environments, will this mean everyone uses the same credentials file? - # TODO: need to think about how to deal with this for when using collaborative environments AND streaming data via copernicusmarine + # TODO: caution, if collaborative environments (or the same machine), this will mean that multiple users share the same copernicusmarine credentials file + # TODO: deal with this for if/when using collaborative environments (same machine) and streaming data from Copernicus Marine Service? COPERNICUS_CREDS_FILE = os.path.expandvars( "$HOME/.copernicusmarine/.copernicusmarine-credentials" ) @@ -72,8 +74,14 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: expedition_dir = Path(expedition_dir) expedition = _get_expedition(expedition_dir) + expedition_identifier = expedition.get_unique_identifier() + + # dedicated problems directory for this expedition + problems_dir = expedition_dir / PROBLEMS_ENCOUNTERED_DIR.format( + expedition_identifier=expedition_identifier + ) - # Verify instruments_config file is consistent with schedule + # verify instruments_config file is consistent with schedule expedition.instruments_config.verify(expedition) # load last checkpoint @@ -81,8 +89,8 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: if checkpoint is None: checkpoint = Checkpoint(past_schedule=Schedule(waypoints=[])) - # verify that schedule and checkpoint match - checkpoint.verify(expedition.schedule) + # verify that schedule and checkpoint match, and that problems have been resolved + checkpoint.verify(expedition, problems_dir) print("\n---- WAYPOINT VERIFICATION ----") @@ -93,20 +101,19 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: # simulate the schedule 
schedule_results = simulate_schedule( - projection=projection, + projection=PROJECTION, expedition=expedition, ) + + # handle cases where user defined schedule is incompatible (i.e. not enough time between waypoints, not problems) if isinstance(schedule_results, ScheduleProblem): print( - f"SIMULATION PAUSED: update your schedule (`virtualship plan`) and continue the expedition by executing the `virtualship run` command again.\nCheckpoint has been saved to {expedition_dir.joinpath(CHECKPOINT)}." + f"Please update your schedule (`virtualship plan` or directly in {EXPEDITION}) and continue the expedition by executing the `virtualship run` command again.\nCheckpoint has been saved to {expedition_dir.joinpath(CHECKPOINT)}." ) _save_checkpoint( Checkpoint( - past_schedule=Schedule( - waypoints=expedition.schedule.waypoints[ - : schedule_results.failed_waypoint_i - ] - ) + past_schedule=expedition.schedule, + failed_waypoint_i=schedule_results.failed_waypoint_i, ), expedition_dir, ) @@ -124,32 +131,78 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: print("\n--- MEASUREMENT SIMULATIONS ---") - # simulate measurements - print("\nSimulating measurements. 
This may take a while...\n") - + # identify instruments in expedition instruments_in_expedition = expedition.get_instruments() - for itype in instruments_in_expedition: - # get instrument class - instrument_class = get_instrument_class(itype) - if instrument_class is None: - raise RuntimeError(f"No instrument class found for type {itype}.") - - # get measurements to simulate - attr = MeasurementsToSimulate.get_attr_for_instrumenttype(itype) - measurements = getattr(schedule_results.measurements_to_simulate, attr) - - # initialise instrument - instrument = instrument_class( - expedition=expedition, - from_data=Path(from_data) if from_data is not None else None, - ) + # initialise problem simulator + problem_simulator = ProblemSimulator(expedition, expedition_dir) - # execute simulation - instrument.execute( - measurements=measurements, - out_path=expedition_dir.joinpath("results", f"{itype.name.lower()}.zarr"), + # re-load previously encountered (same expedition as previously) problems if they exist, else select new problems and cache them + if os.path.exists(problems_dir / SELECTED_PROBLEMS): + problems = problem_simulator.load_selected_problems( + problems_dir / SELECTED_PROBLEMS ) + else: + problems = problem_simulator.select_problems( + instruments_in_expedition, prob_level + ) + problem_simulator.cache_selected_problems( + problems, problems_dir / SELECTED_PROBLEMS + ) if problems else None + + # simulate instrument measurements + print("\nSimulating measurements. 
This may take a while...\n") + + for itype in instruments_in_expedition: + try: + # get instrument class + instrument_class = get_instrument_class(itype) + if instrument_class is None: + raise RuntimeError(f"No instrument class found for type {itype}.") + + # execute problem simulations for this instrument type + if problems: + # TODO: this print statement is helpful for user to see so it makes sense when a relevant instrument-related problem occurs; but ideally would be overwritten when the actual measurement simulation spinner starts (try and address this in future PR which improves log output) + print( + "" + if hasattr(problems["problem_class"][0], "pre_departure") + and problems["problem_class"][0].pre_departure + else f"\033[4mUp next\033[0m: {itype.name} measurements...\n" + ) + problem_simulator.execute( + problems, + instrument_type_validation=itype, + problems_dir=problems_dir, + ) + + # get measurements to simulate + attr = MeasurementsToSimulate.get_attr_for_instrumenttype(itype) + measurements = getattr(schedule_results.measurements_to_simulate, attr) + + # initialise instrument + instrument = instrument_class( + expedition=expedition, + from_data=Path(from_data) if from_data is not None else None, + ) + + # execute simulation + instrument.execute( + measurements=measurements, + out_path=expedition_dir.joinpath( + "results", f"{itype.name.lower()}.zarr" + ), + ) + except Exception as e: + # clean up if unexpected error occurs + if os.path.exists(problems_dir): + shutil.rmtree(problems_dir) + if expedition_dir.joinpath(CHECKPOINT).exists(): + os.remove(expedition_dir.joinpath(CHECKPOINT)) + + raise RuntimeError( + f"An unexpected error occurred while simulating measurements: {e}. 
Please report this issue, with a description and the traceback, " "to the VirtualShip issue tracker at: https://github.com/OceanParcels/virtualship/issues" ) from e print("\nAll measurement simulations are complete.") @@ -158,6 +211,17 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: print( f"Your measurements can be found in the '{expedition_dir}/results' directory." ) + + if problems: + print("\n----- RECORD OF PROBLEMS ENCOUNTERED ------") + print( + f"\nA record of problems encountered during the expedition is saved in: {problems_dir}" + ) + + # delete checkpoint file (interferes with ability to re-run expedition) + if os.path.exists(expedition_dir.joinpath(CHECKPOINT)): + os.remove(expedition_dir.joinpath(CHECKPOINT)) + print("\n------------- END -------------\n") # end timing @@ -174,11 +238,6 @@ def _load_checkpoint(expedition_dir: Path) -> Checkpoint | None: return None -def _save_checkpoint(checkpoint: Checkpoint, expedition_dir: Path) -> None: - file_path = expedition_dir.joinpath(CHECKPOINT) - checkpoint.to_yaml(file_path) - - def _write_expedition_cost(expedition, schedule_results, expedition_dir): """Calculate the expedition cost, write it to a file, and print summary.""" assert expedition.schedule.waypoints[0].time is not None, ( diff --git a/src/virtualship/cli/commands.py b/src/virtualship/cli/commands.py index f349dc6cf..be088ed5c 100644 --- a/src/virtualship/cli/commands.py +++ b/src/virtualship/cli/commands.py @@ -82,6 +82,17 @@ def plan(path): "path", type=click.Path(exists=True, file_okay=False, dir_okay=True, readable=True), ) +@click.option( + "--prob-level", + type=click.IntRange(0, 2), + default=1, + help="Set the problem level for the expedition simulation [default = 1].\n\n" + "Level 0 = No problems encountered during the expedition.\n\n" + "Level 1 = 1-2 problems encountered.\n\n" + "Level 2 = 1 or more problems encountered, depending on expedition length and complexity, where longer and more complex 
expeditions will encounter more problems.\n\n" + "N.B.: If an expedition has already been run with problems encountered, changing the prob_level on a subsequent re-run will have no effect (previously encountered problems will be re-used). To select new problems (or to skip problems altogether), delete the 'problems_encountered' directory in the expedition directory before re-running with a new prob_level.\n\n" + "Changing waypoint locations and/or instrument types will also result in new problems being selected on the next run.", +) @click.option( "--from-data", type=str, @@ -92,6 +103,6 @@ def plan(path): "Assumes that variable names at least contain the standard Copernicus Marine variable name as a substring. " "Will also take the first file found containing the variable name substring. CAUTION if multiple files contain the same variable name substring.", ) -def run(path, from_data): +def run(path, prob_level, from_data): """Execute the expedition simulations.""" - _run(Path(path), from_data) + _run(Path(path), prob_level, from_data) diff --git a/src/virtualship/expedition/simulate_schedule.py b/src/virtualship/expedition/simulate_schedule.py index e450fcc7c..94fccbc2a 100644 --- a/src/virtualship/expedition/simulate_schedule.py +++ b/src/virtualship/expedition/simulate_schedule.py @@ -20,6 +20,7 @@ Spacetime, Waypoint, ) +from virtualship.utils import _calc_sail_time @dataclass @@ -115,6 +116,8 @@ def __init__(self, projection: pyproj.Geod, expedition: Expedition) -> None: self._next_ship_underwater_st_time = self._time def simulate(self) -> ScheduleOk | ScheduleProblem: + # TODO: instrument config mapping (as introduced in #269) should be helpful for refactoring here... 
+ for wp_i, waypoint in enumerate(self._expedition.schedule.waypoints): # sail towards waypoint self._progress_time_traveling_towards(waypoint.location) @@ -122,9 +125,9 @@ def simulate(self) -> ScheduleOk | ScheduleProblem: # check if waypoint was reached in time if waypoint.time is not None and self._time > waypoint.time: print( - f"Waypoint {wp_i + 1} could not be reached in time. Current time: {self._time}. Waypoint time: {waypoint.time}." + f"\nWaypoint {wp_i + 1} could not be reached in time. Current time: {self._time}. Waypoint time: {waypoint.time}." "\n\nHave you ensured that your schedule includes sufficient time for taking measurements, e.g. CTD casts (in addition to the time it takes to sail between waypoints)?\n" - "**Note**, the `virtualship plan` tool will not account for measurement times when verifying the schedule, only the time it takes to sail between waypoints.\n" + "\nHint: previous schedule verification checks (e.g. in the `virtualship plan` tool or after dealing with unexpected problems during the expedition) will not account for measurement times, only the time it takes to sail between waypoints.\n" ) return ScheduleProblem(self._time, wp_i) else: @@ -140,22 +143,13 @@ def simulate(self) -> ScheduleOk | ScheduleProblem: return ScheduleOk(self._time, self._measurements_to_simulate) def _progress_time_traveling_towards(self, location: Location) -> None: - geodinv: tuple[float, float, float] = self._projection.inv( - lons1=self._location.lon, - lats1=self._location.lat, - lons2=location.lon, - lats2=location.lat, - ) - ship_speed_meter_per_second = ( - self._expedition.ship_config.ship_speed_knots * 1852 / 3600 - ) - azimuth1 = geodinv[0] - distance_to_next_waypoint = geodinv[2] - time_to_reach = timedelta( - seconds=distance_to_next_waypoint / ship_speed_meter_per_second + time_to_reach, azimuth1, ship_speed_meter_per_second = _calc_sail_time( + self._location, + location, + self._expedition.ship_config.ship_speed_knots, + self._projection, ) 
end_time = self._time + time_to_reach - # note all ADCP measurements if self._expedition.instruments_config.adcp_config is not None: location = self._location diff --git a/src/virtualship/instruments/base.py b/src/virtualship/instruments/base.py index 984e4abf5..2ca1b7836 100644 --- a/src/virtualship/instruments/base.py +++ b/src/virtualship/instruments/base.py @@ -67,7 +67,10 @@ def __init__( ) self.wp_times = wp_times - self.min_time, self.max_time = wp_times[0], wp_times[-1] + self.min_time, self.max_time = ( + wp_times[0], + wp_times[-1] + timedelta(days=1), + ) # avoid edge issues self.min_lat, self.max_lat = min(wp_lats), max(wp_lats) self.min_lon, self.max_lon = min(wp_lons), max(wp_lons) diff --git a/src/virtualship/make_realistic/problems/scenarios.py b/src/virtualship/make_realistic/problems/scenarios.py new file mode 100644 index 000000000..f528238f8 --- /dev/null +++ b/src/virtualship/make_realistic/problems/scenarios.py @@ -0,0 +1,314 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import timedelta + +from virtualship.instruments.types import InstrumentType +from virtualship.utils import register_general_problem, register_instrument_problem + +# ===================================================== +# SECTION: Base Classes +# ===================================================== + + +@dataclass +class GeneralProblem: + """Base class for general problems. Can occur pre-departure or during expedition.""" + + message: str + delay_duration: timedelta + pre_departure: bool # True if problem occurs before expedition departure, False if during expedition + + # TODO: could add a (abstract) method to check if problem is valid for given waypoint, e.g. location (tropical waters etc.) + + +@dataclass +class InstrumentProblem: + """Base class for instrument-specific problems. 
Cannot occur before expedition departure.""" + + message: str + delay_duration: timedelta + instrument_type: InstrumentType + + # TODO: could add a (abstract) method to check if problem is valid for given waypoint, e.g. location () + + +# ===================================================== +# SECTION: General Problems +# ===================================================== + + +@dataclass +@register_general_problem +class FoodDeliveryDelayed(GeneralProblem): + """Problem: Scheduled food delivery is delayed, causing a postponement of departure.""" + + message: str = ( + "The scheduled food delivery prior to departure has not arrived. Until the supply truck reaches the pier, " + "we cannot leave. Once it arrives, unloading and stowing the provisions in the ship's cold storage " + "will also take additional time. These combined delays postpone departure by approximately 5 hours." + ) + + delay_duration: timedelta = timedelta(hours=5.0) + pre_departure: bool = True + + +@dataclass +@register_general_problem +class CaptainSafetyDrill(GeneralProblem): + """Problem: Sudden initiation of a mandatory safety drill.""" + + message: str = ( + "A miscommunication with the ship's captain results in the sudden initiation of a mandatory safety drill. " + "The emergency vessel must be lowered and tested while the ship remains stationary, pausing all scientific " + "operations for the duration of the exercise. The drill introduces a delay of approximately 2 hours." + ) + delay_duration: timedelta = timedelta(hours=2.0) + pre_departure: bool = False + + +@dataclass +@register_general_problem +class FuelDeliveryIssue(GeneralProblem): + """Problem: Fuel delivery tanker delayed, causing a postponement of departure.""" + + message: str = ( + "The fuel tanker expected to deliver fuel has not arrived. Port authorities are unable to provide " + "a clear estimate for when the delivery might occur. 
You may choose to [w]ait for the tanker or [g]et a " + "harbor pilot to guide the vessel to an available bunker dock instead. This decision may need to be " + "revisited periodically depending on circumstances." + ) + delay_duration: timedelta = timedelta( + hours=5.0 + ) # dynamic delays based on repeated choices + pre_departure: bool = True + + +@dataclass +@register_general_problem +class MarineMammalInDeploymentArea(GeneralProblem): + """Problem: Marine mammals observed in deployment area, causing delay.""" + + message: str = ( + "A pod of dolphins is observed swimming directly beneath the planned deployment area. " + "To avoid risk to wildlife and comply with environmental protocols, all in-water operations " + "must pause until the animals move away from the vicinity. This results in a delay of about 2 hours." + ) + delay_duration: timedelta = timedelta(hours=2) + pre_departure: bool = False + + +@dataclass +@register_general_problem +class BallastPumpFailure(GeneralProblem): + """Problem: Ballast pump failure during ballasting operations.""" + + message: str = ( + "One of the ship's ballast pumps suddenly stops responding during routine ballasting operations. " + "Without the pump, the vessel cannot safely adjust trim or compensate for equipment movements on deck. " + "Engineering isolates the faulty pump and performs a rapid inspection. Temporary repairs allow limited " + "functionality, but the interruption causes a delay of approximately 4 hours." + ) + delay_duration: timedelta = timedelta(hours=4.0) + pre_departure: bool = False + + +@dataclass +@register_general_problem +class ThrusterConverterFault(GeneralProblem): + """Problem: Bow thruster's power converter fault during station-keeping.""" + + message: str = ( + "The bow thruster's power converter reports a fault during station-keeping operations. " + "Dynamic positioning becomes less stable, forcing a temporary suspension of high-precision sampling. 
" + "Engineers troubleshoot the converter and perform a reset, resulting in a delay of around 4 hours." + ) + delay_duration: timedelta = timedelta(hours=4.0) + pre_departure: bool = False + + +@dataclass +@register_general_problem +class AFrameHydraulicLeak(GeneralProblem): + """Problem: Hydraulic fluid leak from A-frame actuator.""" + + message: str = ( + "A crew member notices hydraulic fluid leaking from the A-frame actuator during equipment checks. " + "The leak must be isolated immediately to prevent environmental contamination or mechanical failure. " + "Engineering replaces a faulty hose and repressurizes the system. This repair causes a delay of about 6 hours." + ) + delay_duration: timedelta = timedelta(hours=6.0) + pre_departure: bool = False + + +@dataclass +@register_general_problem +class CoolingWaterIntakeBlocked(GeneralProblem): + """Problem: Main engine's cooling water intake blocked.""" + + message: str = ( + "The main engine's cooling water intake alarms indicate reduced flow, likely caused by marine debris " + "or biological fouling. The vessel must temporarily slow down while engineering clears the obstruction " + "and flushes the intake. This results in a delay of approximately 4 hours." + ) + delay_duration: timedelta = timedelta(hours=4.0) + pre_departure: bool = False + + +# TODO: draft problem below, but needs a method to adjust ETA based on reduced speed (future PR) +# @dataclass +# @register_general_problem +# class EngineOverheating: +# message: str = ( +# "One of the main engines has overheated. To prevent further damage, the engineering team orders a reduction " +# "in vessel speed until the engine can be inspected and repaired in port. The ship will now operate at a " +# "reduced cruising speed of 8.5 knots for the remainder of the transit." 
+# ) +# delay_duration: None = None # speed reduction affects ETA instead of fixed delay +# ship_speed_knots: float = 8.5 + + +# TODO: draft problem below, but needs a method to check if waypoint is in tropical waters (future PR) +# @dataclass +# @register_general_problem +# class VenomousCentipedeOnboard(GeneralProblem): +# """Problem: Venomous centipede discovered onboard in tropical waters.""" + +# message: str = ( +# "A venomous centipede is discovered onboard while operating in tropical waters. " +# "One crew member becomes ill after contact with the creature and receives medical attention, " +# "prompting a full search of the vessel to ensure no further danger. " +# "The medical response and search efforts cause an operational delay of about 2 hours." +# ) +# +# delay_duration: timedelta = timedelta(hours=2.0) +# pre_departure: bool = False + +# def is_valid(self, waypoint: Waypoint) -> bool: +# """Check if the waypoint is in tropical waters.""" +# lat_limit = 23.5 # [degrees] +# return abs(waypoint.latitude) <= lat_limit + + +# ===================================================== +# SECTION: Instrument-specific Problems +# ===================================================== + + +@dataclass +@register_instrument_problem +class CTDCableJammed(InstrumentProblem): + """Problem: CTD cable jammed in winch drum, requiring replacement.""" + + message: str = ( + "During preparation for the next CTD cast, the CTD cable becomes jammed in the winch drum. " + "Attempts to free it are unsuccessful, and the crew determines that the entire cable must be " + "replaced before deployment can continue. This repair is time-consuming and results in a delay " + "of approximately 5 hours." 
+ ) + delay_duration: timedelta = timedelta(hours=5.0) + instrument_type: InstrumentType = InstrumentType.CTD + + +@dataclass +@register_instrument_problem +class ADCPMalfunction(InstrumentProblem): + """Problem: ADCP returns invalid data, requiring inspection.""" + + message: str = ( + "The hull-mounted ADCP begins returning invalid velocity data. Engineering suspects damage to the cable " + "from recent maintenance activities. The ship must hold position while a technician enters the cable " + "compartment to perform an inspection and continuity test. This diagnostic procedure results in a delay " + "of around 2 hours." + ) + delay_duration: timedelta = timedelta(hours=2.0) + instrument_type: InstrumentType = InstrumentType.ADCP + + +@dataclass +@register_instrument_problem +class CTDTemperatureSensorFailure(InstrumentProblem): + """Problem: CTD temperature sensor failure, requiring replacement.""" + + message: str = ( + "The primary temperature sensor on the CTD begins returning inconsistent readings. " + "Troubleshooting confirms that the sensor has malfunctioned. A spare unit can be installed, " + "but integrating and verifying the replacement will pause operations. " + "This procedure leads to an estimated delay of around 3 hours." + ) + delay_duration: timedelta = timedelta(hours=3.0) + instrument_type: InstrumentType = InstrumentType.CTD + + +@dataclass +@register_instrument_problem +class CTDSalinitySensorFailureWithCalibration(InstrumentProblem): + """Problem: CTD salinity sensor failure, requiring replacement and calibration.""" + + message: str = ( + "The CTD's primary salinity sensor fails and must be replaced with a backup. After installation, " + "a mandatory calibration cast to a minimum depth of 1000 meters is required to verify sensor accuracy. " + "Both the replacement and calibration activities result in a total delay of roughly 4 hours." 
+ ) + delay_duration: timedelta = timedelta(hours=4.0) + instrument_type: InstrumentType = InstrumentType.CTD + + +@dataclass +@register_instrument_problem +class WinchHydraulicPressureDrop(InstrumentProblem): + """Problem: CTD winch hydraulic pressure drop, requiring repair.""" + + message: str = ( + "The CTD winch begins to lose hydraulic pressure during routine checks prior to deployment. " + "The engineering crew must stop operations to diagnose the hydraulic pump and replenish or repair " + "the system. Until pressure is restored to operational levels, the winch cannot safely be used. " + "This results in an estimated delay of 2.5 hours." + ) + delay_duration: timedelta = timedelta(hours=2.5) + instrument_type: InstrumentType = InstrumentType.CTD + + +@dataclass +@register_instrument_problem +class RosetteTriggerFailure(InstrumentProblem): + """Problem: CTD rosette trigger failure, requiring inspection.""" + + message: str = ( + "During a CTD cast, the rosette's bottle-triggering mechanism fails to actuate. " + "No discrete water samples can be collected during this cast. The rosette must be brought back " + "on deck for inspection and manual testing of the trigger system. This results in an operational " + "delay of approximately 3.5 hours." + ) + delay_duration: timedelta = timedelta(hours=3.5) + instrument_type: InstrumentType = InstrumentType.CTD + + +@dataclass +@register_instrument_problem +class DrifterSatelliteConnectionDelay(InstrumentProblem): + """Problem: Drifter fails to establish satellite connection before deployment.""" + + message: str = ( + "The drifter scheduled for deployment fails to establish a satellite connection during " + "pre-launch checks. To improve signal acquisition, the float must be moved to a higher location on deck " + "with fewer obstructions. The team waits for the satellite fix to come through, resulting in a delay " + "of approximately 2 hours." 
+ ) + delay_duration: timedelta = timedelta(hours=2.0) + instrument_type: InstrumentType = InstrumentType.DRIFTER + + +@dataclass +@register_instrument_problem +class ArgoSatelliteConnectionDelay(InstrumentProblem): + """Problem: Argo float fails to establish satellite connection before deployment.""" + + message: str = ( + "The Argo float scheduled for deployment fails to establish a satellite connection during " + "pre-launch checks. To improve signal acquisition, the float must be moved to a higher location on deck " + "with fewer obstructions. The team waits for the satellite fix to come through, resulting in a delay " + "of approximately 2 hours." + ) + delay_duration: timedelta = timedelta(hours=2.0) + instrument_type: InstrumentType = InstrumentType.ARGO_FLOAT diff --git a/src/virtualship/make_realistic/problems/simulator.py b/src/virtualship/make_realistic/problems/simulator.py new file mode 100644 index 000000000..f19758383 --- /dev/null +++ b/src/virtualship/make_realistic/problems/simulator.py @@ -0,0 +1,418 @@ +from __future__ import annotations + +import json +import os +import random +import sys +import time +from pathlib import Path +from typing import TYPE_CHECKING + +from yaspin import yaspin + +from virtualship.instruments.types import InstrumentType +from virtualship.make_realistic.problems.scenarios import ( + GeneralProblem, + InstrumentProblem, +) +from virtualship.models.checkpoint import Checkpoint +from virtualship.utils import ( + EXPEDITION, + GENERAL_PROBLEM_REG, + INSTRUMENT_PROBLEM_REG, + PROJECTION, + SCHEDULE_ORIGINAL, + _calc_sail_time, + _calc_wp_stationkeeping_time, + _make_hash, + _save_checkpoint, +) + +if TYPE_CHECKING: + from virtualship.models.expedition import Expedition, Schedule + +LOG_MESSAGING = { + "pre_departure": "Hang on! 
There could be a pre-departure problem in-port...", + "during_expedition": "Oh no, a problem has occurred during the expedition, at waypoint {waypoint}...!", + "schedule_problems": "This problem will cause a delay of {delay_duration} hours {problem_wp}. The next waypoint therefore cannot be reached in time. Please account for this in your schedule (`virtualship plan` or directly in {expedition_yaml}), then continue the expedition by executing the `virtualship run` command again.\n", + "problem_avoided": "Phew! You had enough contingency time scheduled to avoid delays from this problem. The expedition can carry on shortly...\n", +} + + +# default problem weights for problems simulator (i.e. add +1 problem for every n days/waypoints/instruments in expedition) +PROBLEM_WEIGHTS = { + "every_ndays": 7, + "every_nwaypoints": 6, + "every_ninstruments": 3, +} + + +class ProblemSimulator: + """Handle problem simulation during expedition.""" + + def __init__(self, expedition: Expedition, expedition_dir: str | Path): + """Initialise ProblemSimulator with a schedule and probability level.""" + self.expedition = expedition + self.expedition_dir = Path(expedition_dir) + + def select_problems( + self, + instruments_in_expedition: set[InstrumentType], + prob_level: int, + ) -> dict[str, list[GeneralProblem | InstrumentProblem] | None]: + """ + Select problems (general and instrument-specific). When prob_level = 2, number of problems is determined by expedition length, instrument count etc. + + Map each selected problem to a random waypoint (or None if pre-departure). Finally, cache the suite of problems to a directory (expedition-specific via hash) for reference. 
+ """ + valid_instrument_problems = [ + problem + for problem in INSTRUMENT_PROBLEM_REG + if problem.instrument_type in instruments_in_expedition + ] + + num_waypoints = len(self.expedition.schedule.waypoints) + num_instruments = len(instruments_in_expedition) + expedition_duration_days = ( + self.expedition.schedule.waypoints[-1].time + - self.expedition.schedule.waypoints[0].time + ).days + + if prob_level == 0: + num_problems = 0 + elif prob_level == 1: + num_problems = random.randint(1, 2) + elif prob_level == 2: + base = 1 + extra = ( # i.e. +1 problem for every n days/waypoints/instruments (tunable above) + (expedition_duration_days // PROBLEM_WEIGHTS["every_ndays"]) + + (num_waypoints // PROBLEM_WEIGHTS["every_nwaypoints"]) + + (num_instruments // PROBLEM_WEIGHTS["every_ninstruments"]) + ) + num_problems = base + extra + num_problems = min( + num_problems, len(GENERAL_PROBLEM_REG) + len(valid_instrument_problems) + ) + + selected_problems = [] + if num_problems > 0: + random.shuffle(GENERAL_PROBLEM_REG) + random.shuffle(valid_instrument_problems) + + # bias towards more instrument problems when there are more instruments + instrument_bias = min(0.7, num_instruments / (num_instruments + 2)) + n_instrument = round(num_problems * instrument_bias) + n_general = min(len(GENERAL_PROBLEM_REG), num_problems - n_instrument) + n_instrument = ( + num_problems - n_general + ) # recalc in case n_general was capped to len(GENERAL_PROBLEM_REG) + + selected_problems.extend(GENERAL_PROBLEM_REG[:n_general]) + selected_problems.extend(valid_instrument_problems[:n_instrument]) + + # allow only one pre-departure problem to occur; replace any extras with non-pre-departure problems + pre_departure_problems = [ + p + for p in selected_problems + if issubclass(p, GeneralProblem) and p.pre_departure + ] + if len(pre_departure_problems) > 1: + to_keep = random.choice(pre_departure_problems) + num_to_replace = len(pre_departure_problems) - 1 + # remove all but one pre_departure problem 
+ selected_problems = [ + problem + for problem in selected_problems + if not ( + issubclass(problem, GeneralProblem) + and problem.pre_departure + and problem is not to_keep + ) + ] + # available non-pre_departure problems not already selected + available_general = [ + p + for p in GENERAL_PROBLEM_REG + if not p.pre_departure and p not in selected_problems + ] + available_instrument = [ + p for p in valid_instrument_problems if p not in selected_problems + ] + available_replacements = available_general + available_instrument + random.shuffle(available_replacements) + selected_problems.extend(available_replacements[:num_to_replace]) + + # map each problem to a [random] waypoint (or None if pre-departure) + waypoint_idxs = [] + for problem in selected_problems: + if getattr(problem, "pre_departure", False): + waypoint_idxs.append(None) + else: + # TODO: if incorporate departure and arrival port/waypoints in future, bear in mind index selection here may need to change + waypoint_idxs.append( + random.randint(0, len(self.expedition.schedule.waypoints) - 2) + ) # -1 to get index and -1 exclude last waypoint (would not impact any future scheduling as arrival in port is not part of schedule) + + # pair problems with their waypoint indices and sort by waypoint index (pre-departure first) + paired = sorted( + zip(selected_problems, waypoint_idxs, strict=True), + key=lambda x: (x[1] is not None, x[1] if x[1] is not None else -1), + ) + problems_sorted = { + "problem_class": [p for p, _ in paired], + "waypoint_i": [w for _, w in paired], + } + + return problems_sorted if selected_problems else None + + def cache_selected_problems( + self, + problems: dict[str, list[GeneralProblem | InstrumentProblem] | None], + selected_problems_fpath: str, + ) -> None: + """Cache suite of problems to json, for reference.""" + # make dir to contain problem jsons (unique to expedition) + os.makedirs(Path(selected_problems_fpath).parent, exist_ok=True) + + # cache dict of selected_problems to 
json + with open( + selected_problems_fpath, + "w", + encoding="utf-8", + ) as f: + json.dump( + { + "problem_class": [p.__name__ for p in problems["problem_class"]], + "waypoint_i": problems["waypoint_i"], + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), + }, + f, + indent=4, + ) + + def execute( + self, + problems: dict[str, list[GeneralProblem | InstrumentProblem] | None], + instrument_type_validation: InstrumentType | None, + problems_dir: Path, + log_delay: float = 7.0, + ): + """ + Execute the selected problems, returning messaging and delay times. + + N.B. a problem_waypoint_i is different to a failed_waypoint_i defined in the Checkpoint class; failed_waypoint_i is the waypoint index after the problem_waypoint_i where the problem occurred, as this is when scheduling issues would be encountered. + """ + for problem, problem_waypoint_i in zip( + problems["problem_class"], problems["waypoint_i"], strict=True + ): + # skip if instrument problem but `p.instrument_type` does not match `instrument_type_validation` (i.e. the current instrument being simulated in the expedition, e.g. 
from _run.py) + if ( + issubclass(problem, InstrumentProblem) + and problem.instrument_type is not instrument_type_validation + ): + continue + + problem_hash = _make_hash(problem.message + str(problem_waypoint_i), 8) + hash_fpath = problems_dir.joinpath(f"problem_{problem_hash}.json") + if hash_fpath.exists(): + continue # problem * waypoint combination has already occurred; don't repeat + + if issubclass(problem, GeneralProblem) and problem.pre_departure: + alert_msg = LOG_MESSAGING["pre_departure"] + + else: + alert_msg = LOG_MESSAGING["during_expedition"].format( + waypoint=int(problem_waypoint_i) + 1 + ) + + # log problem occurrence, save to checkpoint, and pause simulation + self._log_problem( + problem, + problem_waypoint_i, + alert_msg, + problem_hash, + hash_fpath, + log_delay, + ) + + # cache original schedule for reference and/or restoring later if needed (checkpoint.yaml [written in _log_problem] can be overwritten if multiple problems occur so is not a persistent record of original schedule) + schedule_original_fpath = problems_dir / SCHEDULE_ORIGINAL + if not os.path.exists(schedule_original_fpath): + self._cache_original_schedule( + self.expedition.schedule, schedule_original_fpath + ) + + def load_selected_problems( + self, selected_problems_fpath: str + ) -> dict[str, list[GeneralProblem | InstrumentProblem] | None]: + """Load previously selected problem classes from json.""" + with open( + selected_problems_fpath, + encoding="utf-8", + ) as f: + problems_json = json.load(f) + + # extract selected problem classes from their names (using the lookups preserves order they were saved in) + selected_problems = {"problem_class": [], "waypoint_i": []} + general_problems_lookup = {cls.__name__: cls for cls in GENERAL_PROBLEM_REG} + instrument_problems_lookup = { + cls.__name__: cls for cls in INSTRUMENT_PROBLEM_REG + } + + for cls_name, wp_idx in zip( + problems_json["problem_class"], problems_json["waypoint_i"], strict=True + ): + if cls_name in 
general_problems_lookup: + selected_problems["problem_class"].append( + general_problems_lookup[cls_name] + ) + elif cls_name in instrument_problems_lookup: + selected_problems["problem_class"].append( + instrument_problems_lookup[cls_name] + ) + else: + raise ValueError( + f"Problem class '{cls_name}' not found in known problem registries." + ) + selected_problems["waypoint_i"].append(wp_idx) + + return selected_problems + + def _log_problem( + self, + problem: GeneralProblem | InstrumentProblem, + problem_waypoint_i: int | None, + alert_msg: str, + problem_hash: str, + hash_fpath: Path, + log_delay: float, + ): + """Log problem occurrence with spinner and delay, save to checkpoint, write hash.""" + time.sleep(3.0) # brief pause before spinner + with yaspin(text=alert_msg) as spinner: + time.sleep(log_delay) + spinner.ok("💥 ") + + print("\nPROBLEM ENCOUNTERED: " + problem.message + "\n") + + result_msg = "\nRESULT: " + LOG_MESSAGING["schedule_problems"].format( + delay_duration=problem.delay_duration.total_seconds() / 3600.0, + problem_wp=( + "in-port" + if problem_waypoint_i is None + else f"at waypoint {problem_waypoint_i + 1}" + ), + expedition_yaml=EXPEDITION, + ) + + self._hash_to_json( + problem, + problem_hash, + problem_waypoint_i, + hash_fpath, + ) + + # check if enough contingency time has been scheduled to avoid delay affecting future waypoints + with yaspin(text="Assessing impact on expedition schedule..."): + time.sleep(5.0) + + has_contingency = self._has_contingency(problem, problem_waypoint_i) + + if has_contingency: + print(LOG_MESSAGING["problem_avoided"]) + + # update problem json to resolved = True + with open(hash_fpath, encoding="utf-8") as f: + problem_json = json.load(f) + problem_json["resolved"] = True + with open(hash_fpath, "w", encoding="utf-8") as f_out: + json.dump(problem_json, f_out, indent=4) + + with yaspin(): # time to read message before simulation continues + time.sleep(7.0) + return + + else: + affected = ( + "in-port" + if 
problem_waypoint_i is None + else f"at waypoint {problem_waypoint_i + 1}" + ) + print( + f"\nNot enough contingency time scheduled to mitigate delay of {problem.delay_duration.total_seconds() / 3600.0} hours occuring {affected} (future waypoint(s) would be reached too late).\n" + ) + print(result_msg) + + # save checkpoint + checkpoint = Checkpoint( + past_schedule=self.expedition.schedule, + failed_waypoint_i=problem_waypoint_i + 1 + if problem_waypoint_i is not None + else 0, + ) # failed waypoint index then becomes the one after the one where the problem occurred; as this is when scheduling issues would be run into; for pre-departure problems this is the first waypoint + _save_checkpoint(checkpoint, self.expedition_dir) + + # pause simulation + sys.exit(0) + + def _has_contingency( + self, + problem: InstrumentProblem | GeneralProblem, + problem_waypoint_i: int | None, + ) -> bool: + """Determine if enough contingency time has been scheduled to avoid delay affecting the waypoint immediately after the problem.""" + if problem_waypoint_i is None: + return False # pre-departure problems always cause delay to first waypoint + + else: + curr_wp = self.expedition.schedule.waypoints[problem_waypoint_i] + next_wp = self.expedition.schedule.waypoints[problem_waypoint_i + 1] + + wp_stationkeeping_time = _calc_wp_stationkeeping_time( + curr_wp.instrument, self.expedition + ) + + scheduled_time_diff = next_wp.time - curr_wp.time + + sail_time = _calc_sail_time( + curr_wp.location, + next_wp.location, + ship_speed_knots=self.expedition.ship_config.ship_speed_knots, + projection=PROJECTION, + )[0] + + return ( + scheduled_time_diff + > sail_time + wp_stationkeeping_time + problem.delay_duration + ) + + def _make_checkpoint(self, failed_waypoint_i: int | None = None) -> Checkpoint: + """Make checkpoint, also handling pre-departure.""" + return Checkpoint( + past_schedule=self.expedition.schedule, failed_waypoint_i=failed_waypoint_i + ) + + def _hash_to_json( + self, + problem: 
InstrumentProblem | GeneralProblem, + problem_hash: str, + problem_waypoint_i: int | None, + hash_path: Path, + ) -> dict: + """Convert problem details + hash to json.""" + hash_data = { + "problem_hash": problem_hash, + "message": problem.message, + "problem_waypoint_i": problem_waypoint_i, + "delay_duration_hours": problem.delay_duration.total_seconds() / 3600.0, + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), + "resolved": False, + } + with open(hash_path, "w", encoding="utf-8") as f: + json.dump(hash_data, f, indent=4) + + def _cache_original_schedule(self, schedule: Schedule, path: Path | str): + """Cache original schedule to file for reference, as a checkpoint object.""" + schedule_original = Checkpoint(past_schedule=schedule) + schedule_original.to_yaml(path) + print(f"\nOriginal schedule cached to {path}.\n") diff --git a/src/virtualship/models/__init__.py b/src/virtualship/models/__init__.py index d61c17194..7a106ba60 100644 --- a/src/virtualship/models/__init__.py +++ b/src/virtualship/models/__init__.py @@ -1,5 +1,6 @@ """Pydantic models and data classes used to configure virtualship (i.e., in the configuration files or settings).""" +from .checkpoint import Checkpoint from .expedition import ( ADCPConfig, ArgoFloatConfig, @@ -34,4 +35,5 @@ "Spacetime", "Expedition", "InstrumentsConfig", + "Checkpoint", ] diff --git a/src/virtualship/models/checkpoint.py b/src/virtualship/models/checkpoint.py index 98fe1ae0a..700c714f5 100644 --- a/src/virtualship/models/checkpoint.py +++ b/src/virtualship/models/checkpoint.py @@ -2,6 +2,8 @@ from __future__ import annotations +import json +from datetime import timedelta from pathlib import Path import pydantic @@ -9,7 +11,13 @@ from virtualship.errors import CheckpointError from virtualship.instruments.types import InstrumentType -from virtualship.models import Schedule +from virtualship.models.expedition import Expedition, Schedule +from virtualship.utils import ( + EXPEDITION, + PROJECTION, + 
_calc_sail_time, + _calc_wp_stationkeeping_time, +) class _YamlDumper(yaml.SafeDumper): @@ -29,6 +37,7 @@ class Checkpoint(pydantic.BaseModel): """ past_schedule: Schedule + failed_waypoint_i: int | None = None def to_yaml(self, file_path: str | Path) -> None: """ @@ -51,24 +60,122 @@ def from_yaml(cls, file_path: str | Path) -> Checkpoint: data = yaml.safe_load(file) return Checkpoint(**data) - def verify(self, schedule: Schedule) -> None: + def verify(self, expedition: Expedition, problems_dir: Path) -> None: """ - Verify that the given schedule matches the checkpoint's past schedule. - - This method checks if the waypoints in the given schedule match the waypoints - in the checkpoint's past schedule up to the length of the past schedule. - If there's a mismatch, it raises a CheckpointError. + Verify that the given schedule matches the checkpoint's past schedule , and/or that any problem has been resolved. - :param schedule: The schedule to verify against the checkpoint. - :type schedule: Schedule - :raises CheckpointError: If the past waypoints in the given schedule - have been changed compared to the checkpoint. - :return: None + Addresses changes made by the user in response to both i) scheduling issues arising for not enough time for the ship to travel between waypoints, and ii) problems encountered during simulation. """ - if ( - not schedule.waypoints[: len(self.past_schedule.waypoints)] - == self.past_schedule.waypoints + new_schedule = expedition.schedule + + # 1) check that past waypoints have not been changed, unless is a pre-departure problem + if self.failed_waypoint_i is None: + pass + elif ( + not new_schedule.waypoints[: int(self.failed_waypoint_i)] + == self.past_schedule.waypoints[: int(self.failed_waypoint_i)] ): raise CheckpointError( - "Past waypoints in schedule have been changed! Restore past schedule and only change future waypoints." + f"Past waypoints in schedule have been changed! 
Restore past schedule and only change future waypoints (waypoint {int(self.failed_waypoint_i) + 1} onwards)." ) + + # 2) check that problems have been resolved in the new schedule + hash_fpaths = [ + str(path.resolve()) for path in problems_dir.glob("problem_*.json") + ] + + if len(hash_fpaths) > 0: + for file in hash_fpaths: + with open(file, encoding="utf-8") as f: + problem = json.load(f) + if problem["resolved"]: + continue + elif not problem["resolved"]: + # check if delay has been accounted for in the new schedule (at waypoint immediately after problem waypoint; or first waypoint if pre-departure problem) + delay_duration = timedelta( + hours=float(problem["delay_duration_hours"]) + ) + + problem_waypoint = ( + new_schedule.waypoints[0] + if problem["problem_waypoint_i"] is None + else new_schedule.waypoints[problem["problem_waypoint_i"]] + ) + + # pre-departure problem: check that whole delay duration has been added to first waypoint time (by testing against past schedule) + if problem["problem_waypoint_i"] is None: + time_diff = ( + problem_waypoint.time - self.past_schedule.waypoints[0].time + ) + resolved = time_diff >= delay_duration + + # problem at a later waypoint: check new scheduled time exceeds sail time + delay duration + instrument deployment time (rather whole delay duration add-on, as there may be _some_ contingency time already scheduled) + else: + failed_waypoint = new_schedule.waypoints[self.failed_waypoint_i] + + scheduled_time = failed_waypoint.time - problem_waypoint.time + + stationkeeping_time = _calc_wp_stationkeeping_time( + problem_waypoint.instrument, + expedition, + ) # total time required to deploy instruments at problem waypoint + + sail_time = _calc_sail_time( + problem_waypoint.location, + failed_waypoint.location, + ship_speed_knots=expedition.ship_config.ship_speed_knots, + projection=PROJECTION, + )[0] + + min_time_required = ( + sail_time + delay_duration + stationkeeping_time + ) + + resolved = scheduled_time >= 
min_time_required + + if resolved: + print( + "\n\n🎉 Previous problem has been resolved in the schedule.\n" + ) + + # save back to json file changing the resolved status to True + problem["resolved"] = True + with open(file, "w", encoding="utf-8") as f_out: + json.dump(problem, f_out, indent=4) + + # only handle the first unresolved problem found; others will be handled in subsequent runs but are not yet known to the user + break + + else: + problem_wp_str = ( + "in-port" + if problem["problem_waypoint_i"] is None + else f"at waypoint {problem['problem_waypoint_i'] + 1}" + ) + affected_wp_str = ( + "1" + if problem["problem_waypoint_i"] is None + else f"{problem['problem_waypoint_i'] + 2}" + ) + time_elapsed = ( + (sail_time + delay_duration + stationkeeping_time) + if problem["problem_waypoint_i"] is not None + else delay_duration + ) + failed_waypoint_time = ( + failed_waypoint.time + if problem["problem_waypoint_i"] is not None + else new_schedule.waypoints[0].time + ) + current_time = problem_waypoint.time + time_elapsed + + raise CheckpointError( + f"The problem encountered in previous simulation has not been resolved in the schedule! Please adjust the schedule to account for delays caused by the problem (by using `virtualship plan` or directly editing the {EXPEDITION} file).\n\n" + f"The problem was associated with a delay duration of {problem['delay_duration_hours']} hours {problem_wp_str} (meaning waypoint {affected_wp_str} could not be reached in time). " + f"Currently, the ship would reach waypoint {affected_wp_str} at {current_time}, but the scheduled time is {failed_waypoint_time}." + + ( + f"\n\nHint: don't forget to factor in the time required to deploy the instruments {problem_wp_str} when rescheduling waypoint {affected_wp_str}." 
+ if problem["problem_waypoint_i"] is not None + else "" + ) + ) diff --git a/src/virtualship/models/expedition.py b/src/virtualship/models/expedition.py index b8f65558f..f9c75ee79 100644 --- a/src/virtualship/models/expedition.py +++ b/src/virtualship/models/expedition.py @@ -12,9 +12,12 @@ from virtualship.errors import InstrumentsConfigError, ScheduleError from virtualship.instruments.types import InstrumentType from virtualship.utils import ( + _calc_sail_time, _get_bathy_data, _get_waypoint_latlons, + _make_hash, _validate_numeric_to_timedelta, + register_instrument_config, ) from .location import Location @@ -65,6 +68,14 @@ def get_instruments(self) -> set[InstrumentType]: "Underway instrument config attribute(s) are missing from YAML. Must be Config object or None." ) from e + def get_unique_identifier(self) -> str: + """Generate a unique hash for the expedition based waypoints locations and instrument types. Therefore, any changes to location, number of waypoints or instrument types will change the hash.""" + waypoint_data = "".join( + f"{wp.location.lat},{wp.location.lon};{wp.instrument}" + for wp in self.schedule.waypoints + ) + return _make_hash(waypoint_data, length=12) + class ShipConfig(pydantic.BaseModel): """Configuration of the ship.""" @@ -165,23 +176,22 @@ def verify( if wp.instrument is InstrumentType.CTD: time += timedelta(minutes=20) - geodinv: tuple[float, float, float] = projection.inv( - wp.location.lon, - wp.location.lat, - wp_next.location.lon, - wp_next.location.lat, - ) - distance = geodinv[2] + time_to_reach = _calc_sail_time( + wp.location, + wp_next.location, + ship_speed, + projection, + )[0] - time_to_reach = timedelta(seconds=distance / ship_speed * 3600 / 1852) arrival_time = time + time_to_reach if wp_next.time is None: time = arrival_time elif arrival_time > wp_next.time: raise ScheduleError( - f"Waypoint planning is not valid: would arrive too late at waypoint number {wp_i + 2}. 
" - f"location: {wp_next.location} time: {wp_next.time} instrument: {wp_next.instrument}" + f"Waypoint planning is not valid: would arrive too late at waypoint {wp_i + 2}. " + f"Location: {wp_next.location} Time: {wp_next.time}. " + f"Currently projected to arrive at: {arrival_time}." ) else: time = wp_next.time @@ -204,6 +214,7 @@ def serialize_instrument(self, instrument): return instrument.value if instrument else None +@register_instrument_config(InstrumentType.ARGO_FLOAT) class ArgoFloatConfig(pydantic.BaseModel): """Configuration for argos floats.""" @@ -244,6 +255,7 @@ def _validate_stationkeeping_time(cls, value: int | float | timedelta) -> timede model_config = pydantic.ConfigDict(populate_by_name=True) +@register_instrument_config(InstrumentType.ADCP) class ADCPConfig(pydantic.BaseModel): """Configuration for ADCP instrument.""" @@ -266,6 +278,7 @@ def _validate_period(cls, value: int | float | timedelta) -> timedelta: return _validate_numeric_to_timedelta(value, "minutes") +@register_instrument_config(InstrumentType.CTD) class CTDConfig(pydantic.BaseModel): """Configuration for CTD instrument.""" @@ -288,6 +301,7 @@ def _validate_stationkeeping_time(cls, value: int | float | timedelta) -> timede return _validate_numeric_to_timedelta(value, "minutes") +@register_instrument_config(InstrumentType.CTD_BGC) class CTD_BGCConfig(pydantic.BaseModel): """Configuration for CTD_BGC instrument.""" @@ -310,6 +324,7 @@ def _validate_stationkeeping_time(cls, value: int | float | timedelta) -> timede return _validate_numeric_to_timedelta(value, "minutes") +@register_instrument_config(InstrumentType.UNDERWATER_ST) class ShipUnderwaterSTConfig(pydantic.BaseModel): """Configuration for underwater ST.""" @@ -330,6 +345,7 @@ def _validate_period(cls, value: int | float | timedelta) -> timedelta: return _validate_numeric_to_timedelta(value, "minutes") +@register_instrument_config(InstrumentType.DRIFTER) class DrifterConfig(pydantic.BaseModel): """Configuration for 
drifters.""" @@ -364,6 +380,7 @@ def _validate_stationkeeping_time(cls, value: int | float | timedelta) -> timede return _validate_numeric_to_timedelta(value, "minutes") +@register_instrument_config(InstrumentType.XBT) class XBTConfig(pydantic.BaseModel): """Configuration for xbt instrument.""" diff --git a/src/virtualship/utils.py b/src/virtualship/utils.py index 2879855e4..20b48797e 100644 --- a/src/virtualship/utils.py +++ b/src/virtualship/utils.py @@ -1,6 +1,7 @@ from __future__ import annotations import glob +import hashlib import os import re import warnings @@ -12,23 +13,137 @@ import copernicusmarine import numpy as np +import pyproj import xarray as xr from parcels import FieldSet from virtualship.errors import CopernicusCatalogueError +from virtualship.instruments.types import InstrumentType if TYPE_CHECKING: - from virtualship.expedition.simulate_schedule import ScheduleOk - from virtualship.models import Expedition - + from virtualship.expedition.simulate_schedule import ( + ScheduleOk, + ) + from virtualship.models import Expedition, Location + from virtualship.models.checkpoint import Checkpoint import pandas as pd import yaml from pydantic import BaseModel from yaspin import Spinner +# ===================================================== +# SECTION: simulation constants +# ===================================================== + EXPEDITION = "expedition.yaml" CHECKPOINT = "checkpoint.yaml" +SCHEDULE_ORIGINAL = "schedule_original.yaml" + +PROBLEMS_ENCOUNTERED_DIR = "problems_encountered_" + "{expedition_identifier}" +SELECTED_PROBLEMS = "selected_problems.json" + +# projection used to sail between waypoints +PROJECTION = pyproj.Geod(ellps="WGS84") + + +# ===================================================== +# SECTION: Copernicus Marine Service constants +# ===================================================== + +# Copernicus Marine product IDs + +PRODUCT_IDS = { + "phys": { + "reanalysis": "cmems_mod_glo_phy_my_0.083deg_P1D-m", + 
"reanalysis_interim": "cmems_mod_glo_phy_myint_0.083deg_P1D-m", + "analysis": "cmems_mod_glo_phy_anfc_0.083deg_P1D-m", + }, + "bgc": { + "reanalysis": "cmems_mod_glo_bgc_my_0.25deg_P1D-m", + "reanalysis_interim": "cmems_mod_glo_bgc_myint_0.25deg_P1D-m", + "analysis": None, # will be set per variable + }, +} + +BGC_ANALYSIS_IDS = { + "o2": "cmems_mod_glo_bgc-bio_anfc_0.25deg_P1D-m", + "chl": "cmems_mod_glo_bgc-pft_anfc_0.25deg_P1D-m", + "no3": "cmems_mod_glo_bgc-nut_anfc_0.25deg_P1D-m", + "po4": "cmems_mod_glo_bgc-nut_anfc_0.25deg_P1D-m", + "ph": "cmems_mod_glo_bgc-car_anfc_0.25deg_P1D-m", + "phyc": "cmems_mod_glo_bgc-pft_anfc_0.25deg_P1D-m", + "nppv": "cmems_mod_glo_bgc-bio_anfc_0.25deg_P1D-m", +} + +MONTHLY_BGC_REANALYSIS_IDS = { + "ph": "cmems_mod_glo_bgc_my_0.25deg_P1M-m", + "phyc": "cmems_mod_glo_bgc_my_0.25deg_P1M-m", +} +MONTHLY_BGC_REANALYSIS_INTERIM_IDS = { + "ph": "cmems_mod_glo_bgc_myint_0.25deg_P1M-m", + "phyc": "cmems_mod_glo_bgc_myint_0.25deg_P1M-m", +} + +# variables used in VirtualShip which are physical or biogeochemical variables, respectively +COPERNICUSMARINE_PHYS_VARIABLES = ["uo", "vo", "so", "thetao"] +COPERNICUSMARINE_BGC_VARIABLES = ["o2", "chl", "no3", "po4", "ph", "phyc", "nppv"] + +BATHYMETRY_ID = "cmems_mod_glo_phy_my_0.083deg_static" + + +# ===================================================== +# SECTION: decorators / dynamic registries and mapping +# ===================================================== + +# helpful for dynamic access in different parts of the codebase + +# main instrument (simulation) class registry and registration utilities +INSTRUMENT_CLASS_MAP = {} + + +def register_instrument(instrument_type): + def decorator(cls): + INSTRUMENT_CLASS_MAP[instrument_type] = cls + return cls + + return decorator + + +def get_instrument_class(instrument_type): + return INSTRUMENT_CLASS_MAP.get(instrument_type) + + +# problems inventory registry and registration utilities +INSTRUMENT_PROBLEM_REG = [] +GENERAL_PROBLEM_REG = [] + + 
+def register_instrument_problem(cls): + INSTRUMENT_PROBLEM_REG.append(cls) + return cls + + +def register_general_problem(cls): + GENERAL_PROBLEM_REG.append(cls) + return cls + + +# map for instrument type to instrument config (pydantic basemodel) names +INSTRUMENT_CONFIG_MAP = {} + + +def register_instrument_config(instrument_type): + def decorator(cls): + INSTRUMENT_CONFIG_MAP[instrument_type] = cls.__name__ + return cls + + return decorator + + +# ===================================================== +# SECTION: helper functions +# ===================================================== def load_static_file(name: str) -> str: @@ -215,40 +330,6 @@ def _get_expedition(expedition_dir: Path) -> Expedition: ) from e -# custom ship spinner -ship_spinner = Spinner( - interval=240, - frames=[ - " 🚢 ", - " 🚢 ", - " 🚢 ", - " 🚢 ", - " 🚢", - " 🚢 ", - " 🚢 ", - " 🚢 ", - " 🚢 ", - "🚢 ", - ], -) - - -# InstrumentType -> Instrument registry and registration utilities. -INSTRUMENT_CLASS_MAP = {} - - -def register_instrument(instrument_type): - def decorator(cls): - INSTRUMENT_CLASS_MAP[instrument_type] = cls - return cls - - return decorator - - -def get_instrument_class(instrument_type): - return INSTRUMENT_CLASS_MAP.get(instrument_type) - - def add_dummy_UV(fieldset: FieldSet): """Add a dummy U and V field to a FieldSet to satisfy parcels FieldSet completeness checks.""" if "U" not in fieldset.__dict__.keys(): @@ -272,47 +353,6 @@ def add_dummy_UV(fieldset: FieldSet): ) from None -# Copernicus Marine product IDs - -PRODUCT_IDS = { - "phys": { - "reanalysis": "cmems_mod_glo_phy_my_0.083deg_P1D-m", - "reanalysis_interim": "cmems_mod_glo_phy_myint_0.083deg_P1D-m", - "analysis": "cmems_mod_glo_phy_anfc_0.083deg_P1D-m", - }, - "bgc": { - "reanalysis": "cmems_mod_glo_bgc_my_0.25deg_P1D-m", - "reanalysis_interim": "cmems_mod_glo_bgc_myint_0.25deg_P1D-m", - "analysis": None, # will be set per variable - }, -} - -BGC_ANALYSIS_IDS = { - "o2": "cmems_mod_glo_bgc-bio_anfc_0.25deg_P1D-m", - 
"chl": "cmems_mod_glo_bgc-pft_anfc_0.25deg_P1D-m", - "no3": "cmems_mod_glo_bgc-nut_anfc_0.25deg_P1D-m", - "po4": "cmems_mod_glo_bgc-nut_anfc_0.25deg_P1D-m", - "ph": "cmems_mod_glo_bgc-car_anfc_0.25deg_P1D-m", - "phyc": "cmems_mod_glo_bgc-pft_anfc_0.25deg_P1D-m", - "nppv": "cmems_mod_glo_bgc-bio_anfc_0.25deg_P1D-m", -} - -MONTHLY_BGC_REANALYSIS_IDS = { - "ph": "cmems_mod_glo_bgc_my_0.25deg_P1M-m", - "phyc": "cmems_mod_glo_bgc_my_0.25deg_P1M-m", -} -MONTHLY_BGC_REANALYSIS_INTERIM_IDS = { - "ph": "cmems_mod_glo_bgc_myint_0.25deg_P1M-m", - "phyc": "cmems_mod_glo_bgc_myint_0.25deg_P1M-m", -} - -# variables used in VirtualShip which are physical or biogeochemical variables, respectively -COPERNICUSMARINE_PHYS_VARIABLES = ["uo", "vo", "so", "thetao"] -COPERNICUSMARINE_BGC_VARIABLES = ["o2", "chl", "no3", "po4", "ph", "phyc", "nppv"] - -BATHYMETRY_ID = "cmems_mod_glo_phy_my_0.083deg_static" - - def _select_product_id( physical: bool, schedule_start, @@ -552,3 +592,101 @@ def _get_waypoint_latlons(waypoints): strict=True, ) return wp_lats, wp_lons + + +def _save_checkpoint(checkpoint: Checkpoint, expedition_dir: Path) -> None: + file_path = expedition_dir.joinpath(CHECKPOINT) + checkpoint.to_yaml(file_path) + + +def _calc_sail_time( + location1: Location, + location2: Location, + ship_speed_knots: float, + projection: pyproj.Geod, +) -> tuple[timedelta, tuple[float, float, float], float]: + """Calculate sail time between two waypoints (their locations) given ship speed in knots.""" + geodinv: tuple[float, float, float] = projection.inv( + lons1=location1.longitude, + lats1=location1.latitude, + lons2=location2.longitude, + lats2=location2.latitude, + ) + ship_speed_meter_per_second = ship_speed_knots * 1852 / 3600 + distance_to_next_waypoint = geodinv[2] + return ( + timedelta(seconds=distance_to_next_waypoint / ship_speed_meter_per_second), + geodinv[0], + ship_speed_meter_per_second, + ) + + +def _calc_wp_stationkeeping_time( + wp_instrument_types: list, + expedition: 
Expedition, + instrument_config_map: dict = INSTRUMENT_CONFIG_MAP, +) -> timedelta: + """For a given waypoint, calculate how much time is required to carry out all instrument deployments.""" + # TODO: this can be removed if/when CTD and CTD_BGC are merged to a single instrument + both_ctd_and_bgc = ( + InstrumentType.CTD in wp_instrument_types + and InstrumentType.CTD_BGC in wp_instrument_types + ) + + # extract configs for all instruments present in expedition + valid_instrument_configs = [ + iconfig + for _, iconfig in expedition.instruments_config.__dict__.items() + if iconfig + ] + + # extract configs for instruments present in given waypoint + wp_instrument_configs = [] + for iconfig in valid_instrument_configs: + for itype in wp_instrument_types: + if instrument_config_map[itype] == iconfig.__class__.__name__: + wp_instrument_configs.append(iconfig) + + # get wp total stationkeeping time + cumulative_stationkeeping_time = timedelta() + for iconfig in wp_instrument_configs: + if ( + both_ctd_and_bgc + and iconfig.__class__.__name__ + == INSTRUMENT_CONFIG_MAP[InstrumentType.CTD_BGC] + ): + continue # only need to add time cost once if both CTD and CTD_BGC are being taken; in reality they would be done on the same instrument + if hasattr(iconfig, "stationkeeping_time"): + cumulative_stationkeeping_time += iconfig.stationkeeping_time + + return cumulative_stationkeeping_time + + +def _make_hash(s: str, length: int) -> str: + """Make unique hash for problem occurrence.""" + assert length % 2 == 0, "Length must be even." + half_length = length // 2 + return hashlib.shake_128(s.encode("utf-8")).hexdigest(half_length) + + +# ===================================================== +# SECTION: misc. 
+# ===================================================== + + +# custom ship spinner +ship_spinner = Spinner( + interval=240, + frames=[ + " 🚢 ", + " 🚢 ", + " 🚢 ", + " 🚢 ", + " 🚢", + " 🚢 ", + " 🚢 ", + " 🚢 ", + " 🚢 ", + "🚢 ", + ], +) diff --git a/tests/cli/test_run.py b/tests/cli/test_run.py index 190442347..d546cae8c 100644 --- a/tests/cli/test_run.py +++ b/tests/cli/test_run.py @@ -53,7 +53,9 @@ def test_run(tmp_path, monkeypatch): fake_data_dir = tmp_path / "fake_data" fake_data_dir.mkdir() - _run(expedition_dir, from_data=fake_data_dir) + _run( + expedition_dir, prob_level=0, from_data=fake_data_dir + ) # problems turned off here results_dir = expedition_dir / "results" diff --git a/tests/expedition/test_expedition.py b/tests/expedition/test_expedition.py index 90027e8ec..314b9db89 100644 --- a/tests/expedition/test_expedition.py +++ b/tests/expedition/test_expedition.py @@ -199,7 +199,7 @@ def test_verify_on_land(): ] ), ScheduleError, - "Waypoint planning is not valid: would arrive too late at waypoint number 2...", + "Waypoint planning is not valid: would arrive too late at waypoint 2\\.", id="NotEnoughTime", ), ],