
Commit 01404fa
[cursor] One-shot into pipelines plugin system
1 parent c582e66 commit 01404fa

File tree: 14 files changed, +1020 -27 lines

docs/external-pipelines.md (328 additions, 0 deletions)
# External Pipeline Development Guide

This guide explains how to create AI Runner pipelines that live in separate repositories and can be installed as dependencies.

## Overview

AI Runner supports external pipelines through Python entry points. This allows pipeline developers to:

1. **Maintain separate repositories** - Each pipeline can have its own repo
2. **Use ai-runner-base as a library** - Install `ai-runner-base` as a dependency
3. **Rely on automatic discovery** - Pipelines are discovered via entry points; no code changes are needed in ai-runner
4. **Get multi-processing support** - Pipelines work seamlessly with the multiprocessing architecture
## Architecture

### Multi-Processing Context

AI Runner uses `multiprocessing` with the `spawn` context for GPU memory isolation. This means:

- Pipelines run in a **separate spawned process**
- The pipeline code must be **importable** from installed packages
- Entry points are discovered **in the spawned process**, so the packages that provide them must be installed
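To make the isolation concrete, here is a minimal sketch of the `spawn` pattern described above (illustrative only, not ai-runner's actual process-management code):

```python
import multiprocessing as mp

def child() -> None:
    # Runs in a freshly spawned interpreter: nothing is inherited from the
    # parent's memory, so any pipeline code used here must come from an
    # installed package that the child can import.
    print("spawned child started")

if __name__ == "__main__":
    ctx = mp.get_context("spawn")  # fresh interpreter; no forked GPU state
    proc = ctx.Process(target=child)
    proc.start()
    proc.join()
```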
### Entry Points

Pipelines register themselves via a single entry point group:

- **`ai_runner.pipelines`** - The pipeline class (which has a `Params` class attribute linking to its params class)
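As a rough sketch of what this enables (the real lookup lives inside ai-runner), a pipeline name can be resolved to its class, and from there to its params class, using only the standard library:

```python
from importlib.metadata import entry_points

def load_pipeline_class(name: str):
    # Find the entry point whose name matches the requested pipeline
    matches = [ep for ep in entry_points(group="ai_runner.pipelines") if ep.name == name]
    if not matches:
        raise ValueError(f"no pipeline registered under {name!r}")
    pipeline_cls = matches[0].load()  # imports e.g. my_pipeline.pipeline:MyPipeline
    return pipeline_cls, pipeline_cls.Params  # params class rides along on the class
```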
## Creating an External Pipeline

### Step 1: Project Structure

Create a new Python package with this structure:

```
my-pipeline/
├── pyproject.toml
├── README.md
├── src/
│   └── my_pipeline/
│       ├── __init__.py
│       ├── pipeline.py
│       └── params.py
└── tests/
```
### Step 2: Install ai-runner-base

In your `pyproject.toml`:

```toml
[project]
name = "my-pipeline"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = [
    "ai-runner-base>=0.1.0",
    # Your pipeline-specific dependencies
    "torch>=2.0.0",
    # ... other deps
]

[project.entry-points."ai_runner.pipelines"]
my-pipeline = "my_pipeline.pipeline:MyPipeline"

[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
```
### Step 3: Implement Pipeline Interface

Create `src/my_pipeline/pipeline.py`:

```python
import asyncio
import logging

from app.live.pipelines.interface import Pipeline
from app.live.pipelines.trickle import VideoFrame, VideoOutput

from .params import MyPipelineParams


class MyPipeline(Pipeline):
    # Link params class to pipeline
    Params = MyPipelineParams

    def __init__(self):
        super().__init__()
        self.frame_queue: asyncio.Queue[VideoOutput] = asyncio.Queue()
        self.initialized = False

    async def initialize(self, **params):
        """Initialize the pipeline with parameters."""
        logging.info(f"Initializing MyPipeline with params: {params}")
        # Your initialization logic here
        self.initialized = True

    async def put_video_frame(self, frame: VideoFrame, request_id: str):
        """Process an input frame."""
        # Your processing logic here
        # For example, a simple pass-through:
        output = VideoOutput(frame, request_id)
        await self.frame_queue.put(output)

    async def get_processed_video_frame(self) -> VideoOutput:
        """Get the next processed frame."""
        return await self.frame_queue.get()

    async def update_params(self, **params):
        """Update pipeline parameters."""
        logging.info(f"Updating params: {params}")
        # Return a Task if a reload is needed, None otherwise
        return None

    async def stop(self):
        """Clean up resources."""
        self.frame_queue = asyncio.Queue()
        self.initialized = False

    @classmethod
    def prepare_models(cls):
        """Download/prepare models if needed."""
        logging.info("Preparing MyPipeline models")
        # Your model preparation logic
```
### Step 4: Implement Parameters

Create `src/my_pipeline/params.py`:

```python
from pydantic import Field

from app.live.pipelines.interface import BaseParams


class MyPipelineParams(BaseParams):
    """Parameters for MyPipeline."""

    # Add your custom parameters
    strength: float = Field(
        default=0.8,
        ge=0.0,
        le=1.0,
        description="Processing strength",
    )

    # BaseParams already provides: width, height, show_reloading_frame
```
### Step 5: Register Entry Points

The entry point is registered in `pyproject.toml`:

```toml
[project.entry-points."ai_runner.pipelines"]
my-pipeline = "my_pipeline.pipeline:MyPipeline"
```

The params class is linked via the `Params` class attribute:

```python
class MyPipeline(Pipeline):
    Params = MyPipelineParams
```

**Important**:

- The entry point name (e.g., `my-pipeline`) is what will be used as the pipeline name when starting the runner.
- Only one entry point is needed - the params class is linked via the `Pipeline.Params` class attribute.
### Step 6: Install and Use

Install your pipeline:

```bash
# Development install
pip install -e /path/to/my-pipeline

# Or from a git repo
pip install git+https://github.com/yourorg/my-pipeline.git

# Or, once published to PyPI
pip install my-pipeline
```

Then use it:

```bash
PIPELINE=my-pipeline MODEL_ID=my-pipeline python -m app.main
```
## Docker Images (Optional)

### Option 1: Use Base Image + Install Pipeline

Create a Dockerfile that extends the ai-runner base image:

```dockerfile
FROM livepeer/ai-runner:live-base

# Install your pipeline
RUN pip install my-pipeline

# Or install from git
RUN pip install git+https://github.com/yourorg/my-pipeline.git

# Or install from local source
COPY my-pipeline/ /tmp/my-pipeline/
RUN pip install /tmp/my-pipeline/
```

### Option 2: Install via uv (Recommended)

If using `uv` for faster installs:

```dockerfile
FROM livepeer/ai-runner:live-base

# Install uv if not already present
RUN pip install uv

# Install your pipeline
RUN uv pip install my-pipeline

# Or from git
RUN uv pip install git+https://github.com/yourorg/my-pipeline.git
```

### Option 3: Pure Python Dependencies (Best)

If your pipeline only needs Python dependencies (no system libraries), you can install it at runtime:

```dockerfile
FROM livepeer/ai-runner:live-base

# Set an environment variable to auto-install the pipeline
ENV AUTO_INSTALL_PIPELINE="my-pipeline"

# Or use a startup script that installs the pipeline
COPY install-pipeline.sh /usr/local/bin/
RUN chmod +x /usr/local/bin/install-pipeline.sh
```

Then modify the runner startup to check for `AUTO_INSTALL_PIPELINE` and install it automatically, as sketched below.
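A hypothetical sketch of that startup hook (the `AUTO_INSTALL_PIPELINE` handling and this helper are assumptions of this guide, not an existing ai-runner feature):

```python
import os
import subprocess
import sys

def maybe_install_pipeline() -> None:
    """If AUTO_INSTALL_PIPELINE is set, pip-install that package before
    pipeline discovery runs, so its entry points become visible."""
    spec = os.environ.get("AUTO_INSTALL_PIPELINE")
    if spec:
        subprocess.check_call([sys.executable, "-m", "pip", "install", spec])
```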
## Multi-Processing Considerations

Since pipelines run in a spawned subprocess:

1. **All dependencies must be installed** - The spawned process needs access to your pipeline package
2. **Entry points must be discoverable** - They are discovered in the spawned process
3. **No shared state** - Each process has its own memory space (see the sketch after this list)
4. **Import paths** - Use absolute imports from `app.live.pipelines.interface`
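Point 3 is easy to trip over, so here is a small self-contained demonstration: with `spawn`, the child re-imports the module from scratch, so a global mutated in the parent is never seen by the child.

```python
import multiprocessing as mp

CONFIG = {"strength": 0.8}  # module-level state

def child() -> None:
    # The spawned child re-imported this module, so it sees the original
    # value, not the parent's mutation below.
    print("child sees:", CONFIG["strength"])  # prints 0.8

if __name__ == "__main__":
    CONFIG["strength"] = 0.5  # visible only in the parent process
    ctx = mp.get_context("spawn")
    p = ctx.Process(target=child)
    p.start()
    p.join()
```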
## Example: Complete Pipeline Package

See `examples/external-pipeline-example/` for a complete working example.

## Testing Your Pipeline

Test locally:

```bash
# Install ai-runner-base in development mode
cd /path/to/ai-runner/runner
pip install -e .

# Install your pipeline
cd /path/to/my-pipeline
pip install -e .

# Run the runner
cd /path/to/ai-runner/runner
PIPELINE=my-pipeline MODEL_ID=my-pipeline python -m app.main
```
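Beyond running the full runner, the pipeline class can be exercised directly. A minimal sketch using `pytest` with the `pytest-asyncio` plugin (a choice of this guide, not a requirement), against the pass-through pipeline from Step 3:

```python
import pytest

from my_pipeline.pipeline import MyPipeline


@pytest.mark.asyncio
async def test_passthrough_roundtrip():
    pipeline = MyPipeline()
    await pipeline.initialize()
    assert pipeline.initialized

    # A real test would build a proper VideoFrame; a stand-in object flows
    # through the pass-through implementation (assuming VideoOutput accepts it).
    frame = object()
    await pipeline.put_video_frame(frame, request_id="req-1")
    output = await pipeline.get_processed_video_frame()
    assert output is not None
```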
## Publishing to PyPI

1. Build your package:

   ```bash
   python -m build
   ```

2. Publish:

   ```bash
   twine upload dist/*
   ```

3. Users can then install:

   ```bash
   pip install my-pipeline
   ```
## Advanced: Pipeline Variants

You can create pipeline variants (like `streamdiffusion-sd15`) by:

1. Registering multiple entry points with different names
2. Using the same pipeline class, or thin subclasses of it, with different default parameters (sketched below)
3. Checking the pipeline name in `initialize()` to configure each variant differently

Example:

```toml
[project.entry-points."ai_runner.pipelines"]
my-pipeline = "my_pipeline.pipeline:MyPipeline"
my-pipeline-fast = "my_pipeline.pipeline:MyPipelineFast"
my-pipeline-hq = "my_pipeline.pipeline:MyPipelineHQ"
```
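A hedged sketch of option 2, with a variant as a thin subclass that only swaps defaults (`MyPipelineFast` and `MyPipelineFastParams` are hypothetical names for illustration):

```python
from pydantic import Field

from .params import MyPipelineParams
from .pipeline import MyPipeline


class MyPipelineFastParams(MyPipelineParams):
    # Hypothetical speed-oriented default for the inherited field
    strength: float = Field(default=0.5, ge=0.0, le=1.0, description="Processing strength")


class MyPipelineFast(MyPipeline):
    Params = MyPipelineFastParams
```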
## Troubleshooting

### Pipeline Not Found

- Check that entry points are registered correctly in `pyproject.toml`
- Verify the package is installed: `pip list | grep my-pipeline`
- Check entry points: `python -c "from importlib.metadata import entry_points; print(list(entry_points(group='ai_runner.pipelines')))"`

### Import Errors in Spawned Process

- Ensure all dependencies are installed
- Use absolute imports from `app.live.pipelines.interface`
- Check that `ai-runner-base` is installed

### Parameters Not Parsing

- Verify the pipeline entry point is registered and that the class exposes a `Params` attribute
- Check that your params class extends `BaseParams`
- Ensure the params module can be imported without pulling in expensive dependencies
