Skip to content

Commit 4d6869e

Browse files
committed
add python bridge
Signed-off-by: michaelfeil <[email protected]>
1 parent 7e3f92b commit 4d6869e

File tree

5 files changed

+18
-10
lines changed

5 files changed

+18
-10
lines changed

lib/bindings/python/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/bindings/python/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ tokio-util = { version = "0.7", features = ["rt"] }
4747
tracing = { version = "0" }
4848
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
4949
uuid = { version = "1.17", features = ["v4", "serde"] }
50+
crossbeam-channel = "0.5"
5051

5152
# "extension-module" tells pyo3 we want to build an extension module (skips linking against libpython.so)
5253
# "abi3-py310" tells pyo3 (and maturin) to build using the stable ABI with minimum Python version 3.10, which is the minimum version in pyproject.toml

lib/bindings/python/rust/bridge.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{
22
sync::{OnceLock, mpsc},
3-
thread::{self, ThreadId},
3+
thread::{self},
44
};
55

66
use crossbeam_channel::{Sender, bounded};
@@ -135,4 +135,4 @@ impl Bridge {
135135
.recv()
136136
.map_err(|_| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>("bridge dropped"))?
137137
}
138-
}
138+
}

lib/bindings/python/rust/engine.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,19 @@ use pyo3::prelude::*;
88
use pyo3::types::{PyDict, PyModule};
99
use pyo3::{PyAny, PyErr};
1010
use pyo3_async_runtimes::TaskLocals;
11-
use pythonize::{depythonize, pythonize};
11+
use pythonize::pythonize;
1212
pub use serde::{Deserialize, Serialize};
1313
use tokio::sync::mpsc;
1414
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
1515
use tokio_util::sync::CancellationToken;
1616

17+
use super::context::{Context, callable_accepts_kwarg};
18+
use crate::bridge;
1719
use dynamo_runtime::logging::get_distributed_tracing_context;
1820
pub use dynamo_runtime::{
1921
pipeline::{AsyncEngine, AsyncEngineContextProvider, Data, ManyOut, ResponseStream, SingleIn},
2022
protocols::annotated::Annotated,
2123
};
22-
use crate::bridge;
23-
use super::context::{Context, callable_accepts_kwarg};
2424

2525
/// Add bingings from this crate to the provided module
2626
pub fn add_to_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
@@ -182,7 +182,6 @@ where
182182
let stream = bridge::Bridge::global()
183183
.with_gil(move |py| {
184184
let py_request = pythonize(py, &request)?;
185-
let id = ctx_python.id().to_string();
186185

187186
let py_ctx = Py::new(py, Context::new(ctx_python.clone(), current_trace_context))?;
188187

lib/bindings/python/rust/lib.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ impl From<RouterMode> for RsRouterMode {
5757
}
5858
}
5959

60+
mod bridge;
6061
mod context;
6162
mod engine;
6263
mod http;
@@ -66,6 +67,8 @@ mod parsers;
6667
mod planner;
6768
mod prometheus_metrics;
6869

70+
use bridge::Bridge;
71+
6972
type JsonServerStreamingIngress =
7073
Ingress<SingleIn<serde_json::Value>, ManyOut<RsAnnotated<serde_json::Value>>>;
7174

@@ -899,6 +902,7 @@ impl Client {
899902
context: Option<context::Context>,
900903
) -> PyResult<Bound<'p, PyAny>> {
901904
// Convert to an owned GIL-independent handle and move into async:
905+
#[allow(deprecated)]
902906
let request_py: Py<PyAny> = request.into_py(py);
903907
let annotated = annotated.unwrap_or(false);
904908

@@ -940,6 +944,7 @@ impl Client {
940944
context: Option<context::Context>,
941945
) -> PyResult<Bound<'p, PyAny>> {
942946
// Convert to an owned GIL-independent handle and move into async:
947+
#[allow(deprecated)]
943948
let request_py: Py<PyAny> = request.into_py(py);
944949
let annotated = annotated.unwrap_or(false);
945950

@@ -982,6 +987,7 @@ impl Client {
982987
context: Option<context::Context>,
983988
) -> PyResult<Bound<'p, PyAny>> {
984989
// Convert to an owned GIL-independent handle and move into async:
990+
#[allow(deprecated)]
985991
let request_py: Py<PyAny> = request.into_py(py);
986992
let annotated = annotated.unwrap_or(false);
987993

@@ -1028,13 +1034,14 @@ async fn process_stream(
10281034
while let Some(response) = stream.next().await {
10291035
// Convert the response to a PyObject using Python's GIL
10301036
let annotated: RsAnnotated<serde_json::Value> = response;
1031-
let annotated: RsAnnotated<PyObject> = .map_data_async(|data| async move {
1032-
Bridge::global()
1037+
let annotated: RsAnnotated<PyObject> = annotated
1038+
.map_data_async(|data| async move {
1039+
Bridge::global()
10331040
.to_py(data) // runs on bridge worker thread
10341041
.await
10351042
.map_err(|e| e.to_string())
1036-
.await;
1037-
});
1043+
})
1044+
.await;
10381045

10391046
let is_error = annotated.is_error();
10401047

0 commit comments

Comments
 (0)