Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
e71f87b
init
pushkarm029 Feb 19, 2025
66f0b8f
completed: need better error handling + cleanup
pushkarm029 Feb 20, 2025
fe034fb
clippy
pushkarm029 Feb 21, 2025
99c909c
robust error handling
pushkarm029 Feb 21, 2025
0dbffd6
standardization
pushkarm029 Feb 21, 2025
b389a39
e2e tested with example
pushkarm029 Feb 21, 2025
251b82c
impl order creation
pushkarm029 Feb 23, 2025
86bd98b
Remove partially filled event
cjdsellers Feb 26, 2025
ed88156
Continue redis
cjdsellers Feb 27, 2025
9153e49
Improve key scanning
cjdsellers Feb 27, 2025
d8eb3ba
replace `from_limit`, 'from_market' to 'from'
pushkarm029 Mar 11, 2025
d8fa1b1
Implement a few add and update methods in Rust for Python, remove dea…
pushkarm029 Mar 18, 2025
2c50e31
more e2e tests fixed
pushkarm029 Mar 18, 2025
c1381e5
refined pyo3_to_cython to be more generic
pushkarm029 Mar 18, 2025
16ebd25
revert to cython's add/load/update-position
pushkarm029 Mar 19, 2025
a55dab2
fixed all order relevant tests
pushkarm029 Mar 19, 2025
f33525c
Added Tests
pushkarm029 Mar 23, 2025
2b0d2b4
cleanup
pushkarm029 Mar 23, 2025
5e3a79d
Fix Currency serde issues
pushkarm029 Mar 25, 2025
dca813d
Fix Position handling, added transformer for position
pushkarm029 Mar 25, 2025
e4f9a5f
strategy handling ported
pushkarm029 Mar 26, 2025
f9fd62c
port general load
pushkarm029 Mar 26, 2025
2be7534
cleanup
pushkarm029 Mar 27, 2025
465ad29
remove redundant test case
pushkarm029 Mar 27, 2025
78cc223
Fix docstring
cjdsellers Mar 29, 2025
9f163f3
added prints to help debugging CURRENCY_MAP + few fixes
pushkarm029 Apr 2, 2025
fda5e2d
Merge develop and revert Position changes
cjdsellers Apr 8, 2025
1d6f479
Add position transformation functions
cjdsellers Apr 8, 2025
06373a6
Add TODOs
cjdsellers Apr 8, 2025
8975069
Merge branch 'develop' into redis
pushkarm029 Apr 10, 2025
c52ce25
Merge branch 'develop' into redis
pushkarm029 Apr 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions crates/common/src/cache/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub trait CacheDatabaseAdapter {

async fn load_all(&self) -> anyhow::Result<CacheMap>;

fn load(&self) -> anyhow::Result<HashMap<String, Bytes>>;
async fn load(&mut self) -> anyhow::Result<HashMap<String, Bytes>>;

async fn load_currencies(&self) -> anyhow::Result<HashMap<Ustr, Currency>>;

Expand Down Expand Up @@ -108,7 +108,10 @@ pub trait CacheDatabaseAdapter {

fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>>;

fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>>;
async fn load_strategy(
&self,
strategy_id: &StrategyId,
) -> anyhow::Result<HashMap<String, Bytes>>;

fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>>;

Expand All @@ -130,21 +133,21 @@ pub trait CacheDatabaseAdapter {

fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>>;

fn add(&self, key: String, value: Bytes) -> anyhow::Result<()>;
fn add(&mut self, key: String, value: Bytes) -> anyhow::Result<()>;

fn add_currency(&self, currency: &Currency) -> anyhow::Result<()>;
fn add_currency(&mut self, currency: &Currency) -> anyhow::Result<()>;

fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()>;
fn add_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()>;

fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()>;

fn add_account(&self, account: &AccountAny) -> anyhow::Result<()>;
fn add_account(&mut self, account: &AccountAny) -> anyhow::Result<()>;

fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()>;
fn add_order(&mut self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()>;

fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()>;

fn add_position(&self, position: &Position) -> anyhow::Result<()>;
fn add_position(&mut self, position: &Position) -> anyhow::Result<()>;

fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()>;

Expand All @@ -170,7 +173,7 @@ pub trait CacheDatabaseAdapter {

fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()>;

fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()>;
fn delete_strategy(&mut self, strategy_id: &StrategyId) -> anyhow::Result<()>;

fn index_venue_order_id(
&self,
Expand All @@ -186,13 +189,14 @@ pub trait CacheDatabaseAdapter {

fn update_actor(&self) -> anyhow::Result<()>;

fn update_strategy(&self) -> anyhow::Result<()>;
fn update_strategy(&mut self, id: &str, strategy: HashMap<String, Bytes>)
-> anyhow::Result<()>;

fn update_account(&self, account: &AccountAny) -> anyhow::Result<()>;
fn update_account(&mut self, account: &AccountAny) -> anyhow::Result<()>;

fn update_order(&self, order_event: &OrderEventAny) -> anyhow::Result<()>;
fn update_order(&mut self, order_event: &OrderEventAny) -> anyhow::Result<()>;

fn update_position(&self, position: &Position) -> anyhow::Result<()>;
fn update_position(&mut self, position: &Position) -> anyhow::Result<()>;

fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()>;

Expand Down
4 changes: 2 additions & 2 deletions crates/common/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ impl Cache {
// -- COMMANDS --------------------------------------------------------------------------------

/// Clears the current general cache and loads the general objects from the cache database.
pub fn cache_general(&mut self) -> anyhow::Result<()> {
pub async fn cache_general(&mut self) -> anyhow::Result<()> {
self.general = match &mut self.database {
Some(db) => db.load()?,
Some(db) => db.load().await?,
None => HashMap::new(),
};

Expand Down
5 changes: 3 additions & 2 deletions crates/common/src/cache/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ fn test_flush_db_when_empty(mut cache: Cache) {
}

#[rstest]
fn test_cache_general_when_no_database(mut cache: Cache) {
assert!(cache.cache_general().is_ok());
#[tokio::test]
async fn test_cache_general_when_no_database(mut cache: Cache) {
assert!(cache.cache_general().await.is_ok());
}

// -- EXECUTION -------------------------------------------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions crates/execution/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ impl ExecutionEngine {
{
let mut cache = self.cache.borrow_mut();
cache.clear_index();
cache.cache_general()?;
self.cache.borrow_mut().cache_all().await?;
cache.cache_general().await?;
cache.cache_all().await?;
cache.build_index();
let _ = cache.check_integrity();

Expand Down
209 changes: 205 additions & 4 deletions crates/infrastructure/src/python/redis/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,33 @@
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::collections::HashMap;

use bytes::Bytes;
use nautilus_common::runtime::get_runtime;
use nautilus_common::{cache::database::CacheDatabaseAdapter, runtime::get_runtime};
use nautilus_core::{
UUID4,
python::{to_pyruntime_err, to_pyvalue_err},
};
use nautilus_model::{
identifiers::TraderId,
identifiers::{
AccountId, ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId,
},
orders::Order,
position::Position,
python::{
account::account_any_to_pyobject, instruments::instrument_any_to_pyobject,
orders::order_any_to_pyobject,
account::{account_any_to_pyobject, pyobject_to_account_any},
instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
orders::{order_any_to_pyobject, pyobject_to_order_any},
},
types::Currency,
};
use pyo3::{
IntoPyObjectExt,
prelude::*,
types::{PyBytes, PyDict},
};
use ustr::Ustr;

use crate::redis::{cache::RedisCacheDatabase, queries::DatabaseQueries};

Expand Down Expand Up @@ -138,6 +147,140 @@ impl RedisCacheDatabase {
}
}

#[pyo3(name = "load")]
fn py_load(&mut self) -> PyResult<HashMap<String, Vec<u8>>> {
let result: Result<HashMap<String, Vec<u8>>, anyhow::Error> =
get_runtime().block_on(async {
let result = self.load().await?;
Ok(result.into_iter().map(|(k, v)| (k, v.to_vec())).collect())
});
result.map_err(to_pyruntime_err)
}

#[pyo3(name = "load_currency")]
fn py_load_currency(&self, code: &str) -> PyResult<Option<Currency>> {
let result = get_runtime().block_on(async {
DatabaseQueries::load_currency(
&self.con,
self.get_trader_key(),
&Ustr::from(code),
self.get_encoding(),
)
.await
});
result.map_err(to_pyruntime_err)
}

#[pyo3(name = "load_account")]
fn py_load_account(&self, py: Python, account_id: AccountId) -> PyResult<Option<PyObject>> {
get_runtime().block_on(async {
let result = DatabaseQueries::load_account(
&self.con,
self.get_trader_key(),
&account_id,
self.get_encoding(),
)
.await;

match result {
Ok(Some(account)) => {
let py_object = account_any_to_pyobject(py, account)?;
Ok(Some(py_object))
}
Ok(None) => Ok(None),
Err(e) => Err(to_pyruntime_err(e)),
}
})
}

#[pyo3(name = "load_order")]
fn py_load_order(
&self,
py: Python,
client_order_id: ClientOrderId,
) -> PyResult<Option<PyObject>> {
get_runtime().block_on(async {
let result = DatabaseQueries::load_order(
&self.con,
self.get_trader_key(),
&client_order_id,
self.get_encoding(),
)
.await;

match result {
Ok(Some(order)) => {
let py_object = order_any_to_pyobject(py, order)?;
Ok(Some(py_object))
}
Ok(None) => Ok(None),
Err(e) => Err(to_pyruntime_err(e)),
}
})
}

#[pyo3(name = "load_instrument")]
fn py_load_instrument(
&self,
py: Python,
instrument_id: InstrumentId,
) -> PyResult<Option<PyObject>> {
get_runtime().block_on(async {
let result = DatabaseQueries::load_instrument(
&self.con,
self.get_trader_key(),
&instrument_id,
self.get_encoding(),
)
.await;

match result {
Ok(Some(instrument)) => {
let py_object = instrument_any_to_pyobject(py, instrument)?;
Ok(Some(py_object))
}
Ok(None) => Ok(None),
Err(e) => Err(to_pyruntime_err(e)),
}
})
}

#[pyo3(name = "load_position")]
fn py_load_position(&self, position_id: PositionId) -> PyResult<Option<Position>> {
get_runtime()
.block_on(async {
DatabaseQueries::load_position(
&self.con,
self.get_trader_key(),
&position_id,
self.get_encoding(),
)
.await
})
.map_err(to_pyruntime_err)
}

#[pyo3(name = "load_strategy")]
fn py_load_strategy(&self, strategy_id: &str) -> PyResult<HashMap<String, Vec<u8>>> {
get_runtime().block_on(async {
DatabaseQueries::load_strategy(
&self.con,
self.get_trader_key(),
&StrategyId::new(strategy_id),
self.get_encoding(),
)
.await
.map_err(to_pyruntime_err)
.map(|map| map.into_iter().map(|(k, v)| (k, v.to_vec())).collect())
})
}

#[pyo3(name = "delete_strategy")]
fn py_delete_strategy(&mut self, strategy_id: &str) -> PyResult<()> {
self.delete_strategy(&StrategyId::new(strategy_id))
.map_err(to_pyruntime_err)
}

#[pyo3(name = "read")]
fn py_read(&mut self, py: Python, key: &str) -> PyResult<Vec<PyObject>> {
let result = get_runtime().block_on(async { self.read(key).await });
Expand Down Expand Up @@ -165,11 +308,69 @@ impl RedisCacheDatabase {
self.update(key, Some(payload)).map_err(to_pyvalue_err)
}

#[pyo3(name = "update_strategy")]
fn py_update_strategy(&mut self, id: &str, strategy: HashMap<String, Vec<u8>>) -> PyResult<()> {
let strategy_map: HashMap<String, Bytes> = strategy
.into_iter()
.map(|(k, v)| (k, Bytes::from(v)))
.collect();
self.update_strategy(id, strategy_map)
.map_err(to_pyvalue_err)
}

#[pyo3(name = "delete")]
#[pyo3(signature = (key, payload=None))]
fn py_delete(&mut self, key: String, payload: Option<Vec<Vec<u8>>>) -> PyResult<()> {
let payload: Option<Vec<Bytes>> =
payload.map(|vec| vec.into_iter().map(Bytes::from).collect());
self.delete(key, payload).map_err(to_pyvalue_err)
}

#[pyo3(name = "add")]
fn py_add(&mut self, key: String, value: Vec<u8>) -> PyResult<()> {
self.add(key, Bytes::from(value)).map_err(to_pyvalue_err)
}

#[pyo3(name = "add_currency")]
fn py_add_currency(&mut self, currency: Currency) -> PyResult<()> {
self.add_currency(&currency).map_err(to_pyvalue_err)
}

#[pyo3(name = "add_instrument")]
fn py_add_instrument(&mut self, py: Python, instrument: PyObject) -> PyResult<()> {
let instrument_any = pyobject_to_instrument_any(py, instrument)?;
self.add_instrument(&instrument_any).map_err(to_pyvalue_err)
}

#[pyo3(name = "add_account")]
fn py_add_account(&mut self, py: Python, account: PyObject) -> PyResult<()> {
let account_any = pyobject_to_account_any(py, account)?;
self.add_account(&account_any).map_err(to_pyvalue_err)
}

#[pyo3(name = "add_order")]
#[pyo3(signature = (order, _position_id=None,client_id=None))]
fn py_add_order(
&mut self,
py: Python,
order: PyObject,
_position_id: Option<PositionId>,
client_id: Option<ClientId>,
) -> PyResult<()> {
let order_any = pyobject_to_order_any(py, order)?;
self.add_order(&order_any, client_id)
.map_err(to_pyvalue_err)
}

#[pyo3(name = "add_position")]
fn py_add_position(&mut self, position: Position) -> PyResult<()> {
self.add_position(&position).map_err(to_pyvalue_err)
}

#[pyo3(name = "update_order")]
fn py_update_order(&mut self, py: Python, order: PyObject) -> PyResult<()> {
let order_any = pyobject_to_order_any(py, order)?;
self.update_order(order_any.last_event())
.map_err(to_pyvalue_err)
}
}
Loading