diff --git a/msc_wis2node/publisher.py b/msc_wis2node/publisher.py index 1f6c0e5..df9a5b4 100644 --- a/msc_wis2node/publisher.py +++ b/msc_wis2node/publisher.py @@ -41,6 +41,11 @@ class WIS2FlowCB(FlowCB): + def __init__(self, options): + super().__init__(options,LOGGER) + self.o.add_option('selfPublish', 'flag', True) + self.wis2_nonPublisher = WIS2Publisher() + def after_accept(self, worklist) -> None: """ sarracenia dispatcher @@ -56,16 +61,32 @@ def after_accept(self, worklist) -> None: try: LOGGER.debug('Processing message') - wis2_publisher = WIS2Publisher() - - if wis2_publisher.publish(msg['baseUrl'], msg['relPath']): - new_incoming.append(msg) + if self.o.selfPublish: + wis2_publisher = WIS2Publisher() + if wis2_publisher.publish(msg['baseUrl'], msg['relPath']): + new_incoming.append(msg) + else: + worklist.rejected.append(msg) else: - return + dataset = self.wis2_nonPublisher.identify(msg['relPath']) + if dataset: + + + # 2024-03-15 19:40:44,350 [CRITICAL] 2782038 publisher publish Dataset: {'metadata-id': 'c944aca6-0d59-418c-9d91-23247c8ada17', 'regexes': ['.*ISA[A|B]0[1-6].*'], 'title': 'Hourly surface based observations', 'subtopic': 'bulletins.alphanumeric.*.IS.CWAO.#', 'wis2-topic': 'data/core/weather/surface-based-observations/synop', 'media-type': 'application/x-bufr', 'cache': True} + msg["topic"] = self.o.post_exchange[0] + "/" + "/".join(self.o.post_topicPrefix) + "/" + dataset["wis2-topic"] + + msg["data_id"] = dataset["wis2-topic"] + "/" + msg["relPath"].split("/")[-1] + # wis2/ca-eccc-msc - missing from start + msg["contentType"] = dataset["media-type"] + + new_incoming.append(msg) + + else: + worklist.rejected.append(msg) + except Exception as err: LOGGER.error(f'Error publishing message: {err}', exc_info=True) worklist.failed.append(msg) - continue worklist.incoming = new_incoming @@ -75,7 +96,7 @@ class WIS2Publisher: def __init__(self): """initialize""" - + self.datasets = [] self.tls = None @@ -108,7 +129,7 @@ def publish(self, base_url: str, relative_path: str) -> bool: relative_path2 = '/' + relative_path.lstrip('/') url = f'{base_url}{relative_path2}' - + LOGGER.debug(f'Publishing dataset notification: {url}') self.publish_to_wis2(dataset, url) @@ -125,7 +146,7 @@ def identify(self, path: str) -> Union[dict, None]: """ for dataset in self.datasets: - LOGGER.debug(f'Dataset: {dataset}') + LOGGER.debug(f'{dataset = }') match = False subtopic_dirpath = self._subtopic2dirpath(dataset['subtopic'])