@@ -11,7 +11,7 @@
 from vllm.engine.arg_utils import AsyncEngineArgs
 from vllm.entrypoints.openai.protocol import ChatCompletionRequest, CompletionRequest
 from vllm.outputs import RequestOutput
-from vllm.transformers_utils.tokenizer import AnyTokenizer
+from vllm.tokenizers import TokenizerLike as AnyTokenizer

 from dynamo.runtime import Client

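The only functional change in this hunk is the tokenizer import: vLLM relocated the tokenizer protocol, and aliasing `TokenizerLike` back to `AnyTokenizer` keeps existing annotations intact. A minimal sketch of the pattern (the `encode` call is an assumption about the protocol, not something this PR touches):

```python
# Sketch only: alias the relocated protocol so downstream type hints stay unchanged.
from vllm.tokenizers import TokenizerLike as AnyTokenizer


def count_prompt_tokens(tokenizer: AnyTokenizer, prompt: str) -> int:
    # Assumes the protocol exposes the usual encode() method.
    return len(tokenizer.encode(prompt))
```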
45 changes: 35 additions & 10 deletions components/src/dynamo/vllm/multimodal_utils/chat_processor.py
@@ -28,9 +28,22 @@
 from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
 from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
 from vllm.entrypoints.openai.serving_engine import RequestPrompt
+from vllm.entrypoints.openai.serving_models import BaseModelPath, OpenAIServingModels
 from vllm.inputs.data import TokensPrompt
 from vllm.sampling_params import SamplingParams
-from vllm.transformers_utils.tokenizer import AnyTokenizer
+from vllm.tokenizers import TokenizerLike as AnyTokenizer


+class StubEngineClient:
+    """
+    Stub EngineClient for preprocessing-only use of OpenAIServingChat/Completion.
+    Provides the minimal attributes required by OpenAIServingModels.
+    """
+
+    def __init__(self, model_config: ModelConfig):
+        self.model_config = model_config
+        self.input_processor = None
+        self.io_processor = None
+
+
 @runtime_checkable
@@ -120,12 +133,19 @@ class ChatProcessor:
     def __init__(self, tokenizer: AnyTokenizer, model_config: ModelConfig):
         self.tokenizer = tokenizer
         self.model_config = model_config
+        # Create stub engine client and models for preprocessing-only usage
+        stub_engine = StubEngineClient(model_config)
+        serving_models = OpenAIServingModels(
+            engine_client=stub_engine,
+            base_model_paths=[
+                BaseModelPath(name=model_config.model, model_path=model_config.model)
+            ],
+        )
         self.openai_serving = OpenAIServingChat(
-            engine_client=None,
-            model_config=model_config,
-            models=None,
-            request_logger=None,
+            engine_client=stub_engine,
+            models=serving_models,
             response_role="assistant",
+            request_logger=None,
             chat_template=None,
             chat_template_content_format="auto",
         )
@@ -186,7 +206,6 @@ async def stream_response(
             conversation,
             self.tokenizer,
             request_metadata,
-            enable_force_include_usage=False,
         ):
             if raw_response.startswith("data: [DONE]"):
                 yield raw_response
@@ -220,7 +239,6 @@ async def stream_response(
             conversation,
             self.tokenizer,
             request_metadata,
-            enable_force_include_usage=False,
         ):
             if raw_response.startswith("data: [DONE]"):
                 break
@@ -267,10 +285,17 @@ class CompletionsProcessor:
     def __init__(self, tokenizer: AnyTokenizer, model_config: ModelConfig):
         self.tokenizer = tokenizer
         self.model_config = model_config
+        # Create stub engine client and models for preprocessing-only usage
+        stub_engine = StubEngineClient(model_config)
+        serving_models = OpenAIServingModels(
+            engine_client=stub_engine,
+            base_model_paths=[
+                BaseModelPath(name=model_config.model, model_path=model_config.model)
+            ],
+        )
         self.openai_serving = OpenAIServingCompletion(
-            engine_client=None,
-            model_config=model_config,
-            models=None,
+            engine_client=stub_engine,
+            models=serving_models,
             request_logger=None,
         )

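Taken together, the hunks above replace the former `engine_client=None` placeholders with a real, if minimal, object. A consolidated sketch of the new preprocessing-only wiring, assuming `model_config` and a tokenizer are already loaded; the variable names are illustrative, while the argument lists mirror the diff:

```python
# Illustrative assembly of the pieces added above; no engine ever runs behind it.
stub_engine = StubEngineClient(model_config)

serving_models = OpenAIServingModels(
    engine_client=stub_engine,
    base_model_paths=[
        BaseModelPath(name=model_config.model, model_path=model_config.model)
    ],
)

# OpenAIServingChat is used here purely to turn OpenAI-style requests into
# engine prompts, so the stub only needs the attributes OpenAIServingModels inspects.
serving_chat = OpenAIServingChat(
    engine_client=stub_engine,
    models=serving_models,
    response_role="assistant",
    request_logger=None,
    chat_template=None,
    chat_template_content_format="auto",
)
```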
6 changes: 3 additions & 3 deletions components/src/dynamo/vllm/multimodal_utils/protocol.py
@@ -26,7 +26,7 @@
 from vllm.multimodal.inputs import MultiModalUUIDDict  # noqa: F401
 from vllm.outputs import CompletionOutput
 from vllm.sampling_params import SamplingParams
-from vllm.sequence import RequestMetrics
+from vllm.v1.metrics.stats import RequestStateStats

 import dynamo.nixl_connect as connect

@@ -156,7 +156,7 @@ class MyRequestOutput(BaseModel):
     https://github.com/vllm-project/vllm/blob/a4c402a756fa3213caf9d2cde0e4ceb2d57727f2/vllm/outputs.py#L85

     This class is used to serialize the RequestOutput and any recursively defined types
-    We can do this because PromptLogprobs, RequestMetrics, and CompletionOutput are all serializable dataclasses
+    We can do this because PromptLogprobs, RequestStateStats, and CompletionOutput are all serializable dataclasses
     """

     model_config = ConfigDict(arbitrary_types_allowed=True)
@@ -167,7 +167,7 @@ class MyRequestOutput(BaseModel):
     prompt_logprobs: Optional[PromptLogprobs] = None
     outputs: List[CompletionOutput]
    finished: bool
-    metrics: Optional[RequestMetrics] = None
+    metrics: Optional[RequestStateStats] = None
     kv_transfer_params: Optional[dict[str, Any]] = None
     # lora_request: Optional[LoRARequest] = None
     # encoder_prompt: Optional[str] = None
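The `metrics` field now carries vLLM v1's `RequestStateStats` instead of the removed `RequestMetrics`. The round trip the docstring describes looks roughly like the sketch below; the identity fields (`request_id`, `prompt`, `prompt_token_ids`) are assumed to be declared on `MyRequestOutput` above the visible hunk:

```python
# Hedged sketch of shipping a vLLM RequestOutput between workers via MyRequestOutput.
from vllm.outputs import RequestOutput


def pack_request_output(out: RequestOutput) -> str:
    return MyRequestOutput(
        request_id=out.request_id,              # assumed field, not shown in the hunk
        prompt=out.prompt,                      # assumed field
        prompt_token_ids=out.prompt_token_ids,  # assumed field
        prompt_logprobs=out.prompt_logprobs,
        outputs=out.outputs,
        finished=out.finished,
        kv_transfer_params=out.kv_transfer_params,
    ).model_dump_json()


def unpack_request_output(payload: str) -> MyRequestOutput:
    return MyRequestOutput.model_validate_json(payload)
```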
2 changes: 1 addition & 1 deletion examples/backends/vllm/launch/agg_multimodal_epd.sh
@@ -80,7 +80,7 @@ python -m dynamo.vllm --multimodal-processor --enable-multimodal --model $MODEL_

 # run E/P/D workers
 CUDA_VISIBLE_DEVICES=0 python -m dynamo.vllm --multimodal-encode-worker --enable-multimodal --model $MODEL_NAME &
-CUDA_VISIBLE_DEVICES=1 python -m dynamo.vllm --multimodal-worker --enable-multimodal --model $MODEL_NAME $EXTRA_ARGS &
+CUDA_VISIBLE_DEVICES=0 python -m dynamo.vllm --multimodal-worker --enable-multimodal --enable-mm-embeds --model $MODEL_NAME $EXTRA_ARGS &

 # Wait for all background processes to complete
 wait
15 changes: 6 additions & 9 deletions examples/backends/vllm/launch/disagg_multimodal_epd.sh
@@ -81,23 +81,20 @@ python -m dynamo.vllm --multimodal-processor --enable-multimodal --model $MODEL_

 # Configure GPU memory optimization for specific models
 EXTRA_ARGS=""
-if [[ "$MODEL_NAME" == "Qwen/Qwen2.5-VL-7B-Instruct" ]]; then
-    EXTRA_ARGS="--gpu-memory-utilization 0.85 --max-model-len 2048"
-fi

 # Start encode worker
-echo "Starting encode worker on GPU 1..."
-VLLM_NIXL_SIDE_CHANNEL_PORT=20097 CUDA_VISIBLE_DEVICES=1 python -m dynamo.vllm --multimodal-encode-worker --enable-multimodal --model $MODEL_NAME $EXTRA_ARGS --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20080"}' &
+echo "Starting encode worker on GPU 0..."
+VLLM_NIXL_SIDE_CHANNEL_PORT=20097 CUDA_VISIBLE_DEVICES=0 python -m dynamo.vllm --multimodal-encode-worker --enable-multimodal --model $MODEL_NAME $EXTRA_ARGS --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20080"}' &

 # Start prefill worker
-echo "Starting prefill worker on GPU 2..."
+echo "Starting prefill worker on GPU 1..."
 VLLM_NIXL_SIDE_CHANNEL_PORT=20098 \
-CUDA_VISIBLE_DEVICES=2 python -m dynamo.vllm --multimodal-worker --is-prefill-worker --enable-multimodal --model $MODEL_NAME $EXTRA_ARGS --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20081"}' &
+CUDA_VISIBLE_DEVICES=1 python -m dynamo.vllm --multimodal-worker --is-prefill-worker --enable-multimodal --enable-mm-embeds --model $MODEL_NAME $EXTRA_ARGS --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20081"}' &

 # Start decode worker
-echo "Starting decode worker on GPU 3..."
+echo "Starting decode worker on GPU 2..."
 VLLM_NIXL_SIDE_CHANNEL_PORT=20099 \
-CUDA_VISIBLE_DEVICES=3 python -m dynamo.vllm --multimodal-decode-worker --enable-multimodal --model $MODEL_NAME $EXTRA_ARGS --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20082"}' &
+CUDA_VISIBLE_DEVICES=2 python -m dynamo.vllm --multimodal-decode-worker --enable-multimodal --model $MODEL_NAME $EXTRA_ARGS --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20082"}' &

 echo "=================================================="
 echo "All components started. Waiting for initialization..."
3 changes: 1 addition & 2 deletions examples/multimodal/components/audio_encode_worker.py
@@ -25,7 +25,7 @@
 import uvloop
 from transformers import AutoProcessor, Qwen2AudioForConditionalGeneration
 from vllm.engine.arg_utils import AsyncEngineArgs
-from vllm.utils import FlexibleArgumentParser
+from vllm.utils.argparse_utils import FlexibleArgumentParser

 import dynamo.nixl_connect as connect
 from dynamo.runtime import Client, DistributedRuntime, dynamo_worker
@@ -201,7 +201,6 @@ async def async_init(self, runtime: DistributedRuntime):
         # Create and initialize a dynamo connector for this worker.
         # We'll needs this to move data between this worker and remote workers efficiently.
         self._connector = connect.Connector()
-        await self._connector.initialize()

         logger.info("Startup completed.")

2 changes: 1 addition & 1 deletion examples/multimodal/components/encode_worker.py
@@ -12,7 +12,7 @@
 import uvloop
 from transformers import AutoImageProcessor
 from vllm.engine.arg_utils import AsyncEngineArgs
-from vllm.utils import FlexibleArgumentParser
+from vllm.utils.argparse_utils import FlexibleArgumentParser

 import dynamo.nixl_connect as connect
 from dynamo.runtime import Client, DistributedRuntime, dynamo_worker
4 changes: 2 additions & 2 deletions examples/multimodal/components/processor.py
@@ -17,8 +17,8 @@
 from vllm.engine.arg_utils import AsyncEngineArgs
 from vllm.entrypoints.openai.protocol import ChatCompletionRequest, CompletionRequest
 from vllm.outputs import RequestOutput
-from vllm.transformers_utils.tokenizer import AnyTokenizer
-from vllm.utils import FlexibleArgumentParser
+from vllm.tokenizers import TokenizerLike as AnyTokenizer
+from vllm.utils.argparse_utils import FlexibleArgumentParser

 from dynamo.llm import ModelInput, ModelType, register_llm
 from dynamo.runtime import Client, DistributedRuntime, dynamo_worker
4 changes: 4 additions & 0 deletions examples/multimodal/components/publisher.py
@@ -38,6 +38,8 @@ def record(
         scheduler_stats: Optional[SchedulerStats],
         iteration_stats: Optional[IterationStats],
         engine_idx: int = 0,
+        *args,
+        **kwargs,
     ):
         pass

@@ -74,6 +76,8 @@ def record(
         scheduler_stats: SchedulerStats,
         iteration_stats: Optional[IterationStats],
         engine_idx: int = 0,
+        *args,
+        **kwargs,
     ):
         # request_total_slots and kv_total_blocks are properties of model + gpu
         # we should only publish them once, not every metric update
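The extra `*args, **kwargs` make these `record()` overrides tolerant of newer vLLM stat-logger signatures that pass additional parameters. A standalone sketch of the idea (class name and loose typing are illustrative, not from the PR):

```python
from typing import Any, Optional


class ForwardCompatibleStatLogger:
    """Accepts whatever a future engine version passes to record() and ignores the rest."""

    def record(
        self,
        scheduler_stats: Optional[Any],
        iteration_stats: Optional[Any],
        engine_idx: int = 0,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        # Only the arguments this logger understands are used; extras are dropped.
        _ = (args, kwargs)
```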
2 changes: 1 addition & 1 deletion examples/multimodal/components/video_encode_worker.py
@@ -16,7 +16,7 @@
 import torch
 import uvloop
 from vllm.engine.arg_utils import AsyncEngineArgs
-from vllm.utils import FlexibleArgumentParser
+from vllm.utils.argparse_utils import FlexibleArgumentParser

 import dynamo.nixl_connect as connect
 from dynamo.runtime import Client, DistributedRuntime, dynamo_worker
3 changes: 1 addition & 2 deletions examples/multimodal/components/worker.py
@@ -15,7 +15,7 @@
 from vllm.distributed.kv_events import ZmqEventPublisher
 from vllm.inputs.data import TokensPrompt
 from vllm.usage.usage_lib import UsageContext
-from vllm.utils import FlexibleArgumentParser
+from vllm.utils.argparse_utils import FlexibleArgumentParser
 from vllm.v1.engine.async_llm import AsyncLLM

 import dynamo.nixl_connect as connect
@@ -251,7 +251,6 @@ async def async_init(self, runtime: DistributedRuntime):
         # We'll needs this to move data between this worker and remote workers efficiently.
         parsed_namespace, _, _ = parse_endpoint(self.endpoint)
         self._connector = connect.Connector()
-        await self._connector.initialize()

         self.image_loader = ImageLoader()

2 changes: 1 addition & 1 deletion examples/multimodal/launch/audio_agg.sh
@@ -91,7 +91,7 @@ python3 components/processor.py --model $MODEL_NAME --prompt-template "$PROMPT_T

 # run E/P/D workers
 CUDA_VISIBLE_DEVICES=0 python3 components/audio_encode_worker.py --model $MODEL_NAME &
-VLLM_NIXL_SIDE_CHANNEL_PORT=20097 CUDA_VISIBLE_DEVICES=1 python3 components/worker.py --model $MODEL_NAME --worker-type prefill &
+VLLM_NIXL_SIDE_CHANNEL_PORT=20097 CUDA_VISIBLE_DEVICES=0 python3 components/worker.py --model $MODEL_NAME --worker-type prefill &

 # Wait for all background processes to complete
 wait
2 changes: 2 additions & 0 deletions examples/multimodal/utils/args.py
@@ -159,6 +159,8 @@ def overwrite_args(config):
         "enable_prefix_caching": True,
         # KV routing relies on logging KV metrics
         "disable_log_stats": False,
+        # Enable multimodal embeddings input
+        "enable_mm_embeds": True,
         # Always setting up kv transfer for disagg
         "kv_transfer_config": KVTransferConfig(
             kv_connector="NixlConnector", kv_role="kv_both"
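`enable_mm_embeds` joins the engine-arg defaults that `overwrite_args` forces onto every worker, so prefill/decode workers accept the embeddings produced by the encode worker. A rough sketch of how such a defaults dict is typically applied (helper name and attribute-based application are assumptions, not the actual Dynamo code):

```python
# Hypothetical helper: force the shared defaults onto an AsyncEngineArgs-like object.
FORCED_DEFAULTS = {
    "enable_prefix_caching": True,
    "disable_log_stats": False,
    "enable_mm_embeds": True,  # accept precomputed multimodal embeddings
}


def apply_forced_defaults(engine_args) -> None:
    for name, value in FORCED_DEFAULTS.items():
        if hasattr(engine_args, name):
            setattr(engine_args, name, value)
        # Options unknown to the installed vLLM version are skipped silently.
```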
45 changes: 35 additions & 10 deletions examples/multimodal/utils/chat_processor.py
@@ -28,9 +28,22 @@
 from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
 from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
 from vllm.entrypoints.openai.serving_engine import RequestPrompt
+from vllm.entrypoints.openai.serving_models import BaseModelPath, OpenAIServingModels
 from vllm.inputs.data import TokensPrompt
 from vllm.sampling_params import SamplingParams
-from vllm.transformers_utils.tokenizer import AnyTokenizer
+from vllm.tokenizers import TokenizerLike as AnyTokenizer


+class StubEngineClient:
+    """
+    Stub EngineClient for preprocessing-only use of OpenAIServingChat/Completion.
+    Provides the minimal attributes required by OpenAIServingModels.
+    """
+
+    def __init__(self, model_config: ModelConfig):
+        self.model_config = model_config
+        self.input_processor = None
+        self.io_processor = None
+
+
 @runtime_checkable
@@ -120,12 +133,19 @@ class ChatProcessor:
     def __init__(self, tokenizer: AnyTokenizer, model_config: ModelConfig):
         self.tokenizer = tokenizer
         self.model_config = model_config
+        # Create stub engine client and models for preprocessing-only usage
+        stub_engine = StubEngineClient(model_config)
+        serving_models = OpenAIServingModels(
+            engine_client=stub_engine,
+            base_model_paths=[
+                BaseModelPath(name=model_config.model, model_path=model_config.model)
+            ],
+        )
         self.openai_serving = OpenAIServingChat(
-            engine_client=None,
-            model_config=model_config,
-            models=None,
-            request_logger=None,
+            engine_client=stub_engine,
+            models=serving_models,
             response_role="assistant",
+            request_logger=None,
             chat_template=None,
             chat_template_content_format="auto",
         )
@@ -186,7 +206,6 @@ async def stream_response(
             conversation,
             self.tokenizer,
             request_metadata,
-            enable_force_include_usage=False,
         ):
             if raw_response.startswith("data: [DONE]"):
                 yield raw_response
@@ -220,7 +239,6 @@ async def stream_response(
             conversation,
             self.tokenizer,
             request_metadata,
-            enable_force_include_usage=False,
         ):
             if raw_response.startswith("data: [DONE]"):
                 break
@@ -267,10 +285,17 @@ class CompletionsProcessor:
     def __init__(self, tokenizer: AnyTokenizer, model_config: ModelConfig):
         self.tokenizer = tokenizer
         self.model_config = model_config
+        # Create stub engine client and models for preprocessing-only usage
+        stub_engine = StubEngineClient(model_config)
+        serving_models = OpenAIServingModels(
+            engine_client=stub_engine,
+            base_model_paths=[
+                BaseModelPath(name=model_config.model, model_path=model_config.model)
+            ],
+        )
         self.openai_serving = OpenAIServingCompletion(
-            engine_client=None,
-            model_config=model_config,
-            models=None,
+            engine_client=stub_engine,
+            models=serving_models,
             request_logger=None,
         )

6 changes: 3 additions & 3 deletions examples/multimodal/utils/protocol.py
@@ -26,7 +26,7 @@
 from vllm.multimodal.inputs import MultiModalUUIDDict  # noqa: F401
 from vllm.outputs import CompletionOutput
 from vllm.sampling_params import SamplingParams
-from vllm.sequence import RequestMetrics
+from vllm.v1.metrics.stats import RequestStateStats

 import dynamo.nixl_connect as connect

@@ -166,7 +166,7 @@ class MyRequestOutput(BaseModel):
     https://github.com/vllm-project/vllm/blob/a4c402a756fa3213caf9d2cde0e4ceb2d57727f2/vllm/outputs.py#L85

     This class is used to serialize the RequestOutput and any recursively defined types
-    We can do this because PromptLogprobs, RequestMetrics, and CompletionOutput are all serializable dataclasses
+    We can do this because PromptLogprobs, RequestStateStats, and CompletionOutput are all serializable dataclasses
     """

     model_config = ConfigDict(arbitrary_types_allowed=True)
@@ -177,7 +177,7 @@ class MyRequestOutput(BaseModel):
     prompt_logprobs: Optional[PromptLogprobs] = None
     outputs: List[CompletionOutput]
     finished: bool
-    metrics: Optional[RequestMetrics] = None
+    metrics: Optional[RequestStateStats] = None
     kv_transfer_params: Optional[dict[str, Any]] = None
     # lora_request: Optional[LoRARequest] = None
     # encoder_prompt: Optional[str] = None