Skip to content

Commit 2a68d83

Browse files
author
Louis-Philippe Rousseau Lambert
committed
v1 of new alerts from DMS
fix for empty alerts
1 parent 3c6f7fe commit 2a68d83

File tree

4 files changed

+383
-7
lines changed

4 files changed

+383
-7
lines changed

deploy/default/msc-pygeoapi-config.yml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4530,6 +4530,41 @@ resources:
45304530
data: ${MSC_PYGEOAPI_ES_URL}/coastal_flood_risk_index
45314531
id_field: id
45324532

4533+
weather-alerts:
4534+
type: collection
4535+
title:
4536+
en: Weather alerts
4537+
fr: Alertes météo
4538+
description:
4539+
en: "Environment Canada issues weather alerts about weather related hazards in order to notify those in affected areas so that they can take steps to protect themselves and their property from harm. Alerts are classified depending on the severity and timing of the subject event and include: warnings, watches, advisories and statements. Warnings are usually issued six to 24 hours in advance, although some severe weather (such as thunderstorms and tornadoes) can occur rapidly, with less than a half hours' notice."
4540+
fr: "Environnement Canada publie des alertes météo lorsque le temps est menaçant pour informer les personnes se trouvant dans les zones touchées afin qu'elles puissent prendre des mesures pour se protéger et protéger leurs biens. Le type d'alerte utilisé dépend de la gravité et du moment de l'événement et inclut : les avertissements, les veilles, les avis et les bulletins. Les avertissements sont habituellement émis entre 6 et 24 heures à l'avance, même si certains phénomènes violents (par exemple les orages et les tornades) peuvent se produire rapidement, avec un avis de moins d'une demi-heure."
4541+
keywords:
4542+
en: [Weather warnings, Precipitation, Snow, Wind, Storms, Floods]
4543+
fr: [Alerte météorologique, Précipitation, Neige, Vent, Tempête, Inondation]
4544+
crs:
4545+
- CRS84
4546+
links:
4547+
- type: text/html
4548+
rel: canonical
4549+
title:
4550+
en: Meteorological Service of Canada open data
4551+
fr: Données ouvertes du Service météorologique du Canada
4552+
href:
4553+
en: https://eccc-msc.github.io/open-data/msc-data/readme_en
4554+
fr: https://eccc-msc.github.io/open-data/msc-data/readme_fr
4555+
hreflang:
4556+
en: en-CA
4557+
fr: fr-CA
4558+
extents:
4559+
spatial:
4560+
bbox: [-145.27, 37.3, -48.11, 87.61]
4561+
crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84
4562+
providers:
4563+
- type: feature
4564+
name: Elasticsearch
4565+
data: ${MSC_PYGEOAPI_ES_URL}/alerts-realtime
4566+
id_field: id
4567+
45334568
raster-drill:
45344569
type: process
45354570
processor:

msc_pygeoapi/loader/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22
#
33
# Author: Tom Kralidis <[email protected]>
44
# Felix Laframboise <[email protected]>
5+
# Louis-Philippe Rousseau-Lambert
6+
57
#
6-
# Copyright (c) 2023 Tom Kralidis
78
# Copyright (c) 2021 Felix Laframboise
9+
# Copyright (c) 2023 Tom Kralidis
10+
# Copyright (c) 2025 Louis-Philippe Rousseau-Lambert
811
#
912
# Permission is hereby granted, free of charge, to any person
1013
# obtaining a copy of this software and associated documentation
@@ -61,6 +64,7 @@ def metadata():
6164
('msc_pygeoapi.loader.forecast_polygons', 'forecast_polygons'),
6265
('msc_pygeoapi.loader.marine_weather_realtime', 'marine_weather'),
6366
('msc_pygeoapi.loader.cap_alerts_realtime', 'cap_alerts'),
67+
('msc_pygeoapi.loader.alerts_realtime', 'alerts_realtime'),
6468
('msc_pygeoapi.loader.swob_realtime', 'swob_realtime'),
6569
('msc_pygeoapi.loader.aqhi_realtime', 'aqhi_realtime'),
6670
('msc_pygeoapi.loader.aqhi_stations', 'aqhi_stations'),
Lines changed: 330 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,330 @@
1+
# =================================================================
2+
#
3+
# Author: Louis-Philippe Rousseau-Lambert
4+
5+
#
6+
# Copyright (c) 2025 Louis-Philippe Rousseau-Lambert
7+
#
8+
# Permission is hereby granted, free of charge, to any person
9+
# obtaining a copy of this software and associated documentation
10+
# files (the "Software"), to deal in the Software without
11+
# restriction, including without limitation the rights to use,
12+
# copy, modify, merge, publish, distribute, sublicense, and/or sell
13+
# copies of the Software, and to permit persons to whom the
14+
# Software is furnished to do so, subject to the following
15+
# conditions:
16+
#
17+
# The above copyright notice and this permission notice shall be
18+
# included in all copies or substantial portions of the Software.
19+
#
20+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21+
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
22+
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23+
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24+
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
25+
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
26+
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
27+
# OTHER DEALINGS IN THE SOFTWARE.
28+
#
29+
# =================================================================
30+
31+
from datetime import datetime
32+
import json
33+
import logging
34+
import os
35+
from pathlib import Path
36+
37+
import click
38+
from parse import parse
39+
40+
from msc_pygeoapi import cli_options
41+
from msc_pygeoapi.connector.elasticsearch_ import ElasticsearchConnector
42+
from msc_pygeoapi.loader.base import BaseLoader
43+
from msc_pygeoapi.util import (
44+
configure_es_connection,
45+
check_es_indexes_to_delete,
46+
)
47+
48+
LOGGER = logging.getLogger(__name__)
49+
50+
# cleanup settings
51+
DAYS_TO_KEEP = 7
52+
53+
# index settings
54+
INDEX_BASENAME = 'alerts-realtime.'
55+
56+
ALIAS = 'alerts-realtime'
57+
58+
MAPPINGS = {
59+
'properties': {
60+
'geometry': {'type': 'geo_shape'},
61+
'properties': {
62+
'properties': {
63+
'publication_datetime': {
64+
'type': 'date',
65+
'format': 'strict_date_time'
66+
},
67+
'expiration_datetime': {
68+
'type': 'date',
69+
'format': 'strict_date_time'
70+
},
71+
'validity_datetime': {
72+
'type': 'date',
73+
'format': 'strict_date_time'
74+
},
75+
'event_end_datetime': {
76+
'type': 'date',
77+
'format': 'strict_date_time'
78+
}
79+
}
80+
}
81+
}
82+
}
83+
84+
SETTINGS = {
85+
'order': 0,
86+
'version': 1,
87+
'index_patterns': [f'{INDEX_BASENAME}*'],
88+
'settings': {'number_of_shards': 1, 'number_of_replicas': 0},
89+
'mappings': None
90+
}
91+
92+
93+
class AlertsRealtimeLoader(BaseLoader):
94+
"""Alerts Real-time loader"""
95+
96+
def __init__(self, conn_config={}):
97+
"""initializer"""
98+
99+
BaseLoader.__init__(self)
100+
101+
self.conn = ElasticsearchConnector(conn_config)
102+
self.filepath = None
103+
self.date_ = None
104+
self.index_date = None
105+
self.items = []
106+
107+
def parse_filename(self, filename):
108+
"""
109+
Parses an alerts filename in order to get the date
110+
111+
:return: `bool` of parse status
112+
"""
113+
114+
# parse filepath
115+
# example 20251126T182051.607Z_MSC_Alerts.json:DMS:CMC:ALERTS:JSON:20251126182137 # noqa
116+
pattern = '{date_}_MSC_Alerts.json{_}' # noqa
117+
parsed_filename = parse(pattern, filename)
118+
119+
self.date_ = datetime.strptime(
120+
parsed_filename.named['date_'], '%Y%m%dT%H%M%S.%fZ'
121+
)
122+
self.index_date = datetime.strftime(self.date_, '%Y-%m-%dt%H%M%S.%fz')
123+
124+
return True
125+
126+
def swap_alias(self, index_name):
127+
"""
128+
Swap aliases to point to the new alerts index
129+
130+
:return: `bool` of parse status
131+
"""
132+
133+
self.conn.create_alias(ALIAS, index_name, overwrite=True)
134+
135+
return True
136+
137+
def delete_indices(self, indices):
138+
for idx in indices:
139+
self.conn.delete(idx)
140+
141+
return True
142+
143+
def generate_geojson_features(self, es_index):
144+
"""
145+
Generates and yields a series of umos.
146+
Umos are returned as Elasticsearch bulk API
147+
upsert actions,with documents in GeoJSON to match the Elasticsearch
148+
index mappings.
149+
150+
:returns: Generator of Elasticsearch actions to upsert the alerts
151+
"""
152+
153+
with open(self.filepath.resolve()) as f:
154+
data = json.load(f)
155+
features = data['features']
156+
157+
for feature in features:
158+
prop_id = feature['properties']['id']
159+
feat_id = feature['properties']['feature_id']
160+
feature['id'] = f'{prop_id}_{feat_id}'
161+
feature['properties']['id'] = feature['id']
162+
163+
self.items.append(feature)
164+
165+
action = {
166+
'_id': feature['id'],
167+
'_index': es_index,
168+
'_op_type': 'update',
169+
'doc': feature,
170+
'doc_as_upsert': True
171+
}
172+
173+
yield action
174+
175+
def load_data(self, filepath):
176+
"""
177+
loads data from event to target
178+
:returns: `bool` of status result
179+
"""
180+
181+
self.filepath = Path(filepath)
182+
filename = self.filepath.name
183+
184+
# set class variables from filename
185+
LOGGER.debug(f'Received file {self.filepath}')
186+
self.parse_filename(filename)
187+
188+
# set new index name
189+
es_index = f'{INDEX_BASENAME}{self.index_date}'
190+
LOGGER.debug(f'new index name: {es_index}')
191+
192+
# Check if alias already exists
193+
LOGGER.debug(f'Checking if {self.filepath} is the most recent file')
194+
is_more_recent = False
195+
196+
# using "or []" to avoid having current_indices = None
197+
current_indices = (self.conn.get_alias_indices(ALIAS)) or []
198+
LOGGER.debug(f'Current indices {current_indices}')
199+
200+
is_more_recent = all(
201+
self.date_ > datetime.strptime(".".join(idx.split(".")[1:]),
202+
"%Y-%m-%dt%H%M%S.%fz")
203+
for idx in current_indices
204+
)
205+
LOGGER.debug(f'Is new file more recent --> {is_more_recent}')
206+
207+
if is_more_recent:
208+
LOGGER.debug(f'{self.filepath} is the most recent file')
209+
SETTINGS['index_patterns'] = [f'{INDEX_BASENAME}*']
210+
SETTINGS['mappings'] = MAPPINGS
211+
self.conn.create_template(INDEX_BASENAME, SETTINGS)
212+
213+
# create index
214+
# necessary for empty alerts json
215+
self.conn.create(es_index, {'mappings': MAPPINGS})
216+
217+
# generate geojson features
218+
package = self.generate_geojson_features(es_index)
219+
self.conn.submit_elastic_package(package, request_size=80000)
220+
221+
# Swap alias
222+
LOGGER.debug(f'Swapping alias: {es_index}')
223+
self.swap_alias(es_index)
224+
225+
# Delete old indices
226+
LOGGER.debug(f'Deleting previous indexes: {current_indices}')
227+
self.delete_indices(current_indices)
228+
229+
return True
230+
231+
232+
@click.group()
233+
def alerts_realtime():
234+
"""Manages alerts indexes"""
235+
pass
236+
237+
238+
@click.command()
239+
@click.pass_context
240+
@cli_options.OPTION_FILE()
241+
@cli_options.OPTION_DIRECTORY()
242+
@cli_options.OPTION_ELASTICSEARCH()
243+
@cli_options.OPTION_ES_USERNAME()
244+
@cli_options.OPTION_ES_PASSWORD()
245+
@cli_options.OPTION_ES_IGNORE_CERTS()
246+
def add(ctx, file_, directory, es, username, password, ignore_certs):
247+
"""Add alerts data to Elasticsearch"""
248+
249+
if all([file_ is None, directory is None]):
250+
raise click.ClickException('Missing --file/-f or --dir/-d option')
251+
252+
conn_config = configure_es_connection(es, username, password, ignore_certs)
253+
254+
files_to_process = []
255+
256+
if file_ is not None:
257+
files_to_process = [file_]
258+
elif directory is not None:
259+
for root, dirs, files in os.walk(directory):
260+
for f in [file for file in files if file.endswith('.json')]:
261+
files_to_process.append(os.path.join(root, f))
262+
files_to_process.sort(key=os.path.getmtime)
263+
264+
for file_to_process in files_to_process:
265+
loader = AlertsRealtimeLoader(conn_config)
266+
result = loader.load_data(file_to_process)
267+
if not result:
268+
click.echo('features not generated')
269+
270+
271+
@click.command()
272+
@click.pass_context
273+
@cli_options.OPTION_DAYS(
274+
default=DAYS_TO_KEEP,
275+
help=f'Delete indexes older than n days (default={DAYS_TO_KEEP})',
276+
)
277+
@cli_options.OPTION_ELASTICSEARCH()
278+
@cli_options.OPTION_ES_USERNAME()
279+
@cli_options.OPTION_ES_PASSWORD()
280+
@cli_options.OPTION_ES_IGNORE_CERTS()
281+
@cli_options.OPTION_YES(prompt='Are you sure you want to delete old indexes?')
282+
def clean_indexes(ctx, days, es, username, password, ignore_certs):
283+
"""Delete old alerts realtime indexes older than n days"""
284+
285+
conn_config = configure_es_connection(es, username, password, ignore_certs)
286+
conn = ElasticsearchConnector(conn_config)
287+
288+
indexes_to_fetch = '{INDEX_BASENAME}.*'
289+
290+
indexes = conn.get(indexes_to_fetch)
291+
292+
if indexes:
293+
indexes_to_delete = check_es_indexes_to_delete(indexes, days)
294+
if indexes_to_delete:
295+
click.echo(f'Deleting indexes {indexes_to_delete}')
296+
conn.delete(','.join(indexes_to_delete))
297+
298+
click.echo('Done')
299+
300+
301+
@click.command()
302+
@click.pass_context
303+
@cli_options.OPTION_ELASTICSEARCH()
304+
@cli_options.OPTION_ES_USERNAME()
305+
@cli_options.OPTION_ES_PASSWORD()
306+
@cli_options.OPTION_ES_IGNORE_CERTS()
307+
@cli_options.OPTION_INDEX_TEMPLATE()
308+
def delete_indexes(ctx, es, username, password, ignore_certs,
309+
index_template):
310+
"""Delete all alerts realtime indexes"""
311+
312+
conn_config = configure_es_connection(es, username, password, ignore_certs)
313+
conn = ElasticsearchConnector(conn_config)
314+
315+
indexes = f'{INDEX_BASENAME}*'
316+
317+
click.echo(f'Deleting indexes {indexes}')
318+
conn.delete(indexes)
319+
320+
if index_template:
321+
index_name = indexes
322+
click.echo(f'Deleting index template {index_name}')
323+
conn.delete_template(index_name)
324+
325+
click.echo('Done')
326+
327+
328+
alerts_realtime.add_command(add)
329+
alerts_realtime.add_command(clean_indexes)
330+
alerts_realtime.add_command(delete_indexes)

0 commit comments

Comments
 (0)