|
10 | 10 | import zmq |
11 | 11 | import zmq.asyncio |
12 | 12 | from prometheus_client import CollectorRegistry, multiprocess |
13 | | -from sglang.srt.utils import get_local_ip_auto, get_zmq_socket |
| 13 | +from sglang.srt.utils import get_local_ip_auto, get_zmq_socket, maybe_wrap_ipv6_address |
14 | 14 |
|
15 | 15 | from dynamo.common.utils.prometheus import register_engine_metrics_callback |
16 | 16 | from dynamo.llm import ( |
|
26 | 26 | from dynamo.sglang.args import Config |
27 | 27 |
|
28 | 28 |
|
| 29 | +def format_zmq_endpoint(endpoint_template: str, ip_address: str) -> str: |
| 30 | + """Format ZMQ endpoint by replacing wildcard with IP address. |
| 31 | +
|
| 32 | + Properly handles IPv6 addresses by wrapping them in square brackets. |
| 33 | + Uses SGLang's maybe_wrap_ipv6_address for consistent formatting. |
| 34 | +
|
| 35 | + Args: |
| 36 | + endpoint_template: ZMQ endpoint template with wildcard (e.g., "tcp://*:5557") |
| 37 | + ip_address: IP address to use (can be IPv4 or IPv6) |
| 38 | +
|
| 39 | + Returns: |
| 40 | + Formatted ZMQ endpoint string |
| 41 | +
|
| 42 | + Example: |
| 43 | + >>> format_zmq_endpoint("tcp://*:5557", "192.168.1.1") |
| 44 | + 'tcp://192.168.1.1:5557' |
| 45 | + >>> format_zmq_endpoint("tcp://*:5557", "2a02:6b8:c46:2b4:0:74c1:75b0:0") |
| 46 | + 'tcp://[2a02:6b8:c46:2b4:0:74c1:75b0:0]:5557' |
| 47 | + """ |
| 48 | + # Use SGLang's utility to wrap IPv6 addresses in brackets |
| 49 | + formatted_ip = maybe_wrap_ipv6_address(ip_address) |
| 50 | + return endpoint_template.replace("*", formatted_ip) |
| 51 | + |
| 52 | + |
29 | 53 | class DynamoSglangPublisher: |
30 | 54 | """ |
31 | 55 | Handles SGLang kv events and metrics reception and publishing. |
@@ -121,7 +145,7 @@ def init_kv_event_publish(self) -> Optional[ZmqKvEventPublisher]: |
121 | 145 | if self.server_args.kv_events_config: |
122 | 146 | kv_events = json.loads(self.server_args.kv_events_config) |
123 | 147 | ep = kv_events.get("endpoint") |
124 | | - zmq_ep = ep.replace("*", get_local_ip_auto()) if ep else None |
| 148 | + zmq_ep = format_zmq_endpoint(ep, get_local_ip_auto()) if ep else None |
125 | 149 |
|
126 | 150 | zmq_config = ZmqKvEventPublisherConfig( |
127 | 151 | worker_id=self.generate_endpoint.connection_id(), |
|
0 commit comments