@@ -430,16 +430,24 @@ async def init_multimodal_encode_worker(runtime: DistributedRuntime, config: Con
430430
431431 await pd_worker_client .wait_for_instances ()
432432
433- tasks = [
434- generate_endpoint .serve_endpoint (
435- handler .generate ,
436- graceful_shutdown = True ,
437- metrics_labels = [("model" , server_args .served_model_name )],
438- )
439- ]
433+ ready_event = asyncio .Event ()
440434
441435 try :
442- await asyncio .gather (* tasks )
436+ await asyncio .gather (
437+ generate_endpoint .serve_endpoint (
438+ handler .generate ,
439+ graceful_shutdown = True ,
440+ metrics_labels = [("model" , server_args .served_model_name )],
441+ ),
442+ register_llm_with_readiness_gate (
443+ None , # encode worker doesn't have engine
444+ generate_endpoint ,
445+ server_args ,
446+ dynamo_args ,
447+ input_type = ModelInput .Text ,
448+ readiness_gate = ready_event ,
449+ ),
450+ )
443451 except Exception as e :
444452 logging .error (f"Failed to serve endpoints: { e } " )
445453 raise
@@ -473,11 +481,24 @@ async def init_multimodal_worker(runtime: DistributedRuntime, config: Config):
473481
474482 await handler .async_init ()
475483
484+ health_check_payload = SglangHealthCheckPayload (engine ).to_dict ()
485+ ready_event = asyncio .Event ()
486+
476487 try :
477- await generate_endpoint .serve_endpoint (
478- handler .generate ,
479- metrics_labels = [("model" , server_args .served_model_name )],
480- graceful_shutdown = True ,
488+ await asyncio .gather (
489+ generate_endpoint .serve_endpoint (
490+ handler .generate ,
491+ metrics_labels = [("model" , server_args .served_model_name )],
492+ graceful_shutdown = True ,
493+ health_check_payload = health_check_payload ,
494+ ),
495+ register_llm_with_readiness_gate (
496+ engine ,
497+ generate_endpoint ,
498+ server_args ,
499+ dynamo_args ,
500+ readiness_gate = ready_event ,
501+ ),
481502 )
482503 except Exception as e :
483504 logging .error (f"Failed to serve endpoints: { e } " )
@@ -502,6 +523,7 @@ async def init_multimodal_prefill_worker(runtime: DistributedRuntime, config: Co
502523 await handler .async_init ()
503524
504525 health_check_payload = SglangPrefillHealthCheckPayload (engine ).to_dict ()
526+ ready_event = asyncio .Event ()
505527
506528 try :
507529 await asyncio .gather (
@@ -510,7 +532,14 @@ async def init_multimodal_prefill_worker(runtime: DistributedRuntime, config: Co
510532 graceful_shutdown = True ,
511533 metrics_labels = [("model" , server_args .served_model_name )],
512534 health_check_payload = health_check_payload ,
513- )
535+ ),
536+ register_llm_with_readiness_gate (
537+ engine ,
538+ generate_endpoint ,
539+ server_args ,
540+ dynamo_args ,
541+ readiness_gate = ready_event ,
542+ ),
514543 )
515544 except Exception as e :
516545 logging .error (f"Failed to serve endpoints: { e } " )
0 commit comments