Skip to content

Commit 11e1e61

Browse files
Merge pull request #463 from infehm/add_predicates_to_conditions_converter
contribution / Add predicates to conditions converter
2 parents 94f560e + fcc4bf4 commit 11e1e61

File tree

3 files changed

+166
-1
lines changed

3 files changed

+166
-1
lines changed

CHANGES.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ Changelog
33
=========
44

55

6-
Version 5.0.1 (2021-06-xx)
6+
Version 5.1.0 (2021-06-xx)
77
==========================
88
* Add :meth:`~kartothek.io.eager.copy_dataset` to copy and optionally rename datasets within one store or between stores (eager only)
99
* Add renaming option to :meth:`~kartothek.io.eager_cube.copy_cube`
10+
* Add a converter from dataset load predicates to cube conditions in :mod:`~kartothek.utils.predicate_converter`
1011

1112

1213
Version 5.0.0 (2021-06-23)
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
"""
2+
Helper module to convert kartothek dataset load predicates into cube conditions.
3+
"""
4+
5+
from typing import Any, List, Sequence, Tuple
6+
7+
import pandas as pd
8+
import pyarrow as pa
9+
10+
from kartothek.core.cube.conditions import Condition
11+
12+
13+
def write_predicate_as_cube_condition(predicate: Tuple[str, str, Any]) -> Condition:
    """
    Rewrite a single io.dask.dataset 'read_dataset_as_ddf' predicate as cube condition.

    Remark: This function is restricted by "Condition.from_string" which does not allow for IsInCondition
    and InIntervalCondition and will throw an error if conditions of those types are passed.

    Parameters
    ----------
    predicate: tuple or list
        Single predicate definition of the form ``(column, operator, value)``.
        ``value`` has to be a ``str``, ``int``, ``float``, ``bool`` or
        ``pandas.Timestamp``.

    Returns
    -------
    condition: Condition
        cube condition containing the predicate definition

    Raises
    ------
    ValueError
        If ``predicate`` does not consist of exactly 3 entries.
    TypeError
        If the predicate value is not of one of the supported types.
    """
    if len(predicate) != 3:
        raise ValueError("Please use predicates consisting of exactly 3 entries")

    column, op, value = predicate

    # ``bool`` is a subclass of ``int``, so it has to be tested before ``int``
    # to keep booleans mapped to the arrow boolean type.
    if isinstance(value, bool):
        value_string = str(value)
        arrow_type = pa.bool_()
    elif isinstance(value, int):
        value_string = str(value)
        arrow_type = pa.int64()
    elif isinstance(value, float):
        value_string = str(value)
        arrow_type = pa.float64()
    elif isinstance(value, str):
        value_string = value
        arrow_type = pa.string()
    elif isinstance(value, pd.Timestamp):
        # Only the calendar date is written into the condition string; any
        # time-of-day component of the timestamp is dropped here.
        # NOTE(review): confirm that truncating to day resolution is intended.
        value_string = value.strftime("%Y-%m-%d")
        arrow_type = pa.timestamp("s")
    else:
        raise TypeError(
            "Please only enter predicates for parameter values of the following type:"
            " str, int, float, bool or pandas timestamp, "
        )

    # The condition parser needs to know the arrow type of the referenced column.
    return Condition.from_string(f"{column} {op} {value_string}", {column: arrow_type})
62+
63+
64+
def convert_predicates_to_cube_conditions(
    predicates: List[List[Tuple[str, str, Any]]],
) -> Sequence[Condition]:
    """
    Convert an io.dask.dataset 'read_dataset_as_ddf' predicate to cube conditions.

    Parameters
    ----------
    predicates: list
        list containing exactly one list of single predicates; the inner
        predicates are each converted into one cube condition

    Returns
    -------
    conditions: tuple of Condition
        one cube condition per single predicate, in the order given

    Raises
    ------
    ValueError
        If ``predicates`` is empty or contains more than one inner list
        (cube conditions cannot express 'or').
    """
    if len(predicates) > 1:
        raise ValueError(
            "Cube conditions cannot handle 'or' operators, therefore, "
            "please pass a predicate list with one element."
        )
    # Fail with a clear message instead of an IndexError on ``predicates[0]``.
    if not predicates:
        raise ValueError(
            "Please pass a predicate list with exactly one element, "
            "got an empty list."
        )
    return tuple(
        write_predicate_as_cube_condition(predicate) for predicate in predicates[0]
    )
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import pandas as pd
2+
import pytest
3+
4+
from kartothek.core.cube.conditions import C
5+
from kartothek.utils.predicate_converter import (
6+
convert_predicates_to_cube_conditions,
7+
write_predicate_as_cube_condition,
8+
)
9+
10+
11+
@pytest.mark.parametrize(
    ("test_input", "expected"),
    [
        # One case per supported value type, all using the equality operator.
        (["column", "==", 1], C("column") == 1),
        (["column", "==", 1.1], C("column") == 1.1),
        (["column", "==", "1"], C("column") == "1"),
        (
            ["date", "==", pd.to_datetime("2020-01-01")],
            C("date") == pd.Timestamp("2020-01-01 00:00:00"),
        ),
        (["column", "==", True], C("column") == True),  # noqa: E712
        # Remaining comparison operators, exercised with an integer value.
        (["column", "<=", 1], C("column") <= 1),
        (["column", ">=", 1], C("column") >= 1),
        (["column", "!=", 1], C("column") != 1),
    ],
)
def test_write_predicate_as_cube_condition(test_input, expected):
    """A single predicate is converted into the equivalent cube condition."""
    actual = write_predicate_as_cube_condition(test_input)
    assert actual == expected
29+
30+
31+
def test_raises_type_error_write_predicate_as_cube_condition():
    """A predicate value of an unsupported type (``datetime.date``) is rejected."""
    unsupported_value = pd.to_datetime("2020-01-01").date()
    expected_message = (
        "Please only enter predicates for parameter values of the "
        "following type: str, int, float, bool or pandas timestamp, "
    )
    with pytest.raises(TypeError, match=expected_message):
        write_predicate_as_cube_condition(("date", "==", unsupported_value))
40+
41+
42+
def test_raises_value_error_write_predicate_as_cube_condition():
    """Predicates with fewer or more than three entries are rejected."""
    expected_message = "Please use predicates consisting of exactly 3 entries"
    malformed_predicates = (
        ("date", "=="),  # too short
        ("date", "==", "date", "=="),  # too long
    )
    for malformed in malformed_predicates:
        with pytest.raises(ValueError, match=expected_message):
            write_predicate_as_cube_condition(malformed)
52+
53+
54+
@pytest.mark.parametrize(
    ("test_input", "expected"),
    [
        # A conjunction with a single predicate yields a one-element tuple.
        ([[("column", "==", 1)]], (C("column") == 1,)),
        # Several predicates are converted in order.
        (
            [[("column", "==", 1), ("column2", "==", "1")]],
            (C("column") == 1, C("column2") == "1"),
        ),
    ],
)
def test_convert_predicates_to_cube_conditions(test_input, expected):
    """A single conjunction of predicates becomes a tuple of cube conditions."""
    actual = convert_predicates_to_cube_conditions(test_input)
    assert actual == expected
66+
67+
68+
def test_raises_value_error_convert_predicates_to_cube_conditions():
    """More than one conjunction (an implicit 'or') is rejected."""
    two_conjunctions = [[("column", "==", 1)], [("column2", "==", 2)]]
    expected_message = (
        "Cube conditions cannot handle 'or' operators, therefore, "
        "please pass a predicate list with one element."
    )
    with pytest.raises(ValueError, match=expected_message):
        convert_predicates_to_cube_conditions(two_conjunctions)

0 commit comments

Comments
 (0)