Skip to content

Commit 46b2e26

Browse files
committed
fix: upgrading to the latest version
1 parent 0d3b9cd commit 46b2e26

File tree

4 files changed

+71
-38
lines changed

4 files changed

+71
-38
lines changed

Cargo.toml

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "pregel-rs"
3-
version = "0.0.13"
4-
authors = [ "Ángel Iglesias Préstamo <[email protected]>" ]
3+
version = "0.0.14"
4+
authors = ["Ángel Iglesias Préstamo <[email protected]>"]
55
description = "A Graph library written in Rust for implementing your own algorithms in a Pregel fashion"
66
documentation = "https://docs.rs/crate/pregel-rs/latest"
77
repository = "https://github.com/angelip2303/pregel-rs"
@@ -12,4 +12,10 @@ keywords = ["pregel", "graph", "pagerank", "polars", "algorithms"]
1212
categories = ["algorithms", "database", "mathematics", "science"]
1313

1414
[dependencies]
15-
polars = { version = "0.30.0", features = ["lazy", "streaming", "parquet", "performant", "chunked_ids"] }
15+
polars = { version = "0.45.1", features = [
16+
"lazy",
17+
"streaming",
18+
"parquet",
19+
"performant",
20+
"chunked_ids",
21+
] }

examples/maximum_value.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use polars::lazy::dsl::max_horizontal;
12
use polars::prelude::*;
23
use pregel_rs::graph_frame::GraphFrame;
34
use pregel_rs::pregel::Column::{Custom, Object, Subject, VertexId};
@@ -32,10 +33,10 @@ fn main() -> Result<(), Box<dyn Error>> {
3233
Column::subject(Custom("max_value")),
3334
)
3435
.aggregate_messages(Column::msg(None).max())
35-
.v_prog(max_exprs([
36+
.v_prog(max_horizontal([
3637
col(Custom("max_value").as_ref()),
3738
Column::msg(None),
38-
]))
39+
])?)
3940
.build();
4041

4142
Ok(println!("{}", pregel.run()?))

src/graph_frame.rs

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,13 @@ impl GraphFrame {
125125
/// `vertices` and `edges` DataFrames. If any of the required columns (`Id`, `Src`,
126126
/// `Dst`) are missing in the DataFrames, the function returns an `Error`.
127127
pub fn new(vertices: DataFrame, edges: DataFrame) -> Result<Self> {
128-
if !vertices.get_column_names().contains(&VertexId.as_ref()) {
128+
if !vertices.get_column_names_str().contains(&VertexId.as_ref()) {
129129
return Err(GraphFrameError::MissingColumn(MissingColumnError::VertexId));
130130
}
131-
if !edges.get_column_names().contains(&Subject.as_ref()) {
131+
if !edges.get_column_names_str().contains(&Subject.as_ref()) {
132132
return Err(GraphFrameError::MissingColumn(MissingColumnError::Subject));
133133
}
134-
if !edges.get_column_names().contains(&Object.as_ref()) {
134+
if !edges.get_column_names_str().contains(&Object.as_ref()) {
135135
return Err(GraphFrameError::MissingColumn(MissingColumnError::Object));
136136
}
137137

@@ -161,7 +161,7 @@ impl GraphFrame {
161161
.clone() // this is because cloning a DataFrame is cheap
162162
.lazy()
163163
.select([col(Object.as_ref()).alias(VertexId.as_ref())]);
164-
let vertices = concat([subjects, objects], true, true)?
164+
let vertices = concat([subjects, objects], Default::default())?
165165
.unique(
166166
Some(vec![VertexId.as_ref().to_string()]),
167167
UniqueKeepStrategy::First,
@@ -185,8 +185,10 @@ impl GraphFrame {
185185
pub fn out_degrees(self) -> PolarsResult<DataFrame> {
186186
self.edges
187187
.lazy()
188-
.groupby([col(Subject.as_ref()).alias(VertexId.as_ref())])
189-
.agg([count().alias(Custom("out_degree").as_ref())])
188+
.group_by([col(Subject.as_ref()).alias(VertexId.as_ref())])
189+
.agg([col(Object.as_ref())
190+
.count()
191+
.alias(Custom("out_degree").as_ref())])
190192
.collect()
191193
}
192194

@@ -204,8 +206,10 @@ impl GraphFrame {
204206
pub fn in_degrees(self) -> PolarsResult<DataFrame> {
205207
self.edges
206208
.lazy()
207-
.groupby([col(Object.as_ref())])
208-
.agg([count().alias(Custom("in_degree").as_ref())])
209+
.group_by([col(Object.as_ref())])
210+
.agg([col(Subject.as_ref())
211+
.count()
212+
.alias(Custom("in_degree").as_ref())])
209213
.collect()
210214
}
211215
}
@@ -232,8 +236,16 @@ mod tests {
232236
use polars::prelude::*;
233237

234238
fn graph() -> Result<GraphFrame, GraphFrameError> {
235-
let subjects = Series::new(Column::Subject.as_ref(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
236-
let objects = Series::new(Column::Object.as_ref(), [2, 3, 4, 5, 6, 7, 8, 9, 10, 1]);
239+
let subjects = Series::new(
240+
Column::Subject.as_ref().into(),
241+
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
242+
)
243+
.into();
244+
let objects = Series::new(
245+
Column::Object.as_ref().into(),
246+
[2, 3, 4, 5, 6, 7, 8, 9, 10, 1],
247+
)
248+
.into();
237249
GraphFrame::from_edges(DataFrame::new(vec![subjects, objects]).unwrap())
238250
}
239251

@@ -278,9 +290,10 @@ mod tests {
278290

279291
#[test]
280292
fn test_new_missing_vertex_id_column() {
281-
let vertices = DataFrame::new(vec![Series::new("not_vertex_id", [1, 2, 3])]).unwrap();
282-
let subjects = Series::new(Column::Subject.as_ref(), [1, 2, 3]);
283-
let objects = Series::new(Column::Object.as_ref(), [2, 3, 4]);
293+
let vertices =
294+
DataFrame::new(vec![Series::new("not_vertex_id".into(), [1, 2, 3]).into()]).unwrap();
295+
let subjects = Series::new(Column::Subject.as_ref().into(), [1, 2, 3]).into();
296+
let objects = Series::new(Column::Object.as_ref().into(), [2, 3, 4]).into();
284297
let edges = DataFrame::new(vec![subjects, objects]).unwrap();
285298
match GraphFrame::new(vertices, edges) {
286299
Ok(_) => panic!("Should have failed"),
@@ -290,10 +303,14 @@ mod tests {
290303

291304
#[test]
292305
fn test_new_missing_subject_column() {
293-
let vertices =
294-
DataFrame::new(vec![Series::new(Column::VertexId.as_ref(), [1, 2, 3])]).unwrap();
295-
let subjects = Series::new("not_src", [1, 2, 3]);
296-
let objects = Series::new(Column::Object.as_ref(), [2, 3, 4]);
306+
let vertices = DataFrame::new(vec![Series::new(
307+
Column::VertexId.as_ref().into(),
308+
[1, 2, 3],
309+
)
310+
.into()])
311+
.unwrap();
312+
let subjects = Series::new("not_src".into(), [1, 2, 3]).into();
313+
let objects = Series::new(Column::Object.as_ref().into(), [2, 3, 4]).into();
297314
let edges = DataFrame::new(vec![subjects, objects]).unwrap();
298315
match GraphFrame::new(vertices, edges) {
299316
Ok(_) => panic!("Should have failed"),
@@ -303,10 +320,14 @@ mod tests {
303320

304321
#[test]
305322
fn test_new_missing_object_column() {
306-
let vertices =
307-
DataFrame::new(vec![Series::new(Column::VertexId.as_ref(), [1, 2, 3])]).unwrap();
308-
let subjects = Series::new(Column::Subject.as_ref(), [1, 2, 3]);
309-
let objects = Series::new("not_dst", [2, 3, 4]);
323+
let vertices = DataFrame::new(vec![Series::new(
324+
Column::VertexId.as_ref().into(),
325+
[1, 2, 3],
326+
)
327+
.into()])
328+
.unwrap();
329+
let subjects = Series::new(Column::Subject.as_ref().into(), [1, 2, 3]).into();
330+
let objects = Series::new("not_dst".into(), [2, 3, 4]).into();
310331
let edges = DataFrame::new(vec![subjects, objects]).unwrap();
311332
match GraphFrame::new(vertices, edges) {
312333
Ok(_) => panic!("Should have failed"),

src/pregel.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,7 @@ impl<'a> PregelBuilder<'a> {
644644
///
645645
/// ```rust
646646
/// use polars::prelude::*;
647+
/// use polars::lazy::dsl::max_horizontal;
647648
/// use pregel_rs::graph_frame::GraphFrame;
648649
/// use pregel_rs::pregel::Column;
649650
/// use pregel_rs::pregel::Column::{Custom, Object, VertexId, Subject};
@@ -668,7 +669,7 @@ impl<'a> PregelBuilder<'a> {
668669
/// .initial_message(col(Custom("value").as_ref()))
669670
/// .send_messages(MessageReceiver::Object, Column::subject(Custom("max_value")))
670671
/// .aggregate_messages(Column::msg(None).max())
671-
/// .v_prog(max_exprs([col(Custom("max_value").as_ref()), Column::msg(None)]))
672+
/// .v_prog(max_horizontal([col(Custom("max_value").as_ref()), Column::msg(None)])?)
672673
/// .build();
673674
///
674675
/// Ok(println!("{}", pregel.run()?))
@@ -752,7 +753,7 @@ impl<'a> Pregel<'a> {
752753
.graph
753754
.edges
754755
.lazy()
755-
.select([all().prefix(&format!("{}.", Column::Edge.as_ref()))]);
756+
.select([all().name().prefix(&format!("{}.", Column::Edge.as_ref()))]);
756757
// We create a DataFrame that contains the vertices of the graph
757758
let vertices = &self.graph.vertices.lazy();
758759
// We start the execution of the algorithm from the super-step 0; that is, all the nodes
@@ -786,16 +787,18 @@ impl<'a> Pregel<'a> {
786787
let current_vertices_df = &current_vertices.lazy();
787788
let triplets_df = current_vertices_df
788789
.to_owned()
789-
.select([all().prefix(&format!("{}.", Column::Subject.as_ref()))])
790+
.select([all()
791+
.name()
792+
.prefix(&format!("{}.", Column::Subject.as_ref()))])
790793
.inner_join(
791794
edges.to_owned().select([all()]),
792795
Column::subject(Column::VertexId), // src column of the current_vertices DataFrame
793796
Column::edge(Column::Subject), // src column of the edges DataFrame
794797
)
795798
.inner_join(
796-
current_vertices_df
797-
.to_owned()
798-
.select([all().prefix(&format!("{}.", Column::Object.as_ref()))]),
799+
current_vertices_df.to_owned().select([all()
800+
.name()
801+
.prefix(&format!("{}.", Column::Object.as_ref()))]),
799802
Column::edge(Column::Object), // dst column of the resulting DataFrame
800803
Column::object(Column::VertexId), // id column of the current_vertices DataFrame
801804
);
@@ -826,7 +829,7 @@ impl<'a> Pregel<'a> {
826829
let aggregate_messages = &mut self.aggregate_messages;
827830
let message_df = triplets_df.select(send_messages);
828831
let aggregate_df = message_df
829-
.groupby([Column::msg(Some(Column::VertexId))])
832+
.group_by([Column::msg(Some(Column::VertexId))])
830833
.agg([aggregate_messages().alias(Column::Pregel.as_ref())]);
831834
// We Compute the new values for the vertices. Note that we have to check for possibly
832835
// null values after performing the outer join. This is, columns where the join key does
@@ -836,7 +839,7 @@ impl<'a> Pregel<'a> {
836839
let v_prog = &mut self.v_prog;
837840
let vertex_columns = current_vertices_df
838841
.to_owned()
839-
.outer_join(
842+
.full_join(
840843
aggregate_df,
841844
col(Column::VertexId.as_ref()), // id column of the current_vertices DataFrame
842845
Column::msg(Some(Column::VertexId)), // msg.id column of the message_df DataFrame
@@ -856,7 +859,7 @@ impl<'a> Pregel<'a> {
856859
col(Column::VertexId.as_ref()),
857860
col(Column::VertexId.as_ref()),
858861
)
859-
.with_common_subplan_elimination(false)
862+
.with_comm_subplan_elim(false)
860863
.collect()?;
861864

862865
iteration += 1; // increment the counter so we now which iteration is being executed
@@ -871,6 +874,7 @@ mod tests {
871874
use crate::graph_frame::GraphFrame;
872875
use crate::pregel::Column::VertexId;
873876
use crate::pregel::{Column, MessageReceiver, Pregel, PregelBuilder, SendMessage};
877+
use polars::lazy::dsl::max_horizontal;
874878
use polars::prelude::*;
875879
use std::error::Error;
876880

@@ -1020,7 +1024,8 @@ mod tests {
10201024
)],
10211025
aggregate_messages: Box::new(|| Column::msg(None).max()),
10221026
v_prog: Box::new(|| {
1023-
max_exprs([col(Column::Custom("max_value").as_ref()), Column::msg(None)])
1027+
max_horizontal([col(Column::Custom("max_value").as_ref()), Column::msg(None)])
1028+
.unwrap()
10241029
}),
10251030
})
10261031
}
@@ -1118,7 +1123,7 @@ mod tests {
11181123
Err(_) => return Err(String::from("Error running pregel")),
11191124
};
11201125

1121-
let sorted_pregel = match pregel.sort(&[VertexId.as_ref()], false) {
1126+
let sorted_pregel = match pregel.sort([VertexId.as_ref()], Default::default()) {
11221127
Ok(sorted_pregel) => sorted_pregel,
11231128
Err(_) => return Err(String::from("Error sorting the DataFrame")),
11241129
};
@@ -1128,7 +1133,7 @@ mod tests {
11281133
Err(_) => return Err(String::from("Error retrieving the column")),
11291134
};
11301135

1131-
let expected = Series::new("aux", [3, 2, 2, 2, 4]);
1136+
let expected = Series::new("aux".into(), [3, 2, 2, 2, 4]).into();
11321137

11331138
if ans.eq(&expected) {
11341139
Ok(())

0 commit comments

Comments
 (0)