Skip to content

Commit ab464b2

Browse files
committed
Add arrow schema for instrument close
1 parent 153cf0d commit ab464b2

File tree

26 files changed

+1031
-183
lines changed

26 files changed

+1031
-183
lines changed

RELEASES.md

+3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ This release adds support for Python 3.13 (*not* yet compatible with free-thread
1313
- Added log file rotation with additional config options `max_file_size` and `max_backup_count` (#2468), thanks @xingyanan and @twitu
1414
- Added `bars_timestamp_on_close` config option for `BybitDataClientConfig` (default `True` to match Nautilus conventions)
1515
- Added `BetfairSequenceCompleted` custom data type for Betfair to mark the completion of a sequence of messages
16+
- Added Arrow schema for `MarkPriceUpdate` in Rust
17+
- Added Arrow schema for `IndexPriceUpdate` in Rust
18+
- Added Arrow schema for `InstrumentClose` in Rust
1619
- Improved robustness of in-flight order check for `LiveExecutionEngine`, once exceeded query retries will resolve submitted orders as rejected and pending orders as canceled
1720
- Improved logging for `BacktestNode` crashes with full stack trace and prettier config logging
1821

crates/adapters/tardis/src/replay.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,7 @@ pub async fn run_tardis_machine_replay_from_config(config_filepath: &Path) -> an
188188
Data::Trade(msg) => handle_trade_msg(msg, &mut trades_map, &mut trades_cursors, &path),
189189
Data::Bar(msg) => handle_bar_msg(msg, &mut bars_map, &mut bars_cursors, &path),
190190
Data::Delta(_) => panic!("Individual delta message not implemented (or required)"),
191-
Data::MarkPrice(_) => panic!("Not implemented"),
192-
Data::IndexPrice(_) => panic!("Not implemented"),
191+
_ => panic!("Not implemented"),
193192
}
194193

195194
msg_count += 1;

crates/data/src/engine/mod.rs

+16-9
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ use nautilus_model::{
7676
data::{
7777
Bar, BarType, Data, DataType, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick,
7878
TradeTick,
79+
close::InstrumentClose,
7980
prices::{IndexPriceUpdate, MarkPriceUpdate},
8081
},
8182
enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
@@ -442,8 +443,9 @@ impl DataEngine {
442443
Data::Quote(quote) => self.handle_quote(quote),
443444
Data::Trade(trade) => self.handle_trade(trade),
444445
Data::Bar(bar) => self.handle_bar(bar),
445-
Data::MarkPrice(mark_price) => self.handle_mark_price(mark_price),
446-
Data::IndexPrice(index_price) => self.handle_index_price(index_price),
446+
Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
447+
Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
448+
Data::InstrumentClose(close) => self.handle_instrument_close(close),
447449
}
448450
}
449451

@@ -550,12 +552,12 @@ impl DataEngine {
550552
};
551553

552554
let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
553-
msgbus::publish(&topic, &deltas as &dyn Any); // TODO: Optimize
555+
msgbus::publish(&topic, &deltas as &dyn Any);
554556
}
555557

556558
fn handle_depth10(&mut self, depth: OrderBookDepth10) {
557559
let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
558-
msgbus::publish(&topic, &depth as &dyn Any); // TODO: Optimize
560+
msgbus::publish(&topic, &depth as &dyn Any);
559561
}
560562

561563
fn handle_quote(&mut self, quote: QuoteTick) {
@@ -566,7 +568,7 @@ impl DataEngine {
566568
// TODO: Handle synthetics
567569

568570
let topic = switchboard::get_quotes_topic(quote.instrument_id);
569-
msgbus::publish(&topic, &quote as &dyn Any); // TODO: Optimize
571+
msgbus::publish(&topic, &quote as &dyn Any);
570572
}
571573

572574
fn handle_trade(&mut self, trade: TradeTick) {
@@ -577,7 +579,7 @@ impl DataEngine {
577579
// TODO: Handle synthetics
578580

579581
let topic = switchboard::get_trades_topic(trade.instrument_id);
580-
msgbus::publish(&topic, &trade as &dyn Any); // TODO: Optimize
582+
msgbus::publish(&topic, &trade as &dyn Any);
581583
}
582584

583585
fn handle_bar(&mut self, bar: Bar) {
@@ -607,7 +609,7 @@ impl DataEngine {
607609
}
608610

609611
let topic = switchboard::get_bars_topic(bar.bar_type);
610-
msgbus::publish(&topic, &bar as &dyn Any); // TODO: Optimize
612+
msgbus::publish(&topic, &bar as &dyn Any);
611613
}
612614

613615
fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
@@ -616,7 +618,7 @@ impl DataEngine {
616618
}
617619

618620
let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
619-
msgbus::publish(&topic, &mark_price as &dyn Any); // TODO: Optimize
621+
msgbus::publish(&topic, &mark_price as &dyn Any);
620622
}
621623

622624
fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
@@ -630,7 +632,12 @@ impl DataEngine {
630632
}
631633

632634
let topic = switchboard::get_index_price_topic(index_price.instrument_id);
633-
msgbus::publish(&topic, &index_price as &dyn Any); // TODO: Optimize
635+
msgbus::publish(&topic, &index_price as &dyn Any);
636+
}
637+
638+
fn handle_instrument_close(&mut self, close: InstrumentClose) {
639+
let topic = switchboard::get_instrument_close_topic(close.instrument_id);
640+
msgbus::publish(&topic, &close as &dyn Any);
634641
}
635642

636643
// -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------

crates/data/src/engine/tests.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -946,7 +946,7 @@ fn test_process_mark_price(
946946
msgbus::subscribe(topic, handler.clone(), None);
947947

948948
let mut data_engine = data_engine.borrow_mut();
949-
data_engine.process_data(Data::MarkPrice(mark_price));
949+
data_engine.process_data(Data::MarkPriceUpdate(mark_price));
950950
let cache = &data_engine.get_cache();
951951
let messages = get_saved_messages::<MarkPriceUpdate>(handler);
952952

@@ -1005,7 +1005,7 @@ fn test_process_index_price(
10051005
msgbus::subscribe(topic, handler.clone(), None);
10061006

10071007
let mut data_engine = data_engine.borrow_mut();
1008-
data_engine.process_data(Data::IndexPrice(index_price));
1008+
data_engine.process_data(Data::IndexPriceUpdate(index_price));
10091009
let cache = &data_engine.get_cache();
10101010
let messages = get_saved_messages::<IndexPriceUpdate>(handler);
10111011

crates/model/cbindgen.toml

+3
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,6 @@ exclude = [
7070
"UUID4" = "UUID4_t"
7171
"Venue" = "Venue_t"
7272
"VenueOrderId" = "VenueOrderId_t"
73+
"MarkPriceUpdate" = "MarkPriceUpdate_t"
74+
"IndexPriceUpdate" = "IndexPriceUpdate_t"
75+
"InstrumentClose" = "InstrumentClose_t"

crates/model/cbindgen_cython.toml

+3
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,6 @@ exclude = [
8787
"UUID4" = "UUID4_t"
8888
"Venue" = "Venue_t"
8989
"VenueOrderId" = "VenueOrderId_t"
90+
"MarkPriceUpdate" = "MarkPriceUpdate_t"
91+
"IndexPriceUpdate" = "IndexPriceUpdate_t"
92+
"InstrumentClose" = "InstrumentClose_t"

crates/model/src/data/close.rs

+22-2
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@
1717
1818
use std::{collections::HashMap, fmt::Display, hash::Hash};
1919

20+
use indexmap::IndexMap;
2021
use nautilus_core::{UnixNanos, serialization::Serializable};
2122
use serde::{Deserialize, Serialize};
2223

2324
use super::GetTsInit;
24-
use crate::{enums::InstrumentCloseType, identifiers::InstrumentId, types::Price};
25+
use crate::{
26+
enums::InstrumentCloseType,
27+
identifiers::InstrumentId,
28+
types::{Price, fixed::FIXED_SIZE_BINARY},
29+
};
2530

2631
/// Represents an instrument close at a venue.
2732
#[repr(C)]
@@ -64,9 +69,24 @@ impl InstrumentClose {
6469

6570
/// Returns the metadata for the type, for use with serialization formats.
6671
#[must_use]
67-
pub fn get_metadata(instrument_id: &InstrumentId) -> HashMap<String, String> {
72+
pub fn get_metadata(
73+
instrument_id: &InstrumentId,
74+
price_precision: u8,
75+
) -> HashMap<String, String> {
6876
let mut metadata = HashMap::new();
6977
metadata.insert("instrument_id".to_string(), instrument_id.to_string());
78+
metadata.insert("price_precision".to_string(), price_precision.to_string());
79+
metadata
80+
}
81+
82+
/// Returns the field map for the type, for use with Arrow schemas.
83+
#[must_use]
84+
pub fn get_fields() -> IndexMap<String, String> {
85+
let mut metadata = IndexMap::new();
86+
metadata.insert("close_price".to_string(), FIXED_SIZE_BINARY.to_string());
87+
metadata.insert("close_type".to_string(), "UInt8".to_string());
88+
metadata.insert("ts_event".to_string(), "UInt64".to_string());
89+
metadata.insert("ts_init".to_string(), "UInt64".to_string());
7090
metadata
7191
}
7292
}

0 commit comments

Comments
 (0)