Skip to content

Commit

Permalink
feat: Pagination with continuation tokens and fixed indexing logic
Browse files Browse the repository at this point in the history
  • Loading branch information
lfbrehm committed Feb 10, 2025
1 parent 911888a commit 1cfcbe5
Show file tree
Hide file tree
Showing 16 changed files with 411 additions and 166 deletions.
4 changes: 2 additions & 2 deletions aruna-server/src/api/grpc/grpc_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ impl TryFrom<grpc::GetRelationsRequest> for requests::GetRelationsRequest {
requests::Direction::Outgoing
},
filter: value.filter,
last_entry: Some(value.offset as usize),
continuation_token: todo!(),
page_size: value.page_size as usize,
})
}
Expand All @@ -533,7 +533,7 @@ impl From<requests::GetRelationsResponse> for grpc::GetRelationsResponse {
fn from(value: requests::GetRelationsResponse) -> Self {
Self {
relations: value.relations.into_iter().map(|r| r.into()).collect(),
offset: value.offset.map(|v| v as u64),
offset: todo!(),
}
}
}
6 changes: 6 additions & 0 deletions aruna-server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ impl From<bincode::Error> for ArunaError {
}
}

impl From<base64::DecodeError> for ArunaError {
fn from(e: base64::DecodeError) -> Self {
ArunaError::DeserializeError(e.to_string())
}
}

impl From<std::io::Error> for ArunaError {
fn from(e: std::io::Error) -> Self {
ArunaError::IoError(e.to_string())
Expand Down
21 changes: 21 additions & 0 deletions aruna-server/src/models/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,27 @@ pub struct RawRelation {
pub edge_type: EdgeType,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct RelationRange {
pub last_entry: Option<u32>,
pub page_size: u32,
}

impl Default for RelationRange {
fn default() -> Self {
RelationRange {
last_entry: None,
page_size: 1000,
}
}
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ContinuationToken {
pub last_incoming: Option<u32>,
pub last_outgoing: Option<u32>,
}

#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize, ToSchema)]
pub struct ServerInfo {
pub node_id: Ulid,
Expand Down
7 changes: 3 additions & 4 deletions aruna-server/src/models/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ pub enum Direction {
}

fn default_page_size() -> usize {
100
1000
}

#[derive(
Expand All @@ -328,16 +328,15 @@ pub struct GetRelationsRequest {
#[serde(default)]
pub filter: Vec<u32>, // Filter with Strings for directions or idx for rel idx?
#[serde(default)]
pub last_entry: Option<usize>, // usize
pub continuation_token: Option<String>, // usize
#[serde(default = "default_page_size")]
pub page_size: usize, // Max value 1000? Default 100
}

#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize, ToSchema)]
pub struct GetRelationsResponse {
pub relations: Vec<Relation>,
pub offset: Option<usize>,
pub total_hits: u32,
pub continuation_token: Option<String>,
}

#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize, ToSchema)]
Expand Down
169 changes: 98 additions & 71 deletions aruna-server/src/storage/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
use crate::{
error::ArunaError,
logerr,
models::models::{EdgeType, MilliIdx, NodeVariant, Permission, RawRelation},
models::models::{EdgeType, MilliIdx, NodeVariant, Permission, RawRelation, RelationRange},
};
use milli::heed::{types::SerdeBincode, Database, RoTxn};
use milli::{ObkvCodec, BEU32, BEU64};
Expand Down Expand Up @@ -119,93 +119,120 @@ impl GraphTxn<'_> {
pub fn node_weight(&self, idx: MilliIdx) -> Option<NodeVariant> {
self.state.graph.node_weight(idx.0.into()).cloned()
}
pub fn get_relations(

pub fn get_relation_range(
&self,
idx: MilliIdx,
filter: Option<&[EdgeType]>,
direction: Direction,
range: Option<(u32, u32)>,
) -> Result<Vec<RawRelation>, ArunaError> {
range: RelationRange,
// Tuple with (relations, last_entry)
) -> Result<(Vec<RawRelation>, u32), ArunaError> {
let graph_idx = *self
.state
.idx_mappings
.milli_graph
.get(idx.0 as usize)
.ok_or_else(|| ArunaError::GraphError("Index not found".to_string()))?;

if let Some((start, end)) = range {
if start > end {
return Err(ArunaError::GraphError("Start bigger than end".to_string()));
let RelationRange {
last_entry,
page_size,
} = range;
let mut slice = Vec::new();
let mut edge = match last_entry {
Some(idx) => idx.into(),
None => self
.state
.graph
.edges(graph_idx.into())
.next()
.ok_or_else(|| ArunaError::GraphError("EdgeIndex not found".to_string()))?
.id(),
};
for _ in 0..page_size {
let edge_weight = match self.state.graph.edge_weight(edge) {
Some(edge) => edge,
None => continue,
};
let (source, target) = match self.state.graph.edge_endpoints(edge) {
Some(refs) => refs,
None => continue,
};
if let Some(filter) = filter {
if !filter.contains(edge_weight) {
continue;
}
}
let mut slice = Vec::new();
for e in start..end {
if let Some(edge) = self.state.graph.next_edge(e.into(), direction) {
let edge_weight = match self.state.graph.edge_weight(edge) {
Some(edge) => edge,
None => continue,
};
let (source, target) = match self.state.graph.edge_endpoints(edge) {
Some(refs) => refs,
None => continue,
};
if let Some(filter) = filter {
if !filter.contains(edge_weight) {
continue;
}
}
if matches!(self.mode, Mode::ReadTxn) {
if self
if matches!(self.mode, Mode::ReadTxn) {
if self
.state
.in_flight_tx
.edges
.contains(&(edge.index() as u32))
{
continue;
}
if self.state.in_flight_tx.nodes.contains(&target.as_u32())
|| self.state.in_flight_tx.nodes.contains(&source.as_u32())
{
continue;
}
}
let relation = match direction {
Direction::Outgoing => RawRelation {
source: idx,
target: MilliIdx(
match self
.state
.in_flight_tx
.edges
.contains(&(edge.index() as u32))
{
continue;
}
if self.state.in_flight_tx.nodes.contains(&target.as_u32())
|| self.state.in_flight_tx.nodes.contains(&source.as_u32())
.idx_mappings
.graph_milli
.get(target.as_u32() as usize)
{
continue;
}
}
let relation = match direction {
Direction::Outgoing => RawRelation {
source: idx,
target: MilliIdx(
match self
.state
.idx_mappings
.graph_milli
.get(target.as_u32() as usize)
{
Some(i) => *i,
None => continue,
},
),
edge_type: *edge_weight,
Some(i) => *i,
None => continue,
},
Direction::Incoming => RawRelation {
source: MilliIdx(
match self
.state
.idx_mappings
.graph_milli
.get(source.as_u32() as usize)
{
Some(i) => *i,
None => continue,
},
),
target: idx,
edge_type: *edge_weight,
),
edge_type: *edge_weight,
},
Direction::Incoming => RawRelation {
source: MilliIdx(
match self
.state
.idx_mappings
.graph_milli
.get(source.as_u32() as usize)
{
Some(i) => *i,
None => continue,
},
};
slice.push(relation);
} else {
break;
}
}
),
target: idx,
edge_type: *edge_weight,
},
};
slice.push(relation);
edge = match self.state.graph.next_edge(edge, direction) {
Some(edge) => edge,
None => break,
};
}
Ok((slice, edge.index() as u32))
}

pub fn get_relations(
&self,
idx: MilliIdx,
filter: Option<&[EdgeType]>,
direction: Direction,
) -> Result<Vec<RawRelation>, ArunaError> {
let graph_idx = *self
.state
.idx_mappings
.milli_graph
.get(idx.0 as usize)
.ok_or_else(|| ArunaError::GraphError("Index not found".to_string()))?;

Ok(self
.state
.graph
Expand Down
2 changes: 1 addition & 1 deletion aruna-server/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ pub mod init;
mod milli_helpers;
pub(crate) mod obkv_ext;
pub mod store;
pub(super) mod utils;
pub(super) mod txns;
pub(super) mod utils;
Loading

0 comments on commit 1cfcbe5

Please sign in to comment.