diff --git a/components/src/dynamo/vllm/multimodal_handlers/processor_handler.py b/components/src/dynamo/vllm/multimodal_handlers/processor_handler.py
index eb84c20190..1ee10d02cd 100644
--- a/components/src/dynamo/vllm/multimodal_handlers/processor_handler.py
+++ b/components/src/dynamo/vllm/multimodal_handlers/processor_handler.py
@@ -11,7 +11,7 @@
 from vllm.engine.arg_utils import AsyncEngineArgs
 from vllm.entrypoints.openai.protocol import ChatCompletionRequest, CompletionRequest
 from vllm.outputs import RequestOutput
-from vllm.transformers_utils.tokenizer import AnyTokenizer
+from vllm.tokenizers import TokenizerLike as AnyTokenizer

 from dynamo.runtime import Client

diff --git a/components/src/dynamo/vllm/multimodal_utils/chat_processor.py b/components/src/dynamo/vllm/multimodal_utils/chat_processor.py
index fe8d95dc81..3a693131d9 100644
--- a/components/src/dynamo/vllm/multimodal_utils/chat_processor.py
+++ b/components/src/dynamo/vllm/multimodal_utils/chat_processor.py
@@ -28,9 +28,22 @@
 from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
 from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
 from vllm.entrypoints.openai.serving_engine import RequestPrompt
+from vllm.entrypoints.openai.serving_models import BaseModelPath, OpenAIServingModels
 from vllm.inputs.data import TokensPrompt
 from vllm.sampling_params import SamplingParams
-from vllm.transformers_utils.tokenizer import AnyTokenizer
+from vllm.tokenizers import TokenizerLike as AnyTokenizer
+
+
+class StubEngineClient:
+    """
+    Stub EngineClient for preprocessing-only use of OpenAIServingChat/Completion.
+    Provides the minimal attributes required by OpenAIServingModels.
+    """
+
+    def __init__(self, model_config: ModelConfig):
+        self.model_config = model_config
+        self.input_processor = None
+        self.io_processor = None


 @runtime_checkable
@@ -120,12 +133,19 @@ class ChatProcessor:
     def __init__(self, tokenizer: AnyTokenizer, model_config: ModelConfig):
         self.tokenizer = tokenizer
         self.model_config = model_config
+        # Create stub engine client and models for preprocessing-only usage
+        stub_engine = StubEngineClient(model_config)
+        serving_models = OpenAIServingModels(
+            engine_client=stub_engine,
+            base_model_paths=[
+                BaseModelPath(name=model_config.model, model_path=model_config.model)
+            ],
+        )
         self.openai_serving = OpenAIServingChat(
-            engine_client=None,
-            model_config=model_config,
-            models=None,
-            request_logger=None,
+            engine_client=stub_engine,
+            models=serving_models,
             response_role="assistant",
+            request_logger=None,
             chat_template=None,
             chat_template_content_format="auto",
         )
@@ -186,7 +206,6 @@ async def stream_response(
             conversation,
             self.tokenizer,
             request_metadata,
-            enable_force_include_usage=False,
         ):
             if raw_response.startswith("data: [DONE]"):
                 yield raw_response
@@ -220,7 +239,6 @@ async def stream_response(
             conversation,
             self.tokenizer,
             request_metadata,
-            enable_force_include_usage=False,
         ):
             if raw_response.startswith("data: [DONE]"):
                 break
@@ -267,10 +285,17 @@ class CompletionsProcessor:
     def __init__(self, tokenizer: AnyTokenizer, model_config: ModelConfig):
         self.tokenizer = tokenizer
         self.model_config = model_config
+        # Create stub engine client and models for preprocessing-only usage
+        stub_engine = StubEngineClient(model_config)
+        serving_models = OpenAIServingModels(
+            engine_client=stub_engine,
+            base_model_paths=[
+                BaseModelPath(name=model_config.model, model_path=model_config.model)
+            ],
+        )
         self.openai_serving = OpenAIServingCompletion(
-            engine_client=None,
-            model_config=model_config,
-            models=None,
+            engine_client=stub_engine,
+            models=serving_models,
             request_logger=None,
         )

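Note (not part of the patch): the stub wiring above replaces the old `engine_client=None` / `models=None` arguments. Below is a minimal sketch of the same construction in isolation, assuming the keyword arguments used in this patch (`engine_client`, `base_model_paths`, `models`); exact constructor signatures vary across vLLM versions, and `build_preprocessing_chat_serving` plus the local `_StubEngineClient` are illustrative names, not part of the repo.

```python
# Sketch: tokenization/template-only use of OpenAIServingChat, mirroring the patch.
from vllm.config import ModelConfig
from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
from vllm.entrypoints.openai.serving_models import BaseModelPath, OpenAIServingModels


class _StubEngineClient:
    """Carries only the attributes OpenAIServingModels reads (per this patch)."""

    def __init__(self, model_config: ModelConfig):
        self.model_config = model_config
        self.input_processor = None
        self.io_processor = None


def build_preprocessing_chat_serving(model_config: ModelConfig) -> OpenAIServingChat:
    stub_engine = _StubEngineClient(model_config)
    serving_models = OpenAIServingModels(
        engine_client=stub_engine,
        base_model_paths=[
            BaseModelPath(name=model_config.model, model_path=model_config.model)
        ],
    )
    # No real engine is attached: the returned object is only used for prompt
    # rendering and tokenization in the processor component, never for generation.
    return OpenAIServingChat(
        engine_client=stub_engine,
        models=serving_models,
        response_role="assistant",
        request_logger=None,
        chat_template=None,
        chat_template_content_format="auto",
    )
```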
diff --git a/components/src/dynamo/vllm/multimodal_utils/protocol.py b/components/src/dynamo/vllm/multimodal_utils/protocol.py
index ef8d2bea91..c05f6cdeeb 100644
--- a/components/src/dynamo/vllm/multimodal_utils/protocol.py
+++ b/components/src/dynamo/vllm/multimodal_utils/protocol.py
@@ -26,7 +26,7 @@
 from vllm.multimodal.inputs import MultiModalUUIDDict  # noqa: F401
 from vllm.outputs import CompletionOutput
 from vllm.sampling_params import SamplingParams
-from vllm.sequence import RequestMetrics
+from vllm.v1.metrics.stats import RequestStateStats

 import dynamo.nixl_connect as connect

@@ -156,7 +156,7 @@ class MyRequestOutput(BaseModel):
     https://github.com/vllm-project/vllm/blob/a4c402a756fa3213caf9d2cde0e4ceb2d57727f2/vllm/outputs.py#L85

     This class is used to serialize the RequestOutput and any recursively defined types
-    We can do this because PromptLogprobs, RequestMetrics, and CompletionOutput are all serializable dataclasses
+    We can do this because PromptLogprobs, RequestStateStats, and CompletionOutput are all serializable dataclasses
     """

     model_config = ConfigDict(arbitrary_types_allowed=True)
@@ -167,7 +167,7 @@ class MyRequestOutput(BaseModel):
     prompt_logprobs: Optional[PromptLogprobs] = None
     outputs: List[CompletionOutput]
     finished: bool
-    metrics: Optional[RequestMetrics] = None
+    metrics: Optional[RequestStateStats] = None
     kv_transfer_params: Optional[dict[str, Any]] = None
     # lora_request: Optional[LoRARequest] = None
     # encoder_prompt: Optional[str] = None
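Note (not part of the patch): `MyRequestOutput` relies on Pydantic's `arbitrary_types_allowed` to carry a plain dataclass such as the new `RequestStateStats`. Below is a minimal sketch of that pattern with a stand-in dataclass, since the exact `RequestStateStats` fields are version-dependent; `Envelope` and `_StatsLike` are illustrative names only.

```python
# Sketch: carrying an arbitrary dataclass on a Pydantic model, as MyRequestOutput does.
from dataclasses import dataclass
from typing import Optional

from pydantic import BaseModel, ConfigDict


@dataclass
class _StatsLike:
    # Stand-in for vllm.v1.metrics.stats.RequestStateStats
    num_generation_tokens: int = 0


class Envelope(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    request_id: str
    finished: bool
    metrics: Optional[_StatsLike] = None


msg = Envelope(request_id="req-0", finished=True, metrics=_StatsLike(3))
assert msg.metrics is not None and msg.metrics.num_generation_tokens == 3
```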
diff --git a/examples/backends/vllm/launch/agg_multimodal_epd.sh b/examples/backends/vllm/launch/agg_multimodal_epd.sh
index a94ab3c1f4..e905409325 100755
--- a/examples/backends/vllm/launch/agg_multimodal_epd.sh
+++ b/examples/backends/vllm/launch/agg_multimodal_epd.sh
@@ -80,7 +80,7 @@ python -m dynamo.vllm --multimodal-processor --enable-multimodal --model $MODEL_

 # run E/P/D workers
 CUDA_VISIBLE_DEVICES=0 python -m dynamo.vllm --multimodal-encode-worker --enable-multimodal --model $MODEL_NAME &
-CUDA_VISIBLE_DEVICES=1 python -m dynamo.vllm --multimodal-worker --enable-multimodal --model $MODEL_NAME $EXTRA_ARGS &
+CUDA_VISIBLE_DEVICES=0 python -m dynamo.vllm --multimodal-worker --enable-multimodal --enable-mm-embeds --model $MODEL_NAME $EXTRA_ARGS &

 # Wait for all background processes to complete
 wait
diff --git a/examples/backends/vllm/launch/disagg_multimodal_epd.sh b/examples/backends/vllm/launch/disagg_multimodal_epd.sh
index 75b30abb8e..0e253c12be 100755
--- a/examples/backends/vllm/launch/disagg_multimodal_epd.sh
+++ b/examples/backends/vllm/launch/disagg_multimodal_epd.sh
@@ -81,23 +81,20 @@ python -m dynamo.vllm --multimodal-processor --enable-multimodal --model $MODEL_

 # Configure GPU memory optimization for specific models
 EXTRA_ARGS=""
-if [[ "$MODEL_NAME" == "Qwen/Qwen2.5-VL-7B-Instruct" ]]; then
-    EXTRA_ARGS="--gpu-memory-utilization 0.85 --max-model-len 2048"
-fi

 # Start encode worker
-echo "Starting encode worker on GPU 1..."
-VLLM_NIXL_SIDE_CHANNEL_PORT=20097 CUDA_VISIBLE_DEVICES=1 python -m dynamo.vllm --multimodal-encode-worker --enable-multimodal --model $MODEL_NAME $EXTRA_ARGS --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20080"}' &
+echo "Starting encode worker on GPU 0..."
+VLLM_NIXL_SIDE_CHANNEL_PORT=20097 CUDA_VISIBLE_DEVICES=0 python -m dynamo.vllm --multimodal-encode-worker --enable-multimodal --model $MODEL_NAME $EXTRA_ARGS --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20080"}' &

 # Start prefill worker
-echo "Starting prefill worker on GPU 2..."
+echo "Starting prefill worker on GPU 1..."
 VLLM_NIXL_SIDE_CHANNEL_PORT=20098 \
-CUDA_VISIBLE_DEVICES=2 python -m dynamo.vllm --multimodal-worker --is-prefill-worker --enable-multimodal --model $MODEL_NAME $EXTRA_ARGS --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20081"}' &
+CUDA_VISIBLE_DEVICES=1 python -m dynamo.vllm --multimodal-worker --is-prefill-worker --enable-multimodal --enable-mm-embeds --model $MODEL_NAME $EXTRA_ARGS --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20081"}' &

 # Start decode worker
-echo "Starting decode worker on GPU 3..."
+echo "Starting decode worker on GPU 2..."
 VLLM_NIXL_SIDE_CHANNEL_PORT=20099 \
-CUDA_VISIBLE_DEVICES=3 python -m dynamo.vllm --multimodal-decode-worker --enable-multimodal --model $MODEL_NAME $EXTRA_ARGS --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20082"}' &
+CUDA_VISIBLE_DEVICES=2 python -m dynamo.vllm --multimodal-decode-worker --enable-multimodal --model $MODEL_NAME $EXTRA_ARGS --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20082"}' &

 echo "=================================================="
 echo "All components started. Waiting for initialization..."
diff --git a/examples/multimodal/components/audio_encode_worker.py b/examples/multimodal/components/audio_encode_worker.py
index 8917543ef8..4384ec2e9c 100644
--- a/examples/multimodal/components/audio_encode_worker.py
+++ b/examples/multimodal/components/audio_encode_worker.py
@@ -25,7 +25,7 @@
 import uvloop
 from transformers import AutoProcessor, Qwen2AudioForConditionalGeneration
 from vllm.engine.arg_utils import AsyncEngineArgs
-from vllm.utils import FlexibleArgumentParser
+from vllm.utils.argparse_utils import FlexibleArgumentParser

 import dynamo.nixl_connect as connect
 from dynamo.runtime import Client, DistributedRuntime, dynamo_worker
@@ -201,7 +201,6 @@ async def async_init(self, runtime: DistributedRuntime):
         # Create and initialize a dynamo connector for this worker.
         # We'll needs this to move data between this worker and remote workers efficiently.
         self._connector = connect.Connector()
-        await self._connector.initialize()

         logger.info("Startup completed.")
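Note (not part of the patch): `FlexibleArgumentParser` moved from `vllm.utils` to `vllm.utils.argparse_utils`. Below is a hedged sketch of a version-tolerant import, relevant only if one script has to run against vLLM builds from before and after the relocation; the `--model` value is a placeholder.

```python
# Sketch: accept either FlexibleArgumentParser location.
try:
    from vllm.utils.argparse_utils import FlexibleArgumentParser  # newer vLLM layout
except ImportError:
    from vllm.utils import FlexibleArgumentParser  # older vLLM layout

parser = FlexibleArgumentParser(description="multimodal encode worker")
parser.add_argument("--model", type=str, required=True)
# Placeholder model id, purely for illustration.
args = parser.parse_args(["--model", "org/model-name"])
print(args.model)
```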
diff --git a/examples/multimodal/components/encode_worker.py b/examples/multimodal/components/encode_worker.py
index 05af900e98..282e785037 100644
--- a/examples/multimodal/components/encode_worker.py
+++ b/examples/multimodal/components/encode_worker.py
@@ -12,7 +12,7 @@
 import uvloop
 from transformers import AutoImageProcessor
 from vllm.engine.arg_utils import AsyncEngineArgs
-from vllm.utils import FlexibleArgumentParser
+from vllm.utils.argparse_utils import FlexibleArgumentParser

 import dynamo.nixl_connect as connect
 from dynamo.runtime import Client, DistributedRuntime, dynamo_worker
diff --git a/examples/multimodal/components/processor.py b/examples/multimodal/components/processor.py
index 7bc1be7b25..ede65cc975 100644
--- a/examples/multimodal/components/processor.py
+++ b/examples/multimodal/components/processor.py
@@ -17,8 +17,8 @@
 from vllm.engine.arg_utils import AsyncEngineArgs
 from vllm.entrypoints.openai.protocol import ChatCompletionRequest, CompletionRequest
 from vllm.outputs import RequestOutput
-from vllm.transformers_utils.tokenizer import AnyTokenizer
-from vllm.utils import FlexibleArgumentParser
+from vllm.tokenizers import TokenizerLike as AnyTokenizer
+from vllm.utils.argparse_utils import FlexibleArgumentParser

 from dynamo.llm import ModelInput, ModelType, register_llm
 from dynamo.runtime import Client, DistributedRuntime, dynamo_worker
diff --git a/examples/multimodal/components/publisher.py b/examples/multimodal/components/publisher.py
index c1937fd6c6..19fe18ccff 100644
--- a/examples/multimodal/components/publisher.py
+++ b/examples/multimodal/components/publisher.py
@@ -38,6 +38,8 @@ def record(
         scheduler_stats: Optional[SchedulerStats],
         iteration_stats: Optional[IterationStats],
         engine_idx: int = 0,
+        *args,
+        **kwargs,
     ):
         pass

@@ -74,6 +76,8 @@ def record(
         scheduler_stats: SchedulerStats,
         iteration_stats: Optional[IterationStats],
         engine_idx: int = 0,
+        *args,
+        **kwargs,
     ):
         # request_total_slots and kv_total_blocks are properties of model + gpu
         # we should only publish them once, not every metric update
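Note (not part of the patch): padding `record()` with `*args`/`**kwargs` keeps the publisher working if vLLM's stat-logger interface grows new parameters. A minimal sketch of the idea with stand-in classes; `mm_cache_stats` and both class names are hypothetical, not vLLM API.

```python
# Sketch: a forward-compatible record() that tolerates unknown extra arguments.
from typing import Any, Optional


class ForwardCompatibleLogger:
    def record(
        self,
        scheduler_stats: Optional[Any],
        iteration_stats: Optional[Any],
        engine_idx: int = 0,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        # Unknown extras are accepted and ignored instead of raising TypeError.
        pass


class CallerFromNewerVllm:
    """Simulates a caller that started passing an extra keyword argument."""

    def emit(self, logger: ForwardCompatibleLogger) -> None:
        logger.record(
            scheduler_stats=None,
            iteration_stats=None,
            engine_idx=0,
            mm_cache_stats=None,  # hypothetical new argument, swallowed by **kwargs
        )


CallerFromNewerVllm().emit(ForwardCompatibleLogger())
```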
diff --git a/examples/multimodal/components/video_encode_worker.py b/examples/multimodal/components/video_encode_worker.py
index 5453c9ab16..9602c6ed39 100644
--- a/examples/multimodal/components/video_encode_worker.py
+++ b/examples/multimodal/components/video_encode_worker.py
@@ -16,7 +16,7 @@
 import torch
 import uvloop
 from vllm.engine.arg_utils import AsyncEngineArgs
-from vllm.utils import FlexibleArgumentParser
+from vllm.utils.argparse_utils import FlexibleArgumentParser

 import dynamo.nixl_connect as connect
 from dynamo.runtime import Client, DistributedRuntime, dynamo_worker
diff --git a/examples/multimodal/components/worker.py b/examples/multimodal/components/worker.py
index 4e3b7ba43e..d5efa22a85 100644
--- a/examples/multimodal/components/worker.py
+++ b/examples/multimodal/components/worker.py
@@ -15,7 +15,7 @@
 from vllm.distributed.kv_events import ZmqEventPublisher
 from vllm.inputs.data import TokensPrompt
 from vllm.usage.usage_lib import UsageContext
-from vllm.utils import FlexibleArgumentParser
+from vllm.utils.argparse_utils import FlexibleArgumentParser
 from vllm.v1.engine.async_llm import AsyncLLM

 import dynamo.nixl_connect as connect
@@ -251,7 +251,6 @@ async def async_init(self, runtime: DistributedRuntime):
         # We'll needs this to move data between this worker and remote workers efficiently.
         parsed_namespace, _, _ = parse_endpoint(self.endpoint)
         self._connector = connect.Connector()
-        await self._connector.initialize()

         self.image_loader = ImageLoader()
diff --git a/examples/multimodal/launch/audio_agg.sh b/examples/multimodal/launch/audio_agg.sh
index 3f1af408b1..0ea01066f0 100755
--- a/examples/multimodal/launch/audio_agg.sh
+++ b/examples/multimodal/launch/audio_agg.sh
@@ -91,7 +91,7 @@ python3 components/processor.py --model $MODEL_NAME --prompt-template "$PROMPT_T

 # run E/P/D workers
 CUDA_VISIBLE_DEVICES=0 python3 components/audio_encode_worker.py --model $MODEL_NAME &
-VLLM_NIXL_SIDE_CHANNEL_PORT=20097 CUDA_VISIBLE_DEVICES=1 python3 components/worker.py --model $MODEL_NAME --worker-type prefill &
+VLLM_NIXL_SIDE_CHANNEL_PORT=20097 CUDA_VISIBLE_DEVICES=0 python3 components/worker.py --model $MODEL_NAME --worker-type prefill &

 # Wait for all background processes to complete
 wait
diff --git a/examples/multimodal/utils/args.py b/examples/multimodal/utils/args.py
index 3fe10ee0b1..df6ce698da 100644
--- a/examples/multimodal/utils/args.py
+++ b/examples/multimodal/utils/args.py
@@ -159,6 +159,8 @@ def overwrite_args(config):
         "enable_prefix_caching": True,
         # KV routing relies on logging KV metrics
         "disable_log_stats": False,
+        # Enable multimodal embeddings input
+        "enable_mm_embeds": True,
         # Always setting up kv transfer for disagg
         "kv_transfer_config": KVTransferConfig(
             kv_connector="NixlConnector", kv_role="kv_both"
diff --git a/examples/multimodal/utils/chat_processor.py b/examples/multimodal/utils/chat_processor.py
index fe8d95dc81..3a693131d9 100644
--- a/examples/multimodal/utils/chat_processor.py
+++ b/examples/multimodal/utils/chat_processor.py
@@ -28,9 +28,22 @@
 from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
 from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
 from vllm.entrypoints.openai.serving_engine import RequestPrompt
+from vllm.entrypoints.openai.serving_models import BaseModelPath, OpenAIServingModels
 from vllm.inputs.data import TokensPrompt
 from vllm.sampling_params import SamplingParams
-from vllm.transformers_utils.tokenizer import AnyTokenizer
+from vllm.tokenizers import TokenizerLike as AnyTokenizer
+
+
+class StubEngineClient:
+    """
+    Stub EngineClient for preprocessing-only use of OpenAIServingChat/Completion.
+    Provides the minimal attributes required by OpenAIServingModels.
+ """ + + def __init__(self, model_config: ModelConfig): + self.model_config = model_config + self.input_processor = None + self.io_processor = None @runtime_checkable @@ -120,12 +133,19 @@ class ChatProcessor: def __init__(self, tokenizer: AnyTokenizer, model_config: ModelConfig): self.tokenizer = tokenizer self.model_config = model_config + # Create stub engine client and models for preprocessing-only usage + stub_engine = StubEngineClient(model_config) + serving_models = OpenAIServingModels( + engine_client=stub_engine, + base_model_paths=[ + BaseModelPath(name=model_config.model, model_path=model_config.model) + ], + ) self.openai_serving = OpenAIServingChat( - engine_client=None, - model_config=model_config, - models=None, - request_logger=None, + engine_client=stub_engine, + models=serving_models, response_role="assistant", + request_logger=None, chat_template=None, chat_template_content_format="auto", ) @@ -186,7 +206,6 @@ async def stream_response( conversation, self.tokenizer, request_metadata, - enable_force_include_usage=False, ): if raw_response.startswith("data: [DONE]"): yield raw_response @@ -220,7 +239,6 @@ async def stream_response( conversation, self.tokenizer, request_metadata, - enable_force_include_usage=False, ): if raw_response.startswith("data: [DONE]"): break @@ -267,10 +285,17 @@ class CompletionsProcessor: def __init__(self, tokenizer: AnyTokenizer, model_config: ModelConfig): self.tokenizer = tokenizer self.model_config = model_config + # Create stub engine client and models for preprocessing-only usage + stub_engine = StubEngineClient(model_config) + serving_models = OpenAIServingModels( + engine_client=stub_engine, + base_model_paths=[ + BaseModelPath(name=model_config.model, model_path=model_config.model) + ], + ) self.openai_serving = OpenAIServingCompletion( - engine_client=None, - model_config=model_config, - models=None, + engine_client=stub_engine, + models=serving_models, request_logger=None, ) diff --git a/examples/multimodal/utils/protocol.py b/examples/multimodal/utils/protocol.py index c31dd82799..a724b8720d 100644 --- a/examples/multimodal/utils/protocol.py +++ b/examples/multimodal/utils/protocol.py @@ -26,7 +26,7 @@ from vllm.multimodal.inputs import MultiModalUUIDDict # noqa: F401 from vllm.outputs import CompletionOutput from vllm.sampling_params import SamplingParams -from vllm.sequence import RequestMetrics +from vllm.v1.metrics.stats import RequestStateStats import dynamo.nixl_connect as connect @@ -166,7 +166,7 @@ class MyRequestOutput(BaseModel): https://github.com/vllm-project/vllm/blob/a4c402a756fa3213caf9d2cde0e4ceb2d57727f2/vllm/outputs.py#L85 This class is used to serialize the RequestOutput and any recursively defined types - We can do this because PromptLogprobs, RequestMetrics, and CompletionOutput are all serializable dataclasses + We can do this because PromptLogprobs, RequestStateStats, and CompletionOutput are all serializable dataclasses """ model_config = ConfigDict(arbitrary_types_allowed=True) @@ -177,7 +177,7 @@ class MyRequestOutput(BaseModel): prompt_logprobs: Optional[PromptLogprobs] = None outputs: List[CompletionOutput] finished: bool - metrics: Optional[RequestMetrics] = None + metrics: Optional[RequestStateStats] = None kv_transfer_params: Optional[dict[str, Any]] = None # lora_request: Optional[LoRARequest] = None # encoder_prompt: Optional[str] = None