diff --git a/02_service_hello_world/query.py b/02_service_hello_world/query.py index ddb83cb..905ea58 100644 --- a/02_service_hello_world/query.py +++ b/02_service_hello_world/query.py @@ -1,3 +1,5 @@ +import os +from urllib.parse import urljoin import requests # The "anyscale service deploy" script outputs a line that looks like @@ -9,7 +11,7 @@ base_url = # Fill this in. resp = requests.get( - f"{base_url}/hello", + urljoin(base_url, "hello"), params={"name": "Theodore"}, headers={"Authorization": f"Bearer {token}"}) diff --git a/03_deploy_llama_3_8b/Dockerfile b/03_deploy_llama_3_8b/Dockerfile new file mode 100644 index 0000000..7c4782e --- /dev/null +++ b/03_deploy_llama_3_8b/Dockerfile @@ -0,0 +1,8 @@ +FROM anyscale/ray:2.49.0-slim-py312-cu128 + +# C compiler for Triton’s runtime build step (vLLM V1 engine) +# https://github.com/vllm-project/vllm/issues/2997 +RUN sudo apt-get update && \ + sudo apt-get install -y --no-install-recommends build-essential + +RUN pip install vllm==0.10.0 diff --git a/03_deploy_llama_3_8b/README.md b/03_deploy_llama_3_8b/README.md new file mode 100644 index 0000000..d69fa1d --- /dev/null +++ b/03_deploy_llama_3_8b/README.md @@ -0,0 +1,64 @@ +# Deploy Llama 3.1 8b + +This example uses Ray Serve along with vLLM to deploy a Llama 3.1 8b model as an Anyscale service. + +## Install the Anyscale CLI + +```bash +pip install -U anyscale +anyscale login +``` + +## Deploy the service + +Clone the example from GitHub. + +```bash +git clone https://github.com/anyscale/examples.git +cd examples/03_deploy_llama_3_8b +``` + +Deploy the service. Use `--env` to forward your Hugging Face token if you need authentication for gated models like Llama 3. + +```bash +export HF_TOKEN= +anyscale service deploy -f service.yaml --env HF_TOKEN=$HF_TOKEN +``` + +If you’re using an ungated model, go to your `LLMConfig` (in `serve_llama_3_1_8b.py`), and set `model_source` to that model. Then, you can omit the Hugging Face token from both the config and the `anyscale service deploy` command. + +## Understanding the example + +- The [application code](https://github.com/anyscale/examples/blob/main/03_deploy_llama_3_8b/serve_llama_3_1_8b.py) sets the required accelerator type with `accelerator_type="L4"`. To use a different accelerator, replace `"L4"` with the desired name. See the [list of supported accelerators](https://docs.ray.io/en/latest/ray-core/accelerator-types.html#accelerator-types) for available options. +- Ray Serve automatically autoscales the number of model replicas between `min_replicas` and `max_replicas`. Ray Serve adapts the number of replicas by monitoring queue sizes. For more information on configuring autoscaling, see the [AutoscalingConfig documentation](https://docs.ray.io/en/latest/serve/api/doc/ray.serve.config.AutoscalingConfig.html). +- This example uses vLLM, and the [Dockerfile](https://github.com/anyscale/examples/blob/main/03_deploy_llama_3_8b/Dockerfile) defines the service’s dependencies. When you run `anyscale service deploy`, the build process adds these dependencies on top of an Anyscale-provided base image. +- To configure vLLM, modify the `engine_kwargs` dictionary. See [Ray documentation for the `LLMConfig` object](https://docs.ray.io/en/latest/serve/api/doc/ray.serve.llm.LLMConfig.html#ray.serve.llm.LLMConfig). + + +## Query the service + +The `anyscale service deploy` command outputs a line that looks like +```text +curl -H "Authorization: Bearer " +``` + +From the output, you can extract the service token and base URL. Open [query.py](https://github.com/anyscale/examples/blob/main/03_deploy_llama_3_8b/query.py) and add them to the appropriate fields. +```python +token = +base_url = +``` + +Query the model +```bash +pip install openai +python query.py +``` + +View the service in the [services tab](https://console.anyscale.com/services) of the Anyscale console. + +## Shutdown + +Shutdown your Anyscale Service: +```bash +anyscale service terminate -n deploy-llama-3-1-8b +``` \ No newline at end of file diff --git a/03_deploy_llama_3_8b/query.py b/03_deploy_llama_3_8b/query.py new file mode 100644 index 0000000..74aaae8 --- /dev/null +++ b/03_deploy_llama_3_8b/query.py @@ -0,0 +1,26 @@ +from urllib.parse import urljoin +from openai import OpenAI + +# The "anyscale service deploy" script outputs a line that looks like +# +# curl -H "Authorization: Bearer " +# +# From this, you can parse out the service token and base URL. +token = # Fill this in. If deploying and querying locally, use token = "FAKE_KEY" +base_url = # Fill this in. If deploying and querying locally, use base_url = "http://localhost:8000" + +client = OpenAI(base_url= urljoin(base_url, "v1"), api_key=token) + +response = client.chat.completions.create( + model="my-llama-3.1-8B", + messages=[ + {"role": "user", "content": "What's the capital of France?"} + ], + stream=True +) + +# Stream and print JSON +for chunk in response: + data = chunk.choices[0].delta.content + if data: + print(data, end="", flush=True) diff --git a/03_deploy_llama_3_8b/serve_llama_3_1_8b.py b/03_deploy_llama_3_8b/serve_llama_3_1_8b.py new file mode 100644 index 0000000..f1ab1a5 --- /dev/null +++ b/03_deploy_llama_3_8b/serve_llama_3_1_8b.py @@ -0,0 +1,32 @@ +from ray import serve +from ray.serve.llm import LLMConfig, build_openai_app +import os + +llm_config = LLMConfig( + model_loading_config=dict( + model_id="my-llama-3.1-8B", + # Or unsloth/Meta-Llama-3.1-8B-Instruct for an ungated version + model_source="meta-llama/Llama-3.1-8B-Instruct", + ), + accelerator_type="L4", + deployment_config=dict( + autoscaling_config=dict( + min_replicas=1, max_replicas=2, + ) + ), + # We need to share our Hugging Face token with the workers so they can access the gated model. + # If your model is not gated, you can skip this. + runtime_env=dict( + env_vars={ + "HF_TOKEN": os.environ["HF_TOKEN"] + } + ), + engine_kwargs=dict( + max_model_len=8192, + ) +) + +app = build_openai_app({"llm_configs": [llm_config]}) + +# Uncomment the below line to run the service locally with Python. +# serve.run(app, blocking=True) diff --git a/03_deploy_llama_3_8b/service.yaml b/03_deploy_llama_3_8b/service.yaml new file mode 100644 index 0000000..35e4a91 --- /dev/null +++ b/03_deploy_llama_3_8b/service.yaml @@ -0,0 +1,41 @@ +# View the docs https://docs.anyscale.com/reference/service-api#serviceconfig. + +name: deploy-llama-3-1-8b + +# When empty, use the default image. This can be an Anyscale-provided base image +# like anyscale/ray:2.43.0-slim-py312-cu125, a user-provided base image (provided +# that it meets certain specs), or you can build new images using the Anyscale +# image builder at https://console.anyscale-staging.com/v2/container-images. + +containerfile: ./Dockerfile + +# When empty, Anyscale will auto-select the instance types. You can also specify +# minimum and maximum resources. +compute_config: +# head_node: +# instance_type: m5.2xlarge +# worker_nodes: +# - instance_type: m5.16xlarge +# min_nodes: 0 +# max_nodes: 100 +# - instance_type: m7a.24xlarge +# min_nodes: 0 +# max_nodes: 100 +# market_type: PREFER_SPOT # Defaults to ON_DEMAND +# - instance_type: g4dn.2xlarge +# min_nodes: 0 +# max_nodes: 100 +# market_type: PREFER_SPOT # Defaults to ON_DEMAND + auto_select_worker_config: true + +# Path to a local directory or a remote URI to a .zip file (S3, GS, HTTP) that +# will be the working directory for the job. The files in the directory will be +# automatically uploaded to the job environment in Anyscale. +working_dir: . + +# When empty, this uses the default Anyscale Cloud in your organization. +cloud: + +# Specify the Ray Serve app to deploy. +applications: +- import_path: serve_llama_3_1_8b:app diff --git a/deploy_llama_3_1_70b/Dockerfile b/deploy_llama_3_1_70b/Dockerfile new file mode 100644 index 0000000..ea0023c --- /dev/null +++ b/deploy_llama_3_1_70b/Dockerfile @@ -0,0 +1,8 @@ +FROM anyscale/ray:2.50.0-slim-py312-cu128 + +# C compiler for Triton’s runtime build step (vLLM V1 engine) +# https://github.com/vllm-project/vllm/issues/2997 +RUN sudo apt-get update && \ + sudo apt-get install -y --no-install-recommends build-essential + +RUN pip install vllm==0.11.0 diff --git a/deploy_llama_3_1_70b/README.md b/deploy_llama_3_1_70b/README.md new file mode 100644 index 0000000..155fb3a --- /dev/null +++ b/deploy_llama_3_1_70b/README.md @@ -0,0 +1,63 @@ +# Deploy Llama 3.1 70b + +This example uses Ray Serve along with vLLM to deploy a Llama 3.1 70b model as an Anyscale service. The same code can be used for similarly sized models. + +## Install the Anyscale CLI + +```bash +pip install -U anyscale +anyscale login +``` + +## Deploy the service + +Clone the example from GitHub. + +```bash +git clone https://github.com/anyscale/examples.git +cd examples/deploy_llama_3_1_70b +``` + +Deploy the service. Use `--env` to forward your Hugging Face token if you need authentication for gated models like Llama 3. + +```bash +anyscale service deploy -f service.yaml --env HF_TOKEN=${HF_TOKEN:?HF_TOKEN is not set} +``` + +The logic in `${HF_TOKEN:?HF_TOKEN is not set}` just raises an error if no Hugging Face token is present. If you don't have a Hugging Face token, you can use one of the ungated models (change `model_name` in [serve.py](https://github.com/anyscale/examples/blob/main/deploy_llama_3_1_70b/serve.py)). Not only do the Llama models require a Hugging Face token, you also need to request permission to use the models ([here for 3.1](https://huggingface.co/meta-llama/Llama-3.1-70B-Instruct) and [here for 3.3](https://huggingface.co/meta-llama/Llama-3.3-70B-Instruct)). + +## Understanding the example + +- The [application code](https://github.com/anyscale/examples/blob/main/deploy_llama_3_1_70b/serve.py) sets the required accelerator type with `accelerator_type="L40S"`. This accelerator type is available on AWS. On other clouds, use an accelerator type like `"A100"` or `"H100"`. See the [list of supported accelerators](https://docs.ray.io/en/latest/ray-core/accelerator-types.html#accelerator-types) for available options. Depending on the accelerator type that you use, will will also need to select the appropriate instance types in [service.yaml](https://github.com/anyscale/examples/blob/main/deploy_llama_3_1_70b/service.yaml). +- Ray Serve automatically autoscales the number of model replicas between `min_replicas` and `max_replicas`. Ray Serve adapts the number of replicas by monitoring queue sizes. For more information on configuring autoscaling, see the [AutoscalingConfig documentation](https://docs.ray.io/en/latest/serve/api/doc/ray.serve.config.AutoscalingConfig.html). +- This example uses vLLM, and the [Dockerfile](https://github.com/anyscale/examples/blob/main/deploy_llama_3_1_70b/Dockerfile) defines the service’s dependencies. When you run `anyscale service deploy`, the build process adds these dependencies on top of an Anyscale-provided base image. +- To configure vLLM, modify the `engine_kwargs` dictionary. See [Ray documentation for the `LLMConfig` object](https://docs.ray.io/en/latest/serve/api/doc/ray.serve.llm.LLMConfig.html#ray.serve.llm.LLMConfig). + + +## Query the service + +The `anyscale service deploy` command outputs a line that looks like +```text +curl -H "Authorization: Bearer " +``` + +From the output, you can extract the service token and base URL. Open [query.py](https://github.com/anyscale/examples/blob/main/deploy_llama_3_1_70b/query.py) and add them to the appropriate fields. +```python +token = +base_url = +``` + +Query the model +```bash +pip install openai +python query.py +``` + +View the service in the [services tab](https://console.anyscale.com/services) of the Anyscale console. + +## Shutdown + +Shutdown your Anyscale Service: +```bash +anyscale service terminate -n deploy-70b +``` \ No newline at end of file diff --git a/deploy_llama_3_1_70b/query.py b/deploy_llama_3_1_70b/query.py new file mode 100644 index 0000000..334423a --- /dev/null +++ b/deploy_llama_3_1_70b/query.py @@ -0,0 +1,26 @@ +from urllib.parse import urljoin +from openai import OpenAI + +# The "anyscale service deploy" script outputs a line that looks like +# +# curl -H "Authorization: Bearer " +# +# From this, you can parse out the service token and base URL. +token = # Fill this in. If deploying and querying locally, use token = "FAKE_KEY" +base_url = # Fill this in. If deploying and querying locally, use base_url = "http://localhost:8000" + +client = OpenAI(base_url= urljoin(base_url, "v1"), api_key=token) + +response = client.chat.completions.create( + model="my-70b-model", + messages=[ + {"role": "user", "content": "What's the capital of France?"} + ], + stream=True +) + +# Stream and print the response. +for chunk in response: + data = chunk.choices[0].delta.content + if data: + print(data, end="", flush=True) diff --git a/deploy_llama_3_1_70b/serve.py b/deploy_llama_3_1_70b/serve.py new file mode 100644 index 0000000..8cbfc54 --- /dev/null +++ b/deploy_llama_3_1_70b/serve.py @@ -0,0 +1,37 @@ +from ray.serve.llm import LLMConfig, build_openai_app +import os + +# model_name = "meta-llama/Llama-3.1-70B-Instruct" +# model_name = "meta-llama/Llama-3.3-70B-Instruct" +# model_name = "unsloth/Meta-Llama-3.1-70B-Instruct" # Ungated, no token required +model_name = "deepseek-ai/DeepSeek-R1-Distill-Llama-70B" # Ungated, no token required + +llm_config = LLMConfig( + model_loading_config=dict( + model_id="my-70b-model", + model_source=model_name, + ), + # Valid types (depending on what GPUs are available on the cloud) include "L40S", "A100", and "H100". + # If you use a cloud other than AWS, in addition to changing the accelerator type, you also need to + # change the compute_config in service.yaml. + accelerator_type="L40S", + deployment_config=dict( + autoscaling_config=dict( + min_replicas=1, + max_replicas=4, + ) + ), + ### If your model is not gated, you can skip `HF_TOKEN` + # Share your Hugging Face token with the vllm engine so it can access the gated Llama 3. + # Type `export HF_TOKEN=` in a terminal + engine_kwargs=dict( + max_model_len=32768, + # Split weights among 8 GPUs in the node + tensor_parallel_size=8, + ), +) + +app = build_openai_app({"llm_configs": [llm_config]}) + +# Uncomment the below line to run the service locally with Python. +# serve.run(app, blocking=True) diff --git a/deploy_llama_3_1_70b/service.yaml b/deploy_llama_3_1_70b/service.yaml new file mode 100644 index 0000000..f80aeed --- /dev/null +++ b/deploy_llama_3_1_70b/service.yaml @@ -0,0 +1,57 @@ +# View the docs https://docs.anyscale.com/reference/service-api#serviceconfig. + +name: deploy-70b + +# When empty, use the default image. This can be an Anyscale-provided base image +# like anyscale/ray:2.49.2-slim-py312-cu128, a user-provided base image (provided +# that it meets certain specs), or you can build new images using the Anyscale +# image builder at https://console.anyscale-staging.com/v2/container-images. + +containerfile: ./Dockerfile + +# Anyscale will auto-select the instance types, but you can also specify the instance +# types manually. Different GPU types are available on different clouds. +compute_config: +# head_node: +# instance_type: m5.2xlarge +# worker_nodes: +# # These instances are only available in AWS. +# - instance_type: p4d.24xlarge +# min_nodes: 0 +# max_nodes: 1 +# market_type: PREFER_SPOT # Defaults to ON_DEMAND +# - instance_type: p4de.24xlarge +# min_nodes: 0 +# max_nodes: 1 +# market_type: PREFER_SPOT # Defaults to ON_DEMAND +# - instance_type: p5.48xlarge +# min_nodes: 0 +# max_nodes: 1 +# market_type: PREFER_SPOT # Defaults to ON_DEMAND +# +# # These instances are only available in GCP. +# - instance_type: a2-highgpu-8g-nvidia-a100-40gb-8 +# market_type: PREFER_SPOT +# - instance_type: a2-ultragpu-8g-nvidia-a100-80gb-8 +# market_type: PREFER_SPOT +# - instance_type: a2-megagpu-16g-nvidia-a100-40gb-16 +# market_type: PREFER_SPOT +# - instance_type: a3-highgpu-8g-nvidia-h100-80gb-8 +# market_type: PREFER_SPOT +# - instance_type: a3-megagpu-8g-nvidia-h100-mega-80gb-8 +# market_type: PREFER_SPOT +# - instance_type: a3-ultragpu-8g-nvidia-h200-141gb-8 +# market_type: PREFER_SPOT + auto_select_worker_config: true + +# Path to a local directory or a remote URI to a .zip file (S3, GS, HTTP) that +# will be the working directory for the job. The files in the directory will be +# automatically uploaded to the job environment in Anyscale. +working_dir: . + +# When empty, this uses the default Anyscale Cloud in your organization. +cloud: + +# Specify the Ray Serve app to deploy. +applications: +- import_path: serve:app diff --git a/image_processing/Dockerfile b/image_processing/Dockerfile new file mode 100644 index 0000000..77fc115 --- /dev/null +++ b/image_processing/Dockerfile @@ -0,0 +1,14 @@ +FROM anyscale/ray:2.52.0-slim-py312-cu128 + +# C compiler for Triton’s runtime build step (vLLM V1 engine) +# https://github.com/vllm-project/vllm/issues/2997 +RUN sudo apt-get update && \ + sudo apt-get install -y --no-install-recommends build-essential + +RUN curl -LsSf https://astral.sh/uv/install.sh | sh + +RUN uv pip install --system huggingface_hub boto3 + +RUN uv pip install --system vllm==0.11.2 + +RUN uv pip install --system transformers==4.57.1 diff --git a/image_processing/README.md b/image_processing/README.md new file mode 100644 index 0000000..c71f0cc --- /dev/null +++ b/image_processing/README.md @@ -0,0 +1,44 @@ +# Large-Scale Image Processing with Vision Language Models + +This example demonstrates how to build an image processing pipeline that scales to billions of images using Ray Data and vLLM on Anyscale. We process the [ReLAION-2B dataset](https://huggingface.co/datasets/laion/relaion2B-en-research-safe), which contains over 2 billion image URLs with associated metadata. + +You'll need a HuggingFace token to access the ReLAION-2B dataset. Get one at [huggingface.co/settings/tokens](https://huggingface.co/settings/tokens). + + +## Install the Anyscale CLI + +```bash +pip install -U anyscale +anyscale login +``` + + +## Submit the job. + +Clone the example from GitHub. + +```bash +git clone https://github.com/anyscale/examples.git +cd examples/image_processing +``` + +Submit the job. Use `--env` to forward your Hugging Face token to authenticate with Hugging Face. + +```bash +anyscale job submit -f job.yaml --env HF_TOKEN=$HF_TOKEN +``` + +Results will be written to `/mnt/shared_storage/process_images_output/{timestamp}/` in the Parquet format. + + +## Understanding the example + +- The pipeline performs three main stages on each image: + - **Image Download**: Download images from URLs in a multi-threaded manner handling timeouts and invalid URLs. + - **Image Preprocessing**: Validate, resize, and standardize images, filtering out corrupted or invalid images. + - **Vision Model Inference**: Run the [Qwen2.5-VL-3B-Instruct](https://huggingface.co/Qwen/Qwen2.5-VL-3B-Instruct) vision-language model using vLLM to generate a caption for each image. +- The entire pipeline is orchestrated by [Ray Data](https://docs.ray.io/en/latest/data/data.html), which handles distributed execution, fault tolerance, and resource management across your cluster. +- This example uses [Ray Data's native vLLM integration](https://docs.ray.io/en/latest/data/working-with-llms.html) to optimize vLLM for throughput and performa batch inference in the overall pipeline. +- Some notes on configuration. + - This example passes `concurrency=10` into `ray.data.read_parquet` in order to reduce the likelihood of hitting Hugging Face rate limits. + - This example calls `repartition(target_num_rows_per_block=1000)` after the `read_parquet` call. The blocks created by `read_parquet` can consist of millions of rows because each row consists of small URL data. Ray Data processes a single block sequentially (one batch at a time). The `repartition` call creates smaller blocks from the larger block which is important both to increase the degree of parallelism and to reduce the memory required to process each block. diff --git a/image_processing/job.yaml b/image_processing/job.yaml new file mode 100644 index 0000000..daf1bdd --- /dev/null +++ b/image_processing/job.yaml @@ -0,0 +1,43 @@ +# View the docs https://docs.anyscale.com/reference/job-api#jobconfig. +name: process-images + +# When empty, use the default image. This can be an Anyscale-provided base image +# like anyscale/ray:2.43.0-slim-py312-cu125, a user-provided base image (provided +# that it meets certain specs), or you can build new images using the Anyscale +# image builder at https://console.anyscale-staging.com/v2/container-images. +# image_uri: # anyscale/ray:2.43.0-slim-py312-cu125 +containerfile: ./Dockerfile + +compute_config: + auto_select_worker_config: true + # worker_nodes: + # - instance_type: g5.12xlarge + # min_nodes: 0 + # max_nodes: 16 + # - instance_type: m5.2xlarge + # min_nodes: 0 + # max_nodes: 16 + max_resources: + CPU: 1600 + GPU: 64 + +# Path to a local directory or a remote URI to a .zip file (S3, GS, HTTP) that +# will be the working directory for the job. The files in the directory will be +# automatically uploaded to the job environment in Anyscale. +working_dir: . + +# When empty, this uses the default Anyscale Cloud in your organization. +cloud: + +env_vars: + RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION: "0.5" + +# The script to run in your job. You can also do "uv run main.py" if you have a +# pyproject.toml file in your working_dir. +entrypoint: python process_images.py + +# If there is an error, do not retry. +max_retries: 0 + +# Kill the job after 2 hours to control costs. +timeout_s: 7200 diff --git a/image_processing/process_images.py b/image_processing/process_images.py new file mode 100644 index 0000000..a669f6e --- /dev/null +++ b/image_processing/process_images.py @@ -0,0 +1,164 @@ +import os +import ray +from huggingface_hub import HfFileSystem +from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor +from PIL import Image +from io import BytesIO +import requests +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime, timezone +from typing import Optional, Dict, Any, List + +num_images_to_process = 10**6 +num_gpus = 64 + +timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") +output_path = f"/mnt/shared_storage/process_images_output/{timestamp}" + + +def download_single_image(url: str, session: requests.Session) -> Dict[str, Any]: + """Download a single image.""" + try: + # Use the provided session for connection pooling + response = session.get(url, timeout=5, stream=True) + + if response.status_code == 200: + # Read content + content = response.content + return {"content": content, "status": "success", "url": url} + else: + # Return HTTP status code for non-200 responses + return { + "content": None, + "status": f"http_{response.status_code}", + "url": url, + } + + except Exception as e: + return {"content": None, "status": f"error_{type(e).__name__}", "url": url} + + +def image_download(batch: Dict[str, List]) -> Dict[str, List]: + """Download a batch of images using a thread pool for parallelism.""" + urls = batch["url"] + + # Create a session for connection pooling + session = requests.Session() + adapter = requests.adapters.HTTPAdapter( + pool_connections=100, + pool_maxsize=100, + max_retries=0, # No automatic retries + ) + session.mount("http://", adapter) + session.mount("https://", adapter) + + # Use ThreadPoolExecutor for parallel downloads + with ThreadPoolExecutor(max_workers=50) as executor: + results = list(executor.map(lambda url: download_single_image(url, session), urls)) + + batch["bytes"] = [r["content"] for r in results] + return batch + + +def process_single_image(row: Dict[str, Any]) -> Dict[str, Any]: + """Process a single image: validate, convert to RGB, and resize.""" + image_bytes = row["bytes"] + if image_bytes is None: + return row + + try: + img = Image.open(BytesIO(image_bytes)) + img.load() + if img.mode != "RGB": + img = img.convert("RGB") + img = img.resize((128, 128), Image.Resampling.LANCZOS) + output_buffer = BytesIO() + img.save(output_buffer, format="JPEG", quality=95) + row["bytes"] = output_buffer.getvalue() + except Exception: + row["bytes"] = None + return row + + +vision_processor_config = vLLMEngineProcessorConfig( + model_source="Qwen/Qwen2.5-VL-3B-Instruct", + engine_kwargs=dict( + tensor_parallel_size=1, + pipeline_parallel_size=1, + max_model_len=4096, + enable_chunked_prefill=True, + max_num_batched_tokens=8192, + distributed_executor_backend="mp", + gpu_memory_utilization=0.95, + ), + runtime_env=dict( + env_vars=dict( + VLLM_USE_V1="1", + VLLM_DISABLE_COMPILE_CACHE="1", + ), + ), + batch_size=64, + max_concurrent_batches=8, + accelerator_type="A10G", + concurrency=num_gpus, + has_image=True, +) + + +def vision_preprocess(row): + image_bytes = row["bytes"] + return dict( + messages=[ + { + "role": "user", + "content": [ + { + "type": "image", + "image": Image.open(BytesIO(image_bytes)), + }, + ], + }, + ], + sampling_params=dict( + temperature=0.3, + max_tokens=150, + detokenize=False, + ), + ) + + +def vision_postprocess(row): + row.pop("bytes") + return row + + +vision_processor = build_llm_processor( + vision_processor_config, + preprocess=vision_preprocess, + postprocess=vision_postprocess, +) + +dataset = ( + ray.data.read_parquet( + "hf://datasets/laion/relaion2B-en-research-safe/", + file_extensions=["parquet"], + columns=["url"], + filesystem=HfFileSystem(token=os.environ["HF_TOKEN"]), + concurrency=10, + ray_remote_args={"memory": int(4 * 10**9)}, + ) + .limit(num_images_to_process) + .repartition(target_num_rows_per_block=1000) + .map_batches( + image_download, + batch_size=100, + memory=(10**9), + ) + .drop_columns(["url"]) + .map(process_single_image) + .filter(lambda row: row["bytes"] is not None) # Filter out failed downloads/processing +) + +dataset = vision_processor(dataset) + +dataset.write_parquet(output_path) diff --git a/nemo_curator_semantic_dedup/Dockerfile b/nemo_curator_semantic_dedup/Dockerfile new file mode 100644 index 0000000..0c8666e --- /dev/null +++ b/nemo_curator_semantic_dedup/Dockerfile @@ -0,0 +1,66 @@ +# NeMo Curator Image Deduplication Example +# Uses CUDA 12.8 for GPU-accelerated processing +FROM anyscale/ray:2.54.0-slim-py312-cu128 + +# Note: Cache busting for git clone is done via CURATOR_CACHE_BUST arg below + +# Install system dependencies +RUN sudo apt-get update && \ + sudo apt-get install -y --no-install-recommends \ + build-essential \ + unzip \ + wget \ + curl \ + git && \ + sudo apt-get clean && \ + sudo rm -rf /var/lib/apt/lists/* + +# Install uv for fast package management +RUN curl -LsSf https://astral.sh/uv/install.sh | sh + +# Install Python dependencies +# Use uv pip install --system to install into the base anaconda environment +# so all Ray workers (not just the driver) have these packages +RUN python -m pip install --upgrade pip setuptools wheel + +# IMPORTANT: Uninstall any pre-existing RAPIDS/cuML packages from the base image +# The base image may have incompatible versions that conflict with scikit-learn +RUN python -m pip uninstall -y cuml-cu12 cudf-cu12 cugraph-cu12 pylibraft-cu12 raft-dask-cu12 rmm-cu12 || true && \ + echo "Cleaned up pre-existing RAPIDS packages" + +# Install NeMo-Curator's native CUDA dependencies (DALI, cuML, RAFT, etc.) +# Only the compiled/binary deps matter here — the nemo_curator Python package +# itself is overridden at runtime via PYTHONPATH=Curator in job.yaml, which +# points to the local Curator/ directory uploaded with the working_dir. +ARG CURATOR_REPO=https://github.com/NVIDIA-NeMo/Curator.git +ARG CURATOR_REF=main +ARG CURATOR_CACHE_BUST=2026-03-21-v1 +RUN echo "Cache bust: ${CURATOR_CACHE_BUST}" && \ + git clone --depth 1 -b ${CURATOR_REF} ${CURATOR_REPO} /home/ray/NeMo-Curator && \ + uv pip install --system -e /home/ray/NeMo-Curator[image_cuda12] + +# Re-upgrade scikit-learn AFTER nemo-curator in case it was downgraded +# cuML 25.6.* needs sklearn >= 1.5 (has _get_default_requests) +RUN uv pip install --system "scikit-learn>=1.5,<1.6" && \ + python -c "import sklearn; print(f'Final scikit-learn version: {sklearn.__version__}')" + +# Additional dependencies for image downloading and processing +RUN uv pip install --system \ + loguru \ + Pillow \ + aiohttp \ + tqdm \ + pandas \ + pyarrow \ + huggingface_hub \ + transformers + +# Set environment variable for model directory +ENV MODEL_DIR=/home/ray/model_weights + +# Create output directories +RUN mkdir -p /home/ray/data/webdataset \ + /home/ray/data/results \ + /home/ray/data/embeddings \ + /home/ray/data/removal_ids + diff --git a/nemo_curator_semantic_dedup/README.md b/nemo_curator_semantic_dedup/README.md new file mode 100644 index 0000000..84b872e --- /dev/null +++ b/nemo_curator_semantic_dedup/README.md @@ -0,0 +1,57 @@ +# Image Semantic Deduplication with NeMo Curator + +This example uses [NVIDIA NeMo Curator](https://github.com/NVIDIA-NeMo/Curator) to perform GPU-accelerated semantic deduplication on image datasets. + +NeMo Curator is a scalable data curation library that leverages NVIDIA RAPIDS™ for GPU acceleration. This example downloads images from a parquet file, generates CLIP embeddings, and removes near-duplicate images based on semantic similarity. + +## Install the Anyscale CLI + +```bash +pip install -U anyscale +anyscale login +``` + +## Run the job + +Clone the example from GitHub. + +```bash +git clone https://github.com/anyscale/examples.git +cd examples/nemo_curator_semantic_dedup +``` + +Submit the job. + +```bash +anyscale job submit -f job.yaml +``` + +## Understanding the example + +- The [Dockerfile](./Dockerfile) builds a custom image with NeMo Curator CUDA dependencies (`nemo-curator[image_cuda12]`), downloads the MS COCO sample dataset from HuggingFace, and pre-downloads the CLIP model weights to speed up job startup. + +- The entrypoint defined in [job.yaml](./job.yaml) runs `image_dedup_example.py` which executes a 3-step pipeline: + 1. **Download WebDataset**: Fetches images from URLs in the parquet file and saves them as WebDataset tar files to `/mnt/cluster_storage/nemo_curator/webdataset` + 2. **Generate CLIP embeddings**: Uses OpenAI's CLIP ViT-L/14 model to create 768-dimensional embeddings for each image + 3. **Semantic deduplication**: Clusters embeddings with k-means and removes near-duplicates based on cosine similarity + +- The `/mnt/cluster_storage/` directory is an ephemeral shared filesystem attached to the cluster for the duration of the job. All outputs (embeddings, duplicate IDs, and deduplicated images) are saved here. + +- To use your own data, prepare a parquet file with `URL` and `TEXT` columns, upload it to cluster storage, and override the `INPUT_PARQUET` environment variable: + ```bash + anyscale job submit -f job.yaml \ + --env INPUT_PARQUET=/mnt/cluster_storage/your_data.parquet \ + --env OUTPUT_DIR=/mnt/cluster_storage/your_results + ``` + +- The [helper.py](./helper.py) module provides utilities for downloading images in parallel and converting them to [WebDataset](https://github.com/webdataset/webdataset) format, which is optimized for streaming large-scale image datasets. + +## View the job + +View the job in the [jobs tab](https://console.anyscale.com/jobs) of the Anyscale console. + +## Learn more + +- [NeMo Curator Documentation](https://docs.nvidia.com/nemo/curator/latest/) +- [NeMo Curator Image Tutorials](https://github.com/NVIDIA-NeMo/Curator/tree/main/tutorials/image/getting-started) +- [Anyscale Jobs Documentation](https://docs.anyscale.com/platform/jobs/) diff --git a/nemo_curator_semantic_dedup/helper.py b/nemo_curator_semantic_dedup/helper.py new file mode 100644 index 0000000..c7f1df9 --- /dev/null +++ b/nemo_curator_semantic_dedup/helper.py @@ -0,0 +1,183 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Helper functions for downloading and preparing image datasets as WebDataset tar files.""" + +from __future__ import annotations + +import io +import json +import os +import tarfile +import uuid +from concurrent.futures import ThreadPoolExecutor +from typing import Any + +import requests +from PIL import Image + + +def download_single_image(url: str, session: requests.Session) -> bytes | None: + """Download a single image, returning bytes or None on failure.""" + try: + response = session.get(url, timeout=5, stream=True) + return response.content if response.status_code == 200 else None + except Exception: + return None + + +def image_download_batch(batch: dict[str, Any]) -> dict[str, Any]: + """Download a batch of images using ThreadPoolExecutor for parallelism.""" + session = requests.Session() + adapter = requests.adapters.HTTPAdapter( + pool_connections=100, pool_maxsize=100, max_retries=0, + ) + session.mount("http://", adapter) + session.mount("https://", adapter) + + # Use ThreadPoolExecutor for parallel downloads within this batch + # 50 threads means 50 concurrent downloads per Ray task + with ThreadPoolExecutor(max_workers=50) as executor: + batch["bytes"] = list(executor.map(lambda url: download_single_image(url, session), batch["url"])) + + return batch + + +def process_image(row: dict[str, Any]) -> list[dict[str, Any]]: + """Validate downloaded image bytes, convert to JPEG, and drop failures. + + Returns a single-element list on success or an empty list to drop the row. + Designed for use with Ray Data's flat_map. + """ + image_bytes = row.get("bytes") + if not image_bytes: + return [] + + try: + img = Image.open(io.BytesIO(image_bytes)) + img.verify() + img = Image.open(io.BytesIO(image_bytes)) + + # Robust RGB conversion for ALL image modes (L, LA, P, PA, RGBA, CMYK, etc.) + # This ensures CLIP gets 3-channel images + if img.mode != "RGB": + if img.mode == "P": + img = img.convert("RGBA") + # For any mode with alpha, composite onto white background + if img.mode in ("RGBA", "LA", "PA"): + background = Image.new("RGB", img.size, (255, 255, 255)) + # Use alpha channel as mask + if img.mode == "LA": + img = img.convert("RGBA") + background.paste(img, mask=img.split()[-1]) + img = background + else: + img = img.convert("RGB") + + if img.mode != "RGB" or img.size[0] < 3 or img.size[1] < 3: + return [] + + jpeg_buffer = io.BytesIO() + img.save(jpeg_buffer, format="JPEG", quality=95) + row["jpeg_bytes"] = jpeg_buffer.getvalue() + return [row] + except Exception: + return [] + + +def write_tar_batch(batch: dict[str, Any], output_dir: str) -> dict[str, Any]: + """Write a batch of images to a WebDataset tar shard.""" + import ray + + node_id = ray.get_runtime_context().get_node_id()[:8] + shard_id = f"{node_id}_{uuid.uuid4().hex[:8]}" + tar_path = os.path.join(output_dir, f"{shard_id}.tar") + + urls = batch["url"] + captions = batch["caption"] + jpeg_list = batch["jpeg_bytes"] + num_images = len(urls) + + with tarfile.open(tar_path, "w") as tar: + for i in range(num_images): + key = f"{shard_id}_{i:06d}" + + jpg_info = tarfile.TarInfo(name=f"{key}.jpg") + jpg_info.size = len(jpeg_list[i]) + tar.addfile(jpg_info, fileobj=io.BytesIO(jpeg_list[i])) + + caption_bytes = str(captions[i]).encode("utf-8") + txt_info = tarfile.TarInfo(name=f"{key}.txt") + txt_info.size = len(caption_bytes) + tar.addfile(txt_info, fileobj=io.BytesIO(caption_bytes)) + + meta = json.dumps({"url": urls[i], "caption": captions[i], "key": key}).encode("utf-8") + json_info = tarfile.TarInfo(name=f"{key}.json") + json_info.size = len(meta) + tar.addfile(json_info, fileobj=io.BytesIO(meta)) + + return {"shard_id": [shard_id], "success_count": [num_images], "total_count": [num_images]} + + +def parquet_to_webdataset_ray( + hf_dataset_path: str, + output_dir: str, + entries_per_tar: int = 1000, + max_entries: int | None = None, + concurrency: int | None = None, +) -> dict[str, int]: + """Convert HuggingFace parquet dataset to WebDataset tar files using Ray Data.""" + import ray + import ray.data + from functools import partial + from huggingface_hub import HfFileSystem + + os.makedirs(output_dir, exist_ok=True) + + ds = ray.data.read_parquet( + hf_dataset_path, + file_extensions=["parquet"], + columns=["url", "caption"], + filesystem=HfFileSystem(token=os.environ["HF_TOKEN"]), + concurrency=10, + ) + + if max_entries is not None: + ds = ds.limit(max_entries) + ds = ds.repartition(num_blocks=max(100, max_entries // 1000)) + + total_rows = ds.count() + + if concurrency is None: + cluster_resources = ray.cluster_resources() + concurrency = max(4, int(cluster_resources.get("CPU", 4))) + + # Download images, validate, convert to JPEG + ds = ds.map_batches(image_download_batch, batch_size=100, batch_format="numpy") + ds = ds.flat_map(process_image) + + # Write tar shards + results = ds.map_batches( + partial(write_tar_batch, output_dir=output_dir), + batch_size=entries_per_tar, + batch_format="numpy", + concurrency=concurrency, + ).take_all() + + total_success = sum(r["success_count"] for r in results) + num_shards = len(results) + success_rate = (total_success / total_rows * 100) if total_rows > 0 else 0 + print(f"\n✓ Download complete: {total_success} images in {num_shards} shards ({success_rate:.1f}% success rate)") + + return {"total_success": total_success, "total_attempted": total_rows, "num_shards": num_shards} diff --git a/nemo_curator_semantic_dedup/image_dedup_example.py b/nemo_curator_semantic_dedup/image_dedup_example.py new file mode 100644 index 0000000..e7c5874 --- /dev/null +++ b/nemo_curator_semantic_dedup/image_dedup_example.py @@ -0,0 +1,182 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass +import os +from pathlib import Path +import time + +from helper import parquet_to_webdataset_ray + +from nemo_curator.backends.experimental.ray_actor_pool import RayActorPoolExecutor +from nemo_curator.backends.experimental.ray_data import RayDataExecutor +from nemo_curator.core.client import RayClient +from nemo_curator.pipeline import Pipeline +from nemo_curator.stages.deduplication.semantic import SemanticDeduplicationWorkflow +from nemo_curator.stages.file_partitioning import FilePartitioningStage +from nemo_curator.stages.image.deduplication.removal import ImageDuplicatesRemovalStage +from nemo_curator.stages.image.embedders.clip_embedder import ImageEmbeddingStage +from nemo_curator.stages.image.io.convert import ConvertImageBatchToDocumentBatchStage +from nemo_curator.stages.image.io.image_reader import ImageReaderStage +from nemo_curator.stages.image.io.image_writer import ImageWriterStage +from nemo_curator.stages.text.io.writer.parquet import ParquetWriter + + +@dataclass +class Config: + """Configuration loaded from environment variables.""" + input_parquet: str + input_wds_dataset_dir: str + output_dataset_dir: str + embeddings_dir: str + removal_parquets_dir: str + model_dir: str + entries_per_tar: int + max_entries: int | None + tar_files_per_partition: int + batch_size: int + embedding_batch_size: int + + @classmethod + def from_env(cls) -> "Config": + """Load configuration from environment variables.""" + max_entries_str = os.environ.get("MAX_ENTRIES") + + return cls( + input_parquet=os.environ["INPUT_PARQUET"], + input_wds_dataset_dir=os.environ["INPUT_WDS_DIR"], + output_dataset_dir=os.environ["OUTPUT_DIR"], + embeddings_dir=os.environ["EMBEDDINGS_DIR"], + removal_parquets_dir=os.environ["REMOVAL_DIR"], + model_dir=os.environ.get("MODEL_DIR", "/home/ray/model_weights"), + entries_per_tar=int(os.environ.get("ENTRIES_PER_TAR", "1000")), + max_entries=int(max_entries_str) if max_entries_str else None, + tar_files_per_partition=int(os.environ.get("TAR_FILES_PER_PARTITION", "1")), + batch_size=int(os.environ.get("BATCH_SIZE", "100")), + embedding_batch_size=int(os.environ.get("EMBEDDING_BATCH_SIZE", "32")), + ) + + +def create_image_embedding_pipeline(config: Config) -> Pipeline: + """Create pipeline: read images -> generate CLIP embeddings -> save to parquet.""" + pipeline = Pipeline(name="image_embedding") + + pipeline.add_stage(FilePartitioningStage( + file_paths=config.input_wds_dataset_dir, + files_per_partition=config.tar_files_per_partition, + file_extensions=[".tar"], + )) + + pipeline.add_stage(ImageReaderStage(batch_size=config.batch_size, num_gpus_per_worker=0)) + + pipeline.add_stage(ImageEmbeddingStage( + model_dir=config.model_dir, + model_inference_batch_size=config.embedding_batch_size, + remove_image_data=True, + )) + + pipeline.add_stage(ConvertImageBatchToDocumentBatchStage(fields=["image_id", "embedding"])) + + pipeline.add_stage(ParquetWriter(path=config.embeddings_dir)) + + return pipeline + + +def create_embedding_deduplication_workflow(config: Config) -> Pipeline: + """Create semantic deduplication workflow using K-means + DBSCAN.""" + return SemanticDeduplicationWorkflow( + input_path=config.embeddings_dir, + output_path=config.removal_parquets_dir, + id_field="image_id", + embedding_field="embedding", + n_clusters=100, + eps=0.01, + ) + + +def create_image_deduplication_pipeline(config: Config) -> Pipeline: + """Create pipeline: read images -> filter duplicates -> write deduplicated dataset.""" + pipeline = Pipeline(name="image_deduplication") + + pipeline.add_stage(FilePartitioningStage( + file_paths=config.input_wds_dataset_dir, + files_per_partition=config.tar_files_per_partition, + file_extensions=[".tar"], + )) + + pipeline.add_stage(ImageReaderStage(batch_size=config.batch_size, num_gpus_per_worker=0)) + + pipeline.add_stage(ImageDuplicatesRemovalStage( + removal_parquets_dir=config.removal_parquets_dir + "/duplicates", + duplicate_id_field="id", + )) + + pipeline.add_stage(ImageWriterStage( + output_dir=config.output_dataset_dir, + remove_image_data=True, + )) + + return pipeline + + +def main(config: Config) -> None: + """Main execution function for image semantic deduplication pipeline.""" + ray_client = RayClient() + ray_client.start() + + # Step 1: Download images and create WebDataset tar files + os.makedirs(config.input_wds_dataset_dir, exist_ok=True) + stats = parquet_to_webdataset_ray( + hf_dataset_path=config.input_parquet, + output_dir=config.input_wds_dataset_dir, + entries_per_tar=config.entries_per_tar, + max_entries=config.max_entries, + ) + print(stats) + + # Use executors that avoid scheduling on CPU-only head node + streaming_executor = RayDataExecutor(ignore_head_node=True) + actor_executor = RayActorPoolExecutor(ignore_head_node=True) + + # Step 2: Generate CLIP embeddings + pipeline = create_image_embedding_pipeline(config) + pipeline.run(executor=streaming_executor) + + # Step 3: Find semantic duplicates using K-means + DBSCAN + workflow = create_embedding_deduplication_workflow(config) + workflow.run(kmeans_executor=actor_executor, pairwise_executor=actor_executor) + + # Step 4: Write deduplicated dataset + pipeline = create_image_deduplication_pipeline(config) + pipeline.run(executor=streaming_executor) + + ray_client.stop() + + +def _load_env_file() -> None: + """Load variables from .env file if present, without overriding existing env vars.""" + env_file = Path(__file__).parent / ".env" + if not env_file.exists(): + return + for line in env_file.read_text().splitlines(): + line = line.strip() + if line and not line.startswith("#") and "=" in line: + key, value = line.split("=", 1) + os.environ.setdefault(key.strip(), value.strip()) + + +if __name__ == "__main__": + _load_env_file() + config = Config.from_env() + main(config) \ No newline at end of file diff --git a/nemo_curator_semantic_dedup/job.yaml b/nemo_curator_semantic_dedup/job.yaml new file mode 100644 index 0000000..d1987b3 --- /dev/null +++ b/nemo_curator_semantic_dedup/job.yaml @@ -0,0 +1,90 @@ +# NeMo Curator Image Semantic Deduplication Job +# View the docs: https://docs.anyscale.com/reference/job-api#jobconfig +# +# This job runs a two-phase pipeline: +# Phase 1: Convert parquet (URLs) → WebDataset tar files (using Ray Data, distributed) +# Phase 2: Run NeMo Curator image deduplication (CLIP embeddings → semantic dedup) +# +# The parquet → tar conversion uses Ray Data to distribute image downloads +# across all nodes in the cluster, providing much better scalability than +# single-node processing. + +name: nemo-curator-image-dedup + +# Build custom image with NeMo Curator CUDA dependencies +containerfile: ./Dockerfile + +# Compute configuration with L4 GPU for CUDA-accelerated image processing +# CPU-only head node + GPU worker nodes (using ignore_head_node=True in executors) +compute_config: + head_node: + instance_type: m6i.2xlarge # CPU-only, 8 vCPUs, 32GB RAM + # No tasks scheduled here - using RayDataExecutor/RayActorPoolExecutor with ignore_head_node=True + resources: + CPU: 0 # Prevent any task scheduling on head node + worker_nodes: + - instance_type: g5.12xlarge # 4x A10G GPUs per worker, 48 vCPUs, 192GB RAM + min_nodes: 2 + max_nodes: 2 + +# Working directory - use the repo root (absolute) so Curator/ is included +working_dir: /home/ray/default + +# Environment variables for job configuration +# Override these when submitting to use your own data paths +env_vars: + # Input parquet files with image URLs (url and caption columns) + # Read directly from HuggingFace + INPUT_PARQUET: "hf://datasets/laion/relaion400m/" + MAX_ENTRIES: "1000" + + # HuggingFace token for gated datasets + # Replace with your token before submitting (GitHub push protection will block commits with real tokens) + HF_TOKEN: "" + + # Directory for WebDataset tar files (created from parquet) + # Use /mnt/cluster_storage for persistence, or /home/ray/data for ephemeral + INPUT_WDS_DIR: "/mnt/cluster_storage/nemo_curator/webdataset" + + # Output directory for deduplicated images + OUTPUT_DIR: "/mnt/cluster_storage/nemo_curator/results" + + # Directory to store CLIP embeddings + EMBEDDINGS_DIR: "/mnt/cluster_storage/nemo_curator/embeddings" + + # Directory for duplicate removal parquets + REMOVAL_DIR: "/mnt/cluster_storage/nemo_curator/removal_ids" + + # Model weights directory (pre-downloaded in Docker image) + MODEL_DIR: "/home/ray/model_weights" + + # Processing settings + BATCH_SIZE: "64" + EMBEDDING_BATCH_SIZE: "64" + TAR_FILES_PER_PARTITION: "1" + ENTRIES_PER_TAR: "500" + + # Ray Data settings for parquet -> tar conversion + # Uses ray.data.expressions.download() for distributed downloads + # DOWNLOAD_CONCURRENCY: "" # Auto-detected from cluster resources if not set + + + # Don't hide GPUs from tasks that request num_gpus=0 (needed for DALI) + RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO: "0" + + # Disable Python output buffering for real-time logs + PYTHONUNBUFFERED: "1" + + # Use local Curator/ from working_dir on all nodes (overrides Docker image's installed copy) + PYTHONPATH: "Curator" + +# The entrypoint script (-u for unbuffered output) +entrypoint: python -u examples/nemo_curator_semantic_dedup/image_dedup_example.py + +# Don't retry on failure - easier to debug +max_retries: 0 + +# Kill after 48 hours for full dataset (adjust based on dataset size) +# Full relaion400m (~361M images) will take many hours +timeout_s: 172800 + diff --git a/serve_tensor_parallel/Dockerfile b/serve_tensor_parallel/Dockerfile new file mode 100644 index 0000000..8983032 --- /dev/null +++ b/serve_tensor_parallel/Dockerfile @@ -0,0 +1,5 @@ +FROM anyscale/ray:2.49.2-py312-cu128 + +RUN curl -LsSf https://astral.sh/uv/install.sh | sh + +RUN uv pip install --system transformers deepspeed diff --git a/serve_tensor_parallel/README.md b/serve_tensor_parallel/README.md new file mode 100644 index 0000000..cbda96b --- /dev/null +++ b/serve_tensor_parallel/README.md @@ -0,0 +1,58 @@ +# Serving a Model with Tensor Parallelism + +This example explores a slightly more complex serving use case in which a model is deployed with various degrees of tensor parallelism (meaning the individual tensors are sharded across multiple GPUs). This example uses Ray Serve along with DeepSpeed and Hugging Face Transformers to deploy GPT-2 across a couple GPUs as an Anyscale service. + +## Install the Anyscale CLI + +```bash +pip install -U anyscale +anyscale login +``` + +## Deploy the service + +Clone the example from GitHub. + +```bash +git clone https://github.com/anyscale/examples.git +cd examples/serve_tensor_parallel +``` + +Deploy the service. + +```bash +anyscale service deploy -f service.yaml +``` + +## Understanding the example + +- Each replica of the model is sharded across a number of `InferenceWorker` Ray actors. There are `tensor_parallel_size` (2 by default) of them per model replica. There is an additional coordinator actor called `InferenceDeployment`, which instantiates the `InferenceWorker` actors and queries them. +- For each model replica, the `InferenceWorker` actors use DeepSpeed to communicate and perform inference. +- Ray uses a [placement group](https://docs.ray.io/en/latest/ray-core/scheduling/placement-group.html) to reserve colocated resources for all of the actors for a given model. In the case of larger models that span multiple nodes, it is also possible to use placement groups to reserve resources across multiple nodes. + +## Query the service + +The `anyscale service deploy` command outputs a line that looks like +```text +curl -H "Authorization: Bearer " +``` + +From the output, you can extract the service token and base URL. Open [query.py](https://github.com/anyscale/examples/blob/main/03_deploy_llama_3_8b/query.py) and add them to the appropriate fields. +```python +token = +base_url = +``` + +Query the model +```bash +python query.py +``` + +View the service in the [services tab](https://console.anyscale.com/services) of the Anyscale console. + +## Shutdown + +Shutdown your Anyscale Service: +```bash +anyscale service terminate -n tp-service +``` diff --git a/serve_tensor_parallel/main.py b/serve_tensor_parallel/main.py new file mode 100644 index 0000000..d598a12 --- /dev/null +++ b/serve_tensor_parallel/main.py @@ -0,0 +1,73 @@ +import os +import random +from fastapi import FastAPI +import ray +from ray import serve +import torch +import torch.distributed as dist +import deepspeed +from transformers import AutoModelForCausalLM, AutoTokenizer + +tensor_parallelism_size = 2 +model_name = "gpt2" # 124M params - works on small GPUs + +# Define a FastAPI app and wrap it in a deployment with a route handler. +app = FastAPI() + + +# Using max_restarts=3 because occasionally there will be a port conflict (we are choosing a random port) +# and the nccl process group will fail to initialize. We want to retry in those cases. +@ray.remote(num_gpus=1, max_restarts=3) +class InferenceWorker: + def __init__(self, rank, tensor_parallelism_size, master_address, master_port): + self.rank = rank + os.environ["MASTER_ADDR"] = master_address + os.environ["MASTER_PORT"] = master_port + os.environ["RANK"] = str(self.rank) + os.environ["WORLD_SIZE"] = str(tensor_parallelism_size) + dist.init_process_group("nccl", rank=self.rank, world_size=tensor_parallelism_size) + + model = AutoModelForCausalLM.from_pretrained(model_name) + self.tokenizer = AutoTokenizer.from_pretrained(model_name) + # self.tokenizer.pad_token = self.tokenizer.eos_token + # Initialize DeepSpeed inference with tensor parallelism + self.model = deepspeed.init_inference( + model, + tensor_parallel={"tp_size": tensor_parallelism_size}, # Use all available GPUs + dtype=torch.float16, + replace_with_kernel_inject=True + ) + + def inference(self, text): + inputs = self.tokenizer(text, return_tensors="pt") + inputs = {k: v.cuda() for k, v in inputs.items()} + with torch.no_grad(): + outputs = self.model.generate(**inputs, max_length=100) + if self.rank == 0: + return self.tokenizer.decode(outputs[0]) + + +@serve.deployment( + num_replicas=2, + ray_actor_options={"num_cpus": 1}, + placement_group_bundles=[{ + "CPU": tensor_parallelism_size + 1, # One additional CPU for the coordinator actor. + "GPU": tensor_parallelism_size + }], +) +@serve.ingress(app) +class InferenceDeployment: + def __init__(self, tensor_parallelism_size): + master_address = "localhost" # This is fine as long as the model fits on a single node. + master_port = str(random.randint(10000, 65535)) + self.workers = [InferenceWorker.remote(i, tensor_parallelism_size, master_address, master_port) for i in range(tensor_parallelism_size)] + + # FastAPI will automatically parse the HTTP request for us. + @app.get("/infer") + def inference(self, text: str) -> str: + results = ray.get([worker.inference.remote(text) for worker in self.workers]) + return results[0] + + +# Create deployment. +app = InferenceDeployment.bind(tensor_parallelism_size) diff --git a/serve_tensor_parallel/query.py b/serve_tensor_parallel/query.py new file mode 100644 index 0000000..36b9a9a --- /dev/null +++ b/serve_tensor_parallel/query.py @@ -0,0 +1,17 @@ +from urllib.parse import urljoin +import requests + +# The "anyscale service deploy" script outputs a line that looks like +# +# curl -H "Authorization: Bearer " +# +# From this, you can parse out the service token and base URL. +token = # Fill this in. If deploying and querying locally, use token = "FAKE_KEY" +base_url = # Fill this in. If deploying and querying locally, use base_url = "http://localhost:8000" + +resp = requests.get( + urljoin(base_url, "infer"), + params={"text": "What is the future of AI? "}, + headers={"Authorization": f"Bearer {token}"}) + +print(resp.text) diff --git a/serve_tensor_parallel/service.yaml b/serve_tensor_parallel/service.yaml new file mode 100644 index 0000000..0bbc31b --- /dev/null +++ b/serve_tensor_parallel/service.yaml @@ -0,0 +1,48 @@ +# View the docs https://docs.anyscale.com/reference/service-api#serviceconfig. + +name: tp-service + +# Specify an image to be built in the containerfile. +containerfile: ./Dockerfile +# An alternative to containerfile is to provide an image_uri. When empty, use the +# default image. This can be an Anyscale-provided base image# like +# anyscale/ray:2.43.0-slim-py312-cu125, a user-provided base image (provided +# that it meets certain specs), or you can build new images using the Anyscale +# image builder at https://console.anyscale-staging.com/v2/container-images. +# image_uri: anyscale/ray:2.49.2-py312-cu128 + +# When empty, Anyscale will auto-select the instance types. You can also specify +# minimum and maximum resources. +compute_config: +# head_node: +# instance_type: m5.2xlarge +# worker_nodes: +# - instance_type: m5.16xlarge +# min_nodes: 0 +# max_nodes: 100 +# - instance_type: m7a.24xlarge +# min_nodes: 0 +# max_nodes: 100 +# market_type: PREFER_SPOT # Defaults to ON_DEMAND +# - instance_type: g4dn.2xlarge +# min_nodes: 0 +# max_nodes: 100 +# market_type: PREFER_SPOT # Defaults to ON_DEMAND +# min_resources: +# CPU: 100 +# GPU: 1 +# max_resources: +# CPU: 5000 +# GPU: 100 + +# Path to a local directory or a remote URI to a .zip file (S3, GS, HTTP) that +# will be the working directory for the job. The files in the directory will be +# automatically uploaded to the job environment in Anyscale. +working_dir: . + +# When empty, this uses the default Anyscale Cloud in your organization. +cloud: + +# Speciy the Ray Serve app to deploy. +applications: +- import_path: main:app diff --git a/skyrl/Dockerfile b/skyrl/Dockerfile new file mode 100644 index 0000000..0cd352e --- /dev/null +++ b/skyrl/Dockerfile @@ -0,0 +1,9 @@ +FROM anyscale/ray:2.48.0-slim-py312-cu128 + +RUN sudo apt-get update -y \ + && sudo apt-get install --no-install-recommends -y build-essential libnuma-dev \ + && sudo rm -f /etc/apt/sources.list.d/* + +RUN curl -LsSf https://astral.sh/uv/install.sh | sh + +RUN git clone https://github.com/novasky-ai/SkyRL.git diff --git a/skyrl/README.md b/skyrl/README.md new file mode 100644 index 0000000..7330f7e --- /dev/null +++ b/skyrl/README.md @@ -0,0 +1,39 @@ +# GRPO with SkyRL + +This example uses [SkyRL](https://github.com/NovaSky-AI/SkyRL) to run GRPO training on the GSM8K dataset. + +SkyRL is a modular and extensible reinforcement learning library for training large language models. It supports RL algorithms like PPO, GRPO, and DAPO, tool-use tasks, and multi-turn agentic workflows. + +## Install the Anyscale CLI + +```bash +pip install -U anyscale +anyscale login +``` + +## Deploy the service + +Clone the example from GitHub. + +```bash +git clone https://github.com/anyscale/examples.git +cd examples/skyrl +``` + +Deploy the service. + +```bash +anyscale job submit -f job.yaml +``` + +## Understanding the example + +- The entrypoint defined in the [job.yaml](https://github.com/anyscale/examples/blob/main/skyrl/job.yaml) first runs a script to download the GSM8K dataset and store it under `/mnt/cluster_storage/data/gsm8k`. The `/mnt/cluster_storage/` directory is an ephemeral shared filesystem attached to the cluster for the duration of the job (this ensures that all workers have access to the data). +- The main entrypoint, `skyrl_train.entrypoints.main_base`, is run using `uv`, which picks up the relevant [pyproject.toml](https://github.com/NovaSky-AI/SkyRL/blob/main/skyrl-train/pyproject.toml) file in the SkyRL repository. That file specifies a Ray version, but we actually want to use the version of Ray used in the existing Ray cluster on Anyscale, which is why the `uv run` command includes the flag `--with ray@http://localhost:9478/ray/ray-2.48.0-cp312-cp312-manylinux2014_x86_64.whl`. +- In this example, we cannot set the `working_dir` argument in the job yaml file because `uv` will look for the appropriate `pyproject.toml` file in that working directory (and won't find it) instead of in the correct directory `$HOME/SkyRL/skyrl-train`. +- To store checkpoints in a persistent location, you can pass `ckpt_path` into the entrypoint. Read more about [Anyscale storage options](https://docs.anyscale.com/configuration/storage). This examples saves checkpoints to a mounted shared filesystem via `ckpt_path=/mnt/shared_storage/skyrl_checkpoints`. If you would like to save checkpoints to blob storage, you could set `ckpt_path=$ANYSCALE_ARTIFACT_STORAGE/skyrl_checkpoints`. On AWS you will also need to modify the main entrypoint to include `--with s3fs` in the `uv run` command, and you'll need `--with gcsfs` on GCP. + + +## View the job + +View the job in the [jobs tab](https://console.anyscale.com/jobs) of the Anyscale console. diff --git a/skyrl/job.yaml b/skyrl/job.yaml new file mode 100644 index 0000000..6d06dd2 --- /dev/null +++ b/skyrl/job.yaml @@ -0,0 +1,76 @@ +# View the docs https://docs.anyscale.com/reference/job-api#jobconfig. + +name: skyrl-train + +# When empty, use the default image. This can be an Anyscale-provided base image +# like anyscale/ray:2.50.0-slim-py312-cu128, a user-provided base image (provided +# that it meets certain specs), or you can build new images using the Anyscale +# image builder at https://console.anyscale-staging.com/v2/container-images. +# image_uri: anyscale/ray:2.50.0-slim-py312-cu128 +containerfile: ./Dockerfile + +# When empty, Anyscale will auto-select the instance types. You can also specify +# minimum and maximum resources. +compute_config: +# head_node: +# instance_type: m5.2xlarge + worker_nodes: + # These instances are available in AWS. + - instance_type: g6.12xlarge + min_nodes: 1 + max_nodes: 1 + + # # These instances are available in GCP. + # - instance_type: g2-standard-48-nvidia-l4-4 + # min_nodes: 1 + # max_nodes: 1 + +# Path to a local directory or a remote URI to a .zip file (S3, GS, HTTP) that +# will be the working directory for the job. The files in the directory will be +# automatically uploaded to the job environment in Anyscale. This is commented out +# here because if it present, uv will look for the pyproject.toml file in that +# working directory and won't find it (instead of in the correct directory +# $HOME/SkyRL/skyrl-train). +# working_dir: . + +# When empty, this uses the default Anyscale Cloud in your organization. +cloud: + +env_vars: + DATA_DIR: "/mnt/cluster_storage/data/gsm8k" + # SkyRL has some aggressive timeouts, so we need to increase them. + SKYRL_RAY_PG_TIMEOUT_IN_S: "600" + # When using uv, Ray workers take a while to start up. + RAY_worker_register_timeout_seconds: "600" + +# Fetch the data and run the training. +entrypoint: | + cd $HOME/SkyRL/skyrl-train && \ + uv run --isolated examples/gsm8k/gsm8k_dataset.py --output_dir $DATA_DIR && \ + uv run --isolated \ + --extra vllm \ + --with ray@http://localhost:9478/ray/ray-2.48.0-cp312-cp312-manylinux2014_x86_64.whl \ + -m skyrl_train.entrypoints.main_base \ + data.train_data="['$DATA_DIR/train.parquet']" \ + data.val_data="['$DATA_DIR/validation.parquet']" \ + trainer.algorithm.advantage_estimator="grpo" \ + trainer.policy.model.path="Qwen/Qwen2.5-1.5B-Instruct" \ + trainer.strategy=fsdp2 \ + trainer.placement.colocate_all=true \ + trainer.placement.policy_num_gpus_per_node=4 \ + trainer.eval_batch_size=1024 \ + trainer.eval_before_train=true \ + trainer.eval_interval=5 \ + trainer.ckpt_interval=10 \ + generator.backend=vllm \ + generator.num_inference_engines=4 \ + generator.inference_engine_tensor_parallel_size=1 \ + generator.weight_sync_backend=nccl \ + environment.env_class=gsm8k \ + trainer.logger="console" \ + trainer.project_name="gsm8k" \ + trainer.run_name="gsm8k_test" \ + trainer.ckpt_path=$ANYSCALE_ARTIFACT_STORAGE/skyrl_checkpoints + +# If there is an error, do not retry. +max_retries: 0 diff --git a/video_generation_with_fastvideo/Dockerfile b/video_generation_with_fastvideo/Dockerfile new file mode 100644 index 0000000..6d57b0c --- /dev/null +++ b/video_generation_with_fastvideo/Dockerfile @@ -0,0 +1,9 @@ +FROM anyscale/ray:2.48.0-py312-cu128 + +RUN pip install fastvideo imageio +# Currently there is an incompatibility with the latest version of gradio. +RUN pip install gradio==3.50.2 +# Flash attention is not strictly required, but improves performance. +# Flash attention requires nvcc, which is contained in anyscale/ray:2.48.0-py312-cu128 +# but not in anyscale/ray:2.48.0-slim-py312-cu128. +RUN pip install flash-attn --no-build-isolation diff --git a/video_generation_with_fastvideo/README.md b/video_generation_with_fastvideo/README.md new file mode 100644 index 0000000..c7b2f0d --- /dev/null +++ b/video_generation_with_fastvideo/README.md @@ -0,0 +1,98 @@ +# Generate videos with FastVideo + +This example demonstrates how to deploy the state-of-the-art video generation model as an Anyscale service using Fast Video. + +## Install the Anyscale CLI + +``` +pip install -U anyscale +anyscale login +``` + +## Deploy the service + +Clone the example from GitHub. + +``` +git clone https://github.com/anyscale/examples.git +cd examples/video_generation_with_fastvideo +``` + +Deploy the service. + +``` +anyscale service deploy -f service.yaml +``` + +## Query the service + +The `anyscale service deploy` command outputs a line that looks like + +```text +curl -H "Authorization: Bearer " +``` + +Navigate to the service in the [services tab](https://console.anyscale.com/services) of the Anyscale console to watch the progress of the service deployment. + +Once the service is deployed, you can view the Gradio UI by pasting the appropriate "base URL" into your browser. + +From there, you can generate videos by tweaking the prompt and the number of inference steps. + +By default, this example uses L4 GPUs and so generation is quite slow (3 inference steps can take around 90 seconds). On an H100, a 5 second video can be generated in around 5 seconds. + +## Understanding the example + +The first Ray Serve deployment is `GenerateVideo`, which instantiates the video generation model using FastVideo and runs inference. +- The `@serve.deployment` decorator specifies the accelerator type and the amount of CPU memory required. Without the memory requirement, Anyscale may provision an instance that is too small, and FastVideo will run out of memory. +- Switch to an H100 to generate a high quality video in a reasonable period of time. + +```python +@serve.deployment(num_replicas=1, ray_actor_options={"num_gpus": 1, "memory": 50 * 10**9, "accelerator_type": "L4"}) +class GenerateVideo: + def __init__(self): + # Create a video generator with a pre-trained model + self.generator = VideoGenerator.from_pretrained( + "Wan-AI/Wan2.1-T2V-1.3B-Diffusers", + num_gpus=1, + ) + + def generate(self, prompt: str, num_inference_steps: int = 3) -> bytes: + # Generate the video. + video = self.generator.generate_video( + prompt, + num_inference_steps=num_inference_steps, + return_frames=True, + ) + + buffer = io.BytesIO() + imageio.mimsave(buffer, video, fps=16, format="mp4") + buffer.seek(0) + video_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8") + + return video_base64 + + async def __call__(self, http_request: Request) -> bytes: + data = await http_request.json() + return self.generate(data["prompt"], data["num_inference_steps"]) +``` + +Next, `GradioServer` wraps a Gradio UI in a Ray Serve deployment. It takes in `generator`, which is a handle to the Ray Serve deployment. The logic for actually building the UI is hidden in `gradio_builder`. + +```python +@serve.deployment +class GradioServer(ASGIAppReplicaWrapper): + """User-facing class that wraps a Gradio App in a Serve Deployment.""" + + def __init__(self, generator: serve.handle.DeploymentHandle): + self.generator = generator + ui = gradio_builder(generator) + super().__init__(gr.routes.App.create_app(ui)) +``` + +The two deployments are combined together to produce the overall application in the line + +```python +app = GradioServer.bind(GenerateVideo.bind()) +``` + +which passes a handle to the `GenerateVideo` deployment into the `GradioServer` deployment. diff --git a/video_generation_with_fastvideo/serve_fastvideo.py b/video_generation_with_fastvideo/serve_fastvideo.py new file mode 100644 index 0000000..c3cd997 --- /dev/null +++ b/video_generation_with_fastvideo/serve_fastvideo.py @@ -0,0 +1,121 @@ +import asyncio +from starlette.requests import Request +from ray import serve +from ray.serve._private.http_util import ASGIAppReplicaWrapper +from fastvideo import VideoGenerator +import base64 +import io +import imageio +import uuid +import os +import gradio as gr + +example_prompt = "A curious raccoon peers through a vibrant field of yellow sunflowers, its eyes wide with interest." + +output_dir = "gradio_videos" +os.makedirs(output_dir, exist_ok=True) + + +def gradio_builder(generator: serve.handle.DeploymentHandle): + def query_model(prompt, num_inference_steps): + + async def run_query_model(prompt, num_inference_steps): + video_base64 = await generator.generate.remote(prompt, num_inference_steps) + return video_base64 + + video_base64 = asyncio.run(run_query_model(prompt, num_inference_steps)) + video_bytes = base64.b64decode(video_base64) + video_filename = f"{uuid.uuid4()}.mp4" + video_path = os.path.join(output_dir, video_filename) + + with open(video_path, "wb") as f: + f.write(video_bytes) + + return video_path + + with gr.Blocks() as ui: + prompt = gr.Text( + label="Prompt", + value=example_prompt, + show_label=False, + max_lines=3, + placeholder="Describe your scene...", + container=False, + lines=3, + autofocus=True, + ) + + with gr.Row(): + num_inference_steps = gr.Number( + label="Number of Inference Steps", + value=3, + minimum=1, + maximum=50, + step=1, + precision=0, + info="20-30 inference steps may be required for good quality video. On an L4 GPU, 3 inference step could take 90 seconds. To speed up generation, use a more powerful GPU type." + ) + + run_button = gr.Button("Run", variant="primary", size="lg") + result = gr.Video( + label="Generated Video", + show_label=True, + height=466, + width=600, + container=True, + elem_classes="video-component") + + run_button.click( + fn=query_model, + inputs=[prompt, num_inference_steps], + outputs=[result], + ) + + return ui + + +@serve.deployment +class GradioServer(ASGIAppReplicaWrapper): + """User-facing class that wraps a Gradio App in a Serve Deployment.""" + + def __init__(self, generator: serve.handle.DeploymentHandle): + self.generator = generator + ui = gradio_builder(generator) + super().__init__(gr.routes.App.create_app(ui)) + + +@serve.deployment(num_replicas=1, ray_actor_options={"num_gpus": 1, "memory": 50 * 10**9, "accelerator_type": "L4"}) +class GenerateVideo: + def __init__(self): + # Create a video generator with a pre-trained model + self.generator = VideoGenerator.from_pretrained( + "Wan-AI/Wan2.1-T2V-1.3B-Diffusers", + num_gpus=1, # Adjust based on your hardware + ) + + def generate(self, prompt: str, num_inference_steps: int = 3) -> bytes: + # Generate the video. + video = self.generator.generate_video( + prompt, + num_inference_steps=num_inference_steps, + return_frames=True, # Also return frames from this call (defaults to False) + # output_path="my_videos/", # Controls where videos are saved + # save_video=True + ) + + buffer = io.BytesIO() + imageio.mimsave(buffer, video, fps=16, format="mp4") + buffer.seek(0) + video_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8") + + return video_base64 + + async def __call__(self, http_request: Request) -> bytes: + data = await http_request.json() + prompt = data["prompt"] + num_inference_steps = data["num_inference_steps"] + print(f"Prompt: {prompt}, Inference steps: {num_inference_steps}") + return self.generate(prompt, num_inference_steps) + + +app = GradioServer.bind(GenerateVideo.bind()) diff --git a/video_generation_with_fastvideo/service.yaml b/video_generation_with_fastvideo/service.yaml new file mode 100644 index 0000000..380c657 --- /dev/null +++ b/video_generation_with_fastvideo/service.yaml @@ -0,0 +1,44 @@ +# View the docs https://docs.anyscale.com/reference/service-api#serviceconfig. + +name: deploy-fast-video + +# When empty, use the default image. This can be an Anyscale-provided base image +# like anyscale/ray:2.43.0-slim-py312-cu125, a user-provided base image (provided +# that it meets certain specs), or you can build new images using the Anyscale +# image builder at https://console.anyscale-staging.com/v2/container-images. + +containerfile: ./Dockerfile + +# When empty, Anyscale will auto-select the instance types. You can also specify +# minimum and maximum resources. +compute_config: +# head_node: +# instance_type: m5.2xlarge +# worker_nodes: +# - instance_type: m5.16xlarge +# min_nodes: 0 +# max_nodes: 100 +# - instance_type: m7a.24xlarge +# min_nodes: 0 +# max_nodes: 100 +# market_type: PREFER_SPOT # Defaults to ON_DEMAND +# - instance_type: g4dn.2xlarge +# min_nodes: 0 +# max_nodes: 100 +# market_type: PREFER_SPOT # Defaults to ON_DEMAND + auto_select_worker_config: true + +# Disable the bearer token so that the gradio UI can be accessed without authentication. +query_auth_token_enabled: false + +# Path to a local directory or a remote URI to a .zip file (S3, GS, HTTP) that +# will be the working directory for the job. The files in the directory will be +# automatically uploaded to the job environment in Anyscale. +working_dir: . + +# When empty, this uses the default Anyscale Cloud in your organization. +cloud: + +# Specify the Ray Serve app to deploy. +applications: +- import_path: serve_fastvideo:app