Skip to content

Commit ef81b30

Browse files
authored
Merge pull request #130 from pipeless-ai/user_data
feat(core): Allow to pass custom user data through stages
2 parents 5f8254f + 54959a1 commit ef81b30

File tree

6 files changed

+113
-6
lines changed

6 files changed

+113
-6
lines changed

examples/custom-data/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Pass custom data between stages
2+
3+
Check [this guide](https://www.pipeless.ai/docs/v1/examples/custom-data) to run the example step by step.

examples/custom-data/process.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
def hook(frame_data, _):
2+
# Add data to the frame that you can later recover from hooks of subsequent stages. You can also recover it from subsequent hooks of the same stage.
3+
# You can use any kind of data.
4+
# Integers: frame_data['user_data'] = 100
5+
# Floats: frame_data['user_data'] = 100.5
6+
# Strings: frame_data['user_data'] = "Hello!"
7+
# Heterogeneus arrays: frame_data['user_data'] = ["string", 13, 34.6]
8+
# Heterogeneus Dictionaries (IMPORTANT: all keys must be strings):
9+
frame_data['user_data'] = {
10+
"key1": 0,
11+
"key2": [1, "3"],
12+
"key3": { "inner1": "hola" }
13+
}
14+
15+
# In a later hook you can obtain the data like:
16+
# my_data = frame_data['user_data']
17+
18+
# To connect stages simply give the list to Pipeless when adding a stream:
19+
# pipeless add stream --input-uri "file:///home/user/my/path.mp4" --output-uri "screen" --frame-path "stage1,stage2"

pipeless/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pipeless/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pipeless-ai"
3-
version = "1.7.0"
3+
version = "1.8.0"
44
edition = "2021"
55
authors = ["Miguel A. Cabrera Minagorri"]
66
description = "An open-source computer vision framework to build and deploy applications in minutes"

pipeless/src/data.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,17 @@ use ndarray;
33
use uuid;
44
use gstreamer as gst;
55

6+
/// Custom data that the user can add to the frame in a stage
7+
/// allowing to pass data to subsequent stages
8+
pub enum UserData {
9+
Empty,
10+
Integer(i32),
11+
Float(f64),
12+
String(String),
13+
Array(Vec<UserData>),
14+
Dictionary(Vec<(String, UserData)>),
15+
}
16+
617
pub struct RgbFrame {
718
uuid: uuid::Uuid,
819
original: ndarray::Array3<u8>,
@@ -17,6 +28,7 @@ pub struct RgbFrame {
1728
inference_input: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
1829
inference_output: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
1930
pipeline_id: uuid::Uuid,
31+
user_data: UserData,
2032
}
2133
impl RgbFrame {
2234
pub fn new(
@@ -36,6 +48,7 @@ impl RgbFrame {
3648
inference_input: ndarray::ArrayBase::zeros(ndarray::IxDyn(&[0])),
3749
inference_output: ndarray::ArrayBase::zeros(ndarray::IxDyn(&[0])),
3850
pipeline_id,
51+
user_data: UserData::Empty,
3952
}
4053
}
4154

@@ -49,6 +62,7 @@ impl RgbFrame {
4962
inference_input: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
5063
inference_output: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
5164
pipeline_id: &str,
65+
user_data: UserData,
5266
) -> Self {
5367
RgbFrame {
5468
uuid: uuid::Uuid::from_str(uuid).unwrap(),
@@ -60,6 +74,7 @@ impl RgbFrame {
6074
fps, input_ts,
6175
inference_input, inference_output,
6276
pipeline_id: uuid::Uuid::from_str(pipeline_id).unwrap(),
77+
user_data: user_data
6378
}
6479
}
6580

@@ -119,6 +134,9 @@ impl RgbFrame {
119134
pub fn set_pipeline_id(&mut self, pipeline_id: &str) {
120135
self.pipeline_id = uuid::Uuid::from_str(pipeline_id).unwrap();
121136
}
137+
pub fn get_user_data(&self) -> &UserData {
138+
&self.user_data
139+
}
122140
}
123141

124142
pub enum Frame {
@@ -170,4 +188,4 @@ impl Frame {
170188
Frame::RgbFrame(frame) => { frame.set_inference_output(output_data); },
171189
}
172190
}
173-
}
191+
}

pipeless/src/stages/languages/python.rs

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use log::{error, warn};
2-
use pyo3::prelude::*;
2+
use pyo3::{PyObject, prelude::*};
33
use numpy::{self, ToPyArray};
44

5-
use crate::{data::{RgbFrame, Frame}, stages::{hook::{HookTrait, HookType}, stage::ContextTrait}, stages::stage::Context, kvs::store};
5+
use crate::{data::{Frame, RgbFrame, UserData}, stages::{hook::{HookTrait, HookType}, stage::ContextTrait}, stages::stage::Context, kvs::store};
66

77
/// Allows a Frame to be converted from Rust to Python
88
impl IntoPy<Py<PyAny>> for Frame {
@@ -41,6 +41,7 @@ impl IntoPy<Py<PyAny>> for RgbFrame {
4141
dict.set_item("inference_input", self.get_inference_input().to_pyarray(py)).unwrap();
4242
dict.set_item("inference_output", self.get_inference_output().to_pyarray(py)).unwrap();
4343
dict.set_item("pipeline_id", self.get_pipeline_id().to_string()).unwrap();
44+
dict.set_item("user_data", self.get_user_data()).unwrap();
4445
dict.into()
4546
}
4647
}
@@ -82,18 +83,84 @@ impl<'source> FromPyObject<'source> for RgbFrame {
8283
let inference_input = inference_input_ndarray;
8384
let inference_output =inference_output_ndarray;
8485
let pipeline_id = ob.get_item("pipeline_id").unwrap().extract()?;
86+
let user_data = ob.get_item("user_data").unwrap().extract()?;
8587

8688
let frame = RgbFrame::from_values(
8789
uuid, original, modified, width, height,
8890
pts, dts, duration, fps, input_ts,
8991
inference_input, inference_output,
90-
pipeline_id,
92+
pipeline_id, user_data
9193
);
9294

9395
Ok(frame)
9496
}
9597
}
9698

99+
/// Allows to pass the user data to python and back
100+
impl ToPyObject for UserData {
101+
fn to_object(&self, py: Python<'_>) -> PyObject {
102+
match self {
103+
UserData::Empty => py.None(),
104+
UserData::Integer(i) => i.into_py(py),
105+
UserData::Float(f) => f.into_py(py),
106+
UserData::String(s) => s.into_py(py),
107+
UserData::Array(arr) => {
108+
let list = pyo3::types::PyList::empty(py);
109+
for item in arr {
110+
list.append(item.to_object(py)).unwrap();
111+
}
112+
list.into_py(py)
113+
}
114+
UserData::Dictionary(dict) => {
115+
let py_dict = pyo3::types::PyDict::new(py);
116+
for (key, value) in dict {
117+
py_dict.set_item(key, value.to_object(py)).unwrap();
118+
}
119+
py_dict.into_py(py)
120+
}
121+
}
122+
}
123+
}
124+
125+
/// Allows to pass the user data to python and back
126+
impl<'source> FromPyObject<'source> for UserData {
127+
fn extract(obj: &'source PyAny) -> PyResult<Self> {
128+
if let Ok(integer) = obj.extract::<i32>() {
129+
Ok(UserData::Integer(integer))
130+
} else if let Ok(float) = obj.extract::<f64>() {
131+
Ok(UserData::Float(float))
132+
} else if let Ok(string) = obj.extract::<String>() {
133+
Ok(UserData::String(string))
134+
} else if obj.is_instance_of::<pyo3::types::PyList>() {
135+
let array = obj.downcast::<pyo3::types::PyList>()?;
136+
let array_data = array.into_iter()
137+
.map(|elem| UserData::extract(elem))
138+
.collect::<PyResult<Vec<UserData>>>()?;
139+
Ok(UserData::Array(array_data))
140+
} else if obj.is_instance_of::<pyo3::types::PyDict>() {
141+
let dict = obj.downcast::<pyo3::types::PyDict>()?;
142+
let dict_keys = dict.keys();
143+
let mut dict_items = Vec::new();
144+
for key in dict_keys {
145+
let key_str = key.extract::<String>()?;
146+
let value = dict.get_item(key)?;
147+
match value {
148+
Some(v) => {
149+
let value_data = UserData::extract(v)?;
150+
dict_items.push((key_str, value_data));
151+
},
152+
None => { dict_items.push((key_str, UserData::Empty)); },
153+
}
154+
}
155+
Ok(UserData::Dictionary(dict_items))
156+
} else if obj.is_none() {
157+
Ok(UserData::Empty)
158+
} else {
159+
Err(pyo3::exceptions::PyTypeError::new_err("Unsupported data type assigned to 'user_data'. Please check in the Pipeless the supported types."))
160+
}
161+
}
162+
}
163+
97164
/// Python context to maintain within a stage
98165
pub struct PythonStageContext {
99166
context: Py<pyo3::types::PyDict>,

0 commit comments

Comments
 (0)