From 167d6a8fae9f3975eff36409bf0b7f99b329405b Mon Sep 17 00:00:00 2001 From: Louis-Philippe Rousseau Lambert Date: Fri, 28 Nov 2025 15:56:02 +0000 Subject: [PATCH] v1 of new alerts from DMS fix for empty alerts added a local file for the alerts updated mapping for query --- deploy/default/msc-pygeoapi-config.yml | 35 ++ msc_pygeoapi/loader/__init__.py | 6 +- msc_pygeoapi/loader/alerts_realtime.py | 509 +++++++++++++++++++++++++ msc_pygeoapi/plugin.py | 19 +- 4 files changed, 562 insertions(+), 7 deletions(-) create mode 100644 msc_pygeoapi/loader/alerts_realtime.py diff --git a/deploy/default/msc-pygeoapi-config.yml b/deploy/default/msc-pygeoapi-config.yml index 045b5742..348a1237 100644 --- a/deploy/default/msc-pygeoapi-config.yml +++ b/deploy/default/msc-pygeoapi-config.yml @@ -4530,6 +4530,41 @@ resources: data: ${MSC_PYGEOAPI_ES_URL}/coastal_flood_risk_index id_field: id + weather-alerts: + type: collection + title: + en: Weather alerts + fr: Alertes météo + description: + en: "Environment Canada issues weather alerts about weather related hazards in order to notify those in affected areas so that they can take steps to protect themselves and their property from harm. Alerts are classified depending on the severity and timing of the subject event and include: warnings, watches, advisories and statements. Warnings are usually issued six to 24 hours in advance, although some severe weather (such as thunderstorms and tornadoes) can occur rapidly, with less than a half hours' notice." + fr: "Environnement Canada publie des alertes météo lorsque le temps est menaçant pour informer les personnes se trouvant dans les zones touchées afin qu'elles puissent prendre des mesures pour se protéger et protéger leurs biens. Le type d'alerte utilisé dépend de la gravité et du moment de l'événement et inclut : les avertissements, les veilles, les avis et les bulletins. Les avertissements sont habituellement émis entre 6 et 24 heures à l'avance, même si certains phénomènes violents (par exemple les orages et les tornades) peuvent se produire rapidement, avec un avis de moins d'une demi-heure." + keywords: + en: [Weather warnings, Precipitation, Snow, Wind, Storms, Floods] + fr: [Alerte météorologique, Précipitation, Neige, Vent, Tempête, Inondation] + crs: + - CRS84 + links: + - type: text/html + rel: canonical + title: + en: Meteorological Service of Canada open data + fr: Données ouvertes du Service météorologique du Canada + href: + en: https://eccc-msc.github.io/open-data/msc-data/readme_en + fr: https://eccc-msc.github.io/open-data/msc-data/readme_fr + hreflang: + en: en-CA + fr: fr-CA + extents: + spatial: + bbox: [-145.27, 37.3, -48.11, 87.61] + crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84 + providers: + - type: feature + name: Elasticsearch + data: ${MSC_PYGEOAPI_ES_URL}/alerts-realtime + id_field: id + raster-drill: type: process processor: diff --git a/msc_pygeoapi/loader/__init__.py b/msc_pygeoapi/loader/__init__.py index 84f60260..e9a2fde2 100644 --- a/msc_pygeoapi/loader/__init__.py +++ b/msc_pygeoapi/loader/__init__.py @@ -2,9 +2,12 @@ # # Author: Tom Kralidis # Felix Laframboise +# Louis-Philippe Rousseau-Lambert +# # -# Copyright (c) 2023 Tom Kralidis # Copyright (c) 2021 Felix Laframboise +# Copyright (c) 2023 Tom Kralidis +# Copyright (c) 2025 Louis-Philippe Rousseau-Lambert # # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation @@ -61,6 +64,7 @@ def metadata(): ('msc_pygeoapi.loader.forecast_polygons', 'forecast_polygons'), ('msc_pygeoapi.loader.marine_weather_realtime', 'marine_weather'), ('msc_pygeoapi.loader.cap_alerts_realtime', 'cap_alerts'), + ('msc_pygeoapi.loader.alerts_realtime', 'alerts_realtime'), ('msc_pygeoapi.loader.swob_realtime', 'swob_realtime'), ('msc_pygeoapi.loader.aqhi_realtime', 'aqhi_realtime'), ('msc_pygeoapi.loader.aqhi_stations', 'aqhi_stations'), diff --git a/msc_pygeoapi/loader/alerts_realtime.py b/msc_pygeoapi/loader/alerts_realtime.py new file mode 100644 index 00000000..02f06b0e --- /dev/null +++ b/msc_pygeoapi/loader/alerts_realtime.py @@ -0,0 +1,509 @@ +# ================================================================= +# +# Author: Louis-Philippe Rousseau-Lambert +# +# +# Copyright (c) 2025 Louis-Philippe Rousseau-Lambert +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= + +from datetime import datetime +import json +import logging +import os +from pathlib import Path + +import click +from parse import parse + +from msc_pygeoapi import cli_options +from msc_pygeoapi.connector.elasticsearch_ import ElasticsearchConnector +from msc_pygeoapi.env import GEOMET_LOCAL_BASEPATH +from msc_pygeoapi.loader.base import BaseLoader +from msc_pygeoapi.util import ( + configure_es_connection, + check_es_indexes_to_delete, +) + +LOGGER = logging.getLogger(__name__) + +# cleanup settings +DAYS_TO_KEEP = 7 + +# index settings +INDEX_BASENAME = 'alerts-realtime.' + +ALIAS = 'alerts-realtime' + +MAPPINGS = { + 'properties': { + 'geometry': {'type': 'geo_shape'}, + 'properties': { + 'properties': { + 'id': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'publication_datetime': { + 'type': 'date', + 'format': 'strict_date_time' + }, + 'expiration_datetime': { + 'type': 'date', + 'format': 'strict_date_time' + }, + 'validity_datetime': { + 'type': 'date', + 'format': 'strict_date_time' + }, + 'event_end_datetime': { + 'type': 'date', + 'format': 'strict_date_time' + }, + 'alert_code': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'alert_type': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'alert_name_en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'alert_name_fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'alert_short_name_en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'alert_short_name_fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'alert_text_en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'alert_text_fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'risk_colour_en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'risk_colour_fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'confidence_en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'confidence_fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'impact_en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'impact_fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'feature_name_en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'feature_name_fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'province': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'status_en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'status_fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'feature_id': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + } + } + } + } +} + +SETTINGS = { + 'order': 0, + 'version': 1, + 'index_patterns': [f'{INDEX_BASENAME}*'], + 'settings': {'number_of_shards': 1, 'number_of_replicas': 0}, + 'mappings': None +} + + +class AlertsRealtimeLoader(BaseLoader): + """Alerts Real-time loader""" + + def __init__(self, conn_config={}): + """initializer""" + + BaseLoader.__init__(self) + + self.conn = ElasticsearchConnector(conn_config) + self.filepath = None + self.date_ = None + self.index_date = None + self.items = [] + + def parse_filename(self, filename): + """ + Parses an alerts filename in order to get the date + + :return: `bool` of parse status + """ + + # parse filepath + # example 20251126T182051.607Z_MSC_Alerts.json:DMS:CMC:ALERTS:JSON:20251126182137 # noqa + pattern = '{date_}_MSC_Alerts.json{_}' # noqa + parsed_filename = parse(pattern, filename) + + self.date_ = datetime.strptime( + parsed_filename.named['date_'], '%Y%m%dT%H%M%S.%fZ' + ) + self.index_date = datetime.strftime(self.date_, '%Y-%m-%dt%H%M%S.%fz') + + return True + + def swap_alias(self, index_name): + """ + Swap aliases to point to the new alerts index + + :return: `bool` of parse status + """ + + self.conn.create_alias(ALIAS, index_name, overwrite=True) + + return True + + def delete_indices(self, indices): + for idx in indices: + self.conn.delete(idx) + + return True + + def generate_geojson_features(self, es_index): + """ + Generates and yields a series of umos. + Umos are returned as Elasticsearch bulk API + upsert actions,with documents in GeoJSON to match the Elasticsearch + index mappings. + + :returns: Generator of Elasticsearch actions to upsert the alerts + """ + + with open(self.filepath.resolve()) as f: + data = json.load(f) + # writting latest alerts file locally + # this will be used by geomet-weather + # to avoid the 500 hit limit per minute + alerts_local_path = os.path.join(GEOMET_LOCAL_BASEPATH, + 'alerts', + 'dms-alerts.json') + LOGGER.debug(f'new local file: {alerts_local_path}') + with open(alerts_local_path, 'w', encoding='utf-8') as alerts: + alerts.write(json.dumps(data)) + + features = data['features'] + + for feature in features: + prop_id = feature['properties']['id'] + feat_id = feature['properties']['feature_id'] + feature['id'] = f'{prop_id}_{feat_id}' + feature['properties']['id'] = feature['id'] + + self.items.append(feature) + + action = { + '_id': feature['id'], + '_index': es_index, + '_op_type': 'update', + 'doc': feature, + 'doc_as_upsert': True + } + + yield action + + def load_data(self, filepath): + """ + loads data from event to target + :returns: `bool` of status result + """ + + self.filepath = Path(filepath) + filename = self.filepath.name + + # set class variables from filename + LOGGER.debug(f'Received file {self.filepath}') + self.parse_filename(filename) + + # set new index name + es_index = f'{INDEX_BASENAME}{self.index_date}' + LOGGER.debug(f'new index name: {es_index}') + + # Check if alias already exists + LOGGER.debug(f'Checking if {self.filepath} is the most recent file') + is_more_recent = False + + # using "or []" to avoid having current_indices = None + current_indices = (self.conn.get_alias_indices(ALIAS)) or [] + LOGGER.debug(f'Current indices {current_indices}') + + is_more_recent = all( + self.date_ > datetime.strptime(".".join(idx.split(".")[1:]), + "%Y-%m-%dt%H%M%S.%fz") + for idx in current_indices + ) + LOGGER.debug(f'Is new file more recent --> {is_more_recent}') + + if is_more_recent: + LOGGER.debug(f'{self.filepath} is the most recent file') + SETTINGS['index_patterns'] = [f'{INDEX_BASENAME}*'] + SETTINGS['mappings'] = MAPPINGS + self.conn.create_template(INDEX_BASENAME, SETTINGS) + + # create index + # necessary for empty alerts json + self.conn.create(es_index, {'mappings': MAPPINGS}) + + # generate geojson features + package = self.generate_geojson_features(es_index) + self.conn.submit_elastic_package(package, request_size=80000) + + # Swap alias + LOGGER.debug(f'Swapping alias: {es_index}') + self.swap_alias(es_index) + + # Delete old indices + LOGGER.debug(f'Deleting previous indexes: {current_indices}') + self.delete_indices(current_indices) + + return True + + +@click.group() +def alerts_realtime(): + """Manages alerts indexes""" + pass + + +@click.command() +@click.pass_context +@cli_options.OPTION_FILE() +@cli_options.OPTION_DIRECTORY() +@cli_options.OPTION_ELASTICSEARCH() +@cli_options.OPTION_ES_USERNAME() +@cli_options.OPTION_ES_PASSWORD() +@cli_options.OPTION_ES_IGNORE_CERTS() +def add(ctx, file_, directory, es, username, password, ignore_certs): + """Add alerts data to Elasticsearch""" + + if all([file_ is None, directory is None]): + raise click.ClickException('Missing --file/-f or --dir/-d option') + + conn_config = configure_es_connection(es, username, password, ignore_certs) + + files_to_process = [] + + if file_ is not None: + files_to_process = [file_] + elif directory is not None: + for root, dirs, files in os.walk(directory): + for f in [file for file in files if file.endswith('.json')]: + files_to_process.append(os.path.join(root, f)) + files_to_process.sort(key=os.path.getmtime) + + for file_to_process in files_to_process: + loader = AlertsRealtimeLoader(conn_config) + result = loader.load_data(file_to_process) + if not result: + click.echo('features not generated') + + +@click.command() +@click.pass_context +@cli_options.OPTION_DAYS( + default=DAYS_TO_KEEP, + help=f'Delete indexes older than n days (default={DAYS_TO_KEEP})', +) +@cli_options.OPTION_ELASTICSEARCH() +@cli_options.OPTION_ES_USERNAME() +@cli_options.OPTION_ES_PASSWORD() +@cli_options.OPTION_ES_IGNORE_CERTS() +@cli_options.OPTION_YES(prompt='Are you sure you want to delete old indexes?') +def clean_indexes(ctx, days, es, username, password, ignore_certs): + """Delete old alerts realtime indexes older than n days""" + + conn_config = configure_es_connection(es, username, password, ignore_certs) + conn = ElasticsearchConnector(conn_config) + + indexes_to_fetch = '{INDEX_BASENAME}.*' + + indexes = conn.get(indexes_to_fetch) + + if indexes: + indexes_to_delete = check_es_indexes_to_delete(indexes, days) + if indexes_to_delete: + click.echo(f'Deleting indexes {indexes_to_delete}') + conn.delete(','.join(indexes_to_delete)) + + click.echo('Done') + + +@click.command() +@click.pass_context +@cli_options.OPTION_ELASTICSEARCH() +@cli_options.OPTION_ES_USERNAME() +@cli_options.OPTION_ES_PASSWORD() +@cli_options.OPTION_ES_IGNORE_CERTS() +@cli_options.OPTION_INDEX_TEMPLATE() +def delete_indexes(ctx, es, username, password, ignore_certs, + index_template): + """Delete all alerts realtime indexes""" + + conn_config = configure_es_connection(es, username, password, ignore_certs) + conn = ElasticsearchConnector(conn_config) + + indexes = f'{INDEX_BASENAME}*' + + click.echo(f'Deleting indexes {indexes}') + conn.delete(indexes) + + if index_template: + index_name = indexes + click.echo(f'Deleting index template {index_name}') + conn.delete_template(index_name) + + click.echo('Done') + + +alerts_realtime.add_command(add) +alerts_realtime.add_command(clean_indexes) +alerts_realtime.add_command(delete_indexes) diff --git a/msc_pygeoapi/plugin.py b/msc_pygeoapi/plugin.py index c6e68148..bf4d1198 100644 --- a/msc_pygeoapi/plugin.py +++ b/msc_pygeoapi/plugin.py @@ -2,9 +2,12 @@ # # Author: Tom Kralidis # Felix Laframboise +# Louis-Philippe Rousseau-Lambert +# # -# Copyright (c) 2023 Tom Kralidis # Copyright (c) 2021 Felix Laframboise +# Copyright (c) 2023 Tom Kralidis +# Copyright (c) 2025 Louis-Philippe Rousseau-Lambert # # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation @@ -50,19 +53,23 @@ }, 'hurricanes_realtime': { 'filename_pattern': 'hurricanes/', - 'handler': 'msc_pygeoapi.loader.hurricanes_realtime.HurricanesRealtimeLoader' # noqa + 'handler': 'msc_pygeoapi.loader.hurricanes_realtime.HurricanesRealtimeLoader' # noqa }, 'forecast_polygons': { 'filename_pattern': 'meteocode/geodata/', - 'handler': 'msc_pygeoapi.loader.forecast_polygons.ForecastPolygonsLoader' # noqa + 'handler': 'msc_pygeoapi.loader.forecast_polygons.ForecastPolygonsLoader' # noqa }, 'marine_weather_realtime': { 'filename_pattern': 'marine_weather/xml/', - 'handler': 'msc_pygeoapi.loader.marine_weather_realtime.MarineWeatherRealtimeLoader' # noqa + 'handler': 'msc_pygeoapi.loader.marine_weather_realtime.MarineWeatherRealtimeLoader' # noqa }, 'cap_alerts_realtime': { 'filename_pattern': 'alerts/cap', - 'handler': 'msc_pygeoapi.loader.cap_alerts_realtime.CapAlertsRealtimeLoader' # noqa + 'handler': 'msc_pygeoapi.loader.cap_alerts_realtime.CapAlertsRealtimeLoader' # noqa + }, + 'alerts_realtime': { + 'filename_pattern': 'MSC-DMS-OP/GEOMET/ALERT', + 'handler': 'msc_pygeoapi.loader.alerts_realtime.AlertsRealtimeLoader' # noqa }, 'swob_realtime': { 'filename_pattern': 'observations/swob-ml', @@ -78,7 +85,7 @@ }, 'cumulative_effects_hs': { 'filename_pattern': 'model_raqdps-fw/cumulative_effects/json', - 'handler': 'msc_pygeoapi.loader.cumulative_effects_hs.CumulativeEffectsHSLoader' # noqa + 'handler': 'msc_pygeoapi.loader.cumulative_effects_hs.CumulativeEffectsHSLoader' # noqa }, 'umos_realtime': { 'filename_pattern': 'stat-post-processing',