Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions msc_wis2node/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@


class WIS2FlowCB(FlowCB):
def __init__(self, options):
super().__init__(options,LOGGER)
self.o.add_option('selfPublish', 'flag', True)
self.wis2_nonPublisher = WIS2Publisher()

def after_accept(self, worklist) -> None:
"""
sarracenia dispatcher
Expand All @@ -56,16 +61,32 @@ def after_accept(self, worklist) -> None:
try:
LOGGER.debug('Processing message')

wis2_publisher = WIS2Publisher()

if wis2_publisher.publish(msg['baseUrl'], msg['relPath']):
new_incoming.append(msg)
if self.o.selfPublish:
wis2_publisher = WIS2Publisher()
if wis2_publisher.publish(msg['baseUrl'], msg['relPath']):
new_incoming.append(msg)
else:
worklist.rejected.append(msg)
else:
return
dataset = self.wis2_nonPublisher.identify(msg['relPath'])
if dataset:


# 2024-03-15 19:40:44,350 [CRITICAL] 2782038 publisher publish Dataset: {'metadata-id': 'c944aca6-0d59-418c-9d91-23247c8ada17', 'regexes': ['.*ISA[A|B]0[1-6].*'], 'title': 'Hourly surface based observations', 'subtopic': 'bulletins.alphanumeric.*.IS.CWAO.#', 'wis2-topic': 'data/core/weather/surface-based-observations/synop', 'media-type': 'application/x-bufr', 'cache': True}
msg["topic"] = self.o.post_exchange[0] + "/" + "/".join(self.o.post_topicPrefix) + "/" + dataset["wis2-topic"]

msg["data_id"] = dataset["wis2-topic"] + "/" + msg["relPath"].split("/")[-1]
# wis2/ca-eccc-msc - missing from start
msg["contentType"] = dataset["media-type"]

new_incoming.append(msg)

else:
worklist.rejected.append(msg)

except Exception as err:
LOGGER.error(f'Error publishing message: {err}', exc_info=True)
worklist.failed.append(msg)
continue

worklist.incoming = new_incoming

Expand All @@ -75,7 +96,7 @@ class WIS2Publisher:

def __init__(self):
"""initialize"""

self.datasets = []
self.tls = None

Expand Down Expand Up @@ -108,7 +129,7 @@ def publish(self, base_url: str, relative_path: str) -> bool:

relative_path2 = '/' + relative_path.lstrip('/')
url = f'{base_url}{relative_path2}'

LOGGER.debug(f'Publishing dataset notification: {url}')
self.publish_to_wis2(dataset, url)

Expand All @@ -125,7 +146,7 @@ def identify(self, path: str) -> Union[dict, None]:
"""

for dataset in self.datasets:
LOGGER.debug(f'Dataset: {dataset}')
LOGGER.debug(f'{dataset = }')
match = False
subtopic_dirpath = self._subtopic2dirpath(dataset['subtopic'])

Expand Down