11import functools
22import importlib
33
4+ from sophys .cli .core import ENVVARS
45from sophys .cli .core .magics import NamespaceKeys , add_to_namespace , get_from_namespace
56
67
@@ -65,16 +66,15 @@ def resume(self, *args, **kwargs):
6566 return RE
6667
6768
68- def create_kafka_parameters (default_topic_names , default_bootstrap_servers , extension_name , _globals ):
69- kafka_topic = default_topic_names ()[0 ]
69+ def create_kafka_parameters (extension_name , _globals ):
70+ kafka_topic = ENVVARS .KAFKA_TOPIC
71+ bootstrap_servers = [f"{ ENVVARS .KAFKA_HOST } :{ ENVVARS .KAFKA_PORT } " ]
7072
7173 in_test_mode = get_from_namespace (NamespaceKeys .TEST_MODE , False , ns = _globals )
7274 in_local_mode = get_from_namespace (NamespaceKeys .LOCAL_MODE , False , ns = _globals )
7375 if in_test_mode and in_local_mode :
7476 kafka_topic = kafka_topic .replace (extension_name , "test" )
7577
76- bootstrap_servers = default_bootstrap_servers ()
77-
7878 add_to_namespace (NamespaceKeys .KAFKA_BOOTSTRAP , bootstrap_servers , _globals = _globals )
7979 add_to_namespace (NamespaceKeys .KAFKA_TOPIC , kafka_topic , _globals = _globals )
8080
@@ -96,14 +96,15 @@ def __create_kafka_monitor(topic_name: str, bootstrap_servers: list[str], subscr
9696 add_to_namespace (NamespaceKeys .KAFKA_MONITOR , monitor , _globals = _globals )
9797
9898
99- def create_kafka_callback (RE , kafka_callback_factory , logger , kafka_topic , bootstrap_servers , callbacks , _globals ):
99+ def create_kafka_callback (RE , logger , kafka_topic , bootstrap_servers , callbacks , _globals ):
100100 from kafka .errors import NoBrokersAvailable
101+ from sophys .common .utils .kafka import make_kafka_callback
101102
102103 logger .info (f"Connecting to kafka... (IPs: { bootstrap_servers } | Topic: { kafka_topic } )" )
103104
104105 try :
105106 # RE -> Kafka
106- RE .subscribe (kafka_callback_factory (topic_names = [kafka_topic ], bootstrap_servers = bootstrap_servers , backoff_times = [0.1 , 1.0 ]))
107+ RE .subscribe (make_kafka_callback (topic_names = [kafka_topic ], bootstrap_servers = bootstrap_servers , backoff_times = [0.1 , 1.0 ]))
107108
108109 logger .info ("Connected to the kafka broker successfully!" )
109110 except (TypeError , NoBrokersAvailable ):
@@ -160,11 +161,7 @@ def execute_at_start(extension_name, _globals):
160161
161162 callbacks = create_callbacks (_globals )
162163
163- sophys_utils = importlib .import_module (f"sophys.{ extension_name } .utils" )
164- default_topic_names = sophys_utils .default_topic_names
165- default_bootstrap_servers = sophys_utils .default_bootstrap_servers
166-
167- kafka_topic , bootstrap_servers = create_kafka_parameters (default_topic_names , default_bootstrap_servers , extension_name , _globals )
164+ kafka_topic , bootstrap_servers = create_kafka_parameters (extension_name , _globals )
168165
169166 if not get_from_namespace (NamespaceKeys .LOCAL_MODE , False , ns = _globals ):
170167 # Remote mode, only setup the monitor
@@ -176,6 +173,6 @@ def execute_at_start(extension_name, _globals):
176173
177174 # Kafka callback
178175 # NOTE: This is needed even in the local setting so that `kbl` works even in this case.
179- create_kafka_callback (RE , sophys_utils , kafka_logger , kafka_topic , bootstrap_servers , callbacks , _globals )
176+ create_kafka_callback (RE , kafka_logger , kafka_topic , bootstrap_servers , callbacks , _globals )
180177
181178 instantiate_devices (sophys_logger , extension_name , _globals )
0 commit comments