
Commit 02e2dad

Increase speed and parallelism of the limit algorithm and implement descending sorting (#75)
* Increase speed and parallelism of the limit algorithm
* Fixed docs
* Implement descending sorting. Fixes #10
* Replace two delayed usages. Thanks @mrocklin
* Remove the reference to the function - to make it usable without dask-sql installation on the workers
1 parent 8abc48c commit 02e2dad

File tree: 3 files changed, +107 -68 lines


dask_sql/physical/rel/logical/sort.py

Lines changed: 31 additions & 35 deletions
@@ -1,8 +1,7 @@
-from typing import Dict, List
+from typing import List

+import dask
 import dask.dataframe as dd
-from dask.highlevelgraph import HighLevelGraph
-from dask.dataframe.core import new_dd_object
 import pandas as pd
 import dask.array as da

@@ -62,11 +61,6 @@ def _apply_sort(
         first_sort_column = sort_columns[0]
         first_sort_ascending = sort_ascending[0]

-        # Sort the first column with set_index. Currently, we can only handle ascending sort
-        if not first_sort_ascending:
-            raise NotImplementedError(
-                "The first column needs to be sorted ascending (yet)"
-            )
         # We can only sort if there are no NaNs or infs.
         # Therefore we need to do a single pass over the dataframe
         # to warn the user
@@ -79,6 +73,11 @@ def _apply_sort(
             raise ValueError("Can not sort a column with NaNs")

         df = df.set_index(first_sort_column, drop=False).reset_index(drop=True)
+        if not first_sort_ascending:
+            # As set_index().reset_index() always sorts ascending, we need to reverse
+            # the order inside all partitions and the order of the partitions itself
+            df = df.map_partitions(lambda partition: partition[::-1], meta=df)
+            df = df.partitions[::-1]

         # sort the remaining columns if given
         if len(sort_columns) > 1:
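As an editorial aside (a minimal sketch, not part of this commit): the reversal trick added in the hunk above can be tried out on a toy dask dataframe. After the ascending set_index, reversing the rows inside every partition and then reversing the partition order yields a globally descending frame. The column name "a" and the data below are invented for illustration.

import dask.dataframe as dd
import pandas as pd

# Toy data spread over two partitions.
pdf = pd.DataFrame({"a": [3, 1, 4, 1, 5, 9, 2, 6]})
ddf = dd.from_pandas(pdf, npartitions=2)

# set_index only produces an ascending order ...
ddf = ddf.set_index("a", drop=False).reset_index(drop=True)

# ... so reverse the rows inside each partition and the order of the
# partitions themselves to obtain a descending sort, as in _apply_sort above.
ddf = ddf.map_partitions(lambda partition: partition[::-1], meta=ddf)
ddf = ddf.partitions[::-1]

print(ddf.compute())  # column "a" now reads 9, 6, 5, 4, 3, 2, 1, 1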
@@ -94,8 +93,7 @@ def _apply_offset(self, df: dd.DataFrame, offset: int, end: int) -> dd.DataFrame
         Limit the dataframe to the window [offset, end].
         That is unfortunately, not so simple as we do not know how many
         items we have in each partition. We have therefore no other way than to
-        calculate (!!!) the sizes of each partition
-        (this means we need to compute the dataframe already here).
+        calculate (!!!) the sizes of each partition.

         After that, we can create a new dataframe from the old
         dataframe by calculating for each partition if and how much
@@ -104,24 +102,31 @@ def _apply_offset(self, df: dd.DataFrame, offset: int, end: int) -> dd.DataFrame
         we need to pass the partition number to the selection
         function, which is not possible with normal "map_partitions".
         """
-        # As we need to calculate the partition size, we better persist
-        # the df. I think...
-        # TODO: check if this is the best thing to do
-        df = df.persist()
+        if not offset:
+            # We do a (hopefully) very quick check: if the first partition
+            # is already enough, we will just ust this
+            first_partition_length = len(df.partitions[0])
+            if first_partition_length >= end:
+                return df.head(end, compute=False)

         # First, we need to find out which partitions we want to use.
         # Therefore we count the total number of entries
-        partition_borders = df.map_partitions(lambda x: len(x)).compute()
-        partition_borders = partition_borders.cumsum().to_dict()
+        partition_borders = df.map_partitions(lambda x: len(x))

         # Now we let each of the partitions figure out, how much it needs to return
         # using these partition borders
-        # For this, we generate out own dask computation graph (as it does not really)
-        # fit well with one of the already present methods
+        # For this, we generate out own dask computation graph (as it does not really
+        # fit well with one of the already present methods).

         # (a) we define a method to be calculated on each partition
         # This method returns the part of the partition, which falls between [offset, fetch]
-        def select_from_to(df, partition_index):
+        # Please note that the dask object "partition_borders", will be turned into
+        # its pandas representation at this point and we can calculate the cumsum
+        # (which is not possible on the dask object). Recalculating it should not cost
+        # us much, as we assume the number of partitions is rather small.
+        @dask.delayed
+        def select_from_to(df, partition_index, partition_borders):
+            partition_borders = partition_borders.cumsum().to_dict()
             this_partition_border_left = (
                 partition_borders[partition_index - 1] if partition_index > 0 else 0
             )
@@ -141,20 +146,11 @@ def select_from_to(df, partition_index):

             return df.iloc[from_index:to_index]

-        # Then we (b) define a task graph. It should calculate the function above on each of the partitions of
-        # df (specified by (df._name, i) for each partition i). As an argument, we pass the partition_index.
-        dask_graph_name = df._name + "-limit"
-        dask_graph_dict = {}
-
-        for partition_index in range(df.npartitions):
-            dask_graph_dict[(dask_graph_name, partition_index)] = (
-                select_from_to,
-                (df._name, partition_index),
-                partition_index,
-            )
-
-        # We replace df with our new graph
-        graph = HighLevelGraph.from_collections(
-            dask_graph_name, dask_graph_dict, dependencies=[df]
+        # (b) Now we just need to apply the function on every partition
+        # We do this via the delayed interface, which seems the easiest one.
+        return dd.from_delayed(
+            [
+                select_from_to(partition, partition_number, partition_borders)
+                for partition_number, partition in enumerate(df.partitions)
+            ]
         )
-        return new_dd_object(graph, dask_graph_name, df._meta, df.divisions)
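As an editorial aside (a condensed sketch on a plain dask dataframe, not dask-sql's actual class): the new limit pattern above can be reproduced roughly as follows for a half-open window [offset, end). Partition lengths are collected lazily, the cumulative borders are only materialised inside the delayed function, and the per-partition slices are reassembled with dd.from_delayed; the fast first-partition check from the hunk above is omitted for brevity, and the helper name limit_window is made up.

import dask
import dask.dataframe as dd
import pandas as pd


def limit_window(df, offset, end):
    # Lazily record the length of every partition (one number per partition).
    partition_borders = df.map_partitions(lambda x: len(x))

    @dask.delayed
    def select_from_to(partition, partition_index, partition_borders):
        # By the time this runs, partition_borders is a concrete pandas Series.
        partition_borders = partition_borders.cumsum().to_dict()
        left = partition_borders[partition_index - 1] if partition_index > 0 else 0
        right = partition_borders[partition_index]
        from_index = max(offset - left, 0)
        to_index = min(end, right) - left
        return partition.iloc[from_index:to_index]

    # Apply the delayed selection to every partition and stitch the pieces
    # back together into a (still lazy) dask dataframe.
    return dd.from_delayed(
        [
            select_from_to(partition, partition_number, partition_borders)
            for partition_number, partition in enumerate(df.partitions)
        ]
    )


ddf = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=3)
print(limit_window(ddf, 2, 7).compute())  # rows 2 to 6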

docs/pages/sql.rst

Lines changed: 4 additions & 4 deletions
@@ -301,15 +301,15 @@ Limitatons
 Whenever you find a not already implemented operation, keyword
 or functionality, please raise an issue at our `issue tracker <https://github.com/nils-braun/dask-sql/issues>`_ with your use-case.

-Apart from those functional limitations, there are also two operations which need special care: ``ORDER BY`` and ``LIMIT``.
+Apart from those functional limitations, there is a operation which need special care: ``ORDER BY```.
 Normally, ``dask-sql`` calls create a ``dask`` data frame, which gets only computed when you call the ``.compute()`` member.
-Due to internal constraints, this is currently not the case for ``ORDER BY`` and ``LIMIT``.
-Including one of those operations will trigger a calculation of the full data frame already when calling ``Context.sql()``.
+Due to internal constraints, this is currently not the case for ``ORDER BY``.
+Including this operation will trigger a calculation of the full data frame already when calling ``Context.sql()``.

 .. warning::

     There is a subtle but important difference between adding ``LIMIT 10`` to your SQL query and calling ``sql(...).head(10)``.
     The data inside ``dask`` is partitioned, to distribute it over the cluster.
     ``head`` will only return the first N elements from the first partition - even if N is larger than the partition size.
     As a benefit, calling ``.head(N)`` is typically faster than calculating the full data sample with ``.compute()``.
-    ``LIMIT`` on the other hand will always return the first N elements - no matter on how many partitions they are scattered - but will also need to compute the full data set for this.
+    ``LIMIT`` on the other hand will always return the first N elements - no matter on how many partitions they are scattered - but will also need to precalculate the first partition to find out, if it needs to have a look into all data or not.
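As an editorial illustration of this warning (a sketch; the table name and data are invented): contrasting LIMIT with .head() on a dask_sql Context.

from dask_sql import Context
import dask.dataframe as dd
import pandas as pd

c = Context()
# Hypothetical table spread over several partitions.
ddf = dd.from_pandas(pd.DataFrame({"x": range(1000)}), npartitions=10)
c.create_table("my_table", ddf)

# LIMIT returns the first 10 rows even if they span several partitions;
# only the first partition is precalculated to decide whether more is needed.
limited = c.sql("SELECT * FROM my_table LIMIT 10").compute()

# .head(10) only looks at the first partition and can return fewer rows
# than requested if that partition holds less than 10 rows.
head = c.sql("SELECT * FROM my_table").head(10)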

tests/integration/test_sort.py

Lines changed: 72 additions & 29 deletions
@@ -7,38 +7,85 @@
 import dask.dataframe as dd


-def test_sort(c, user_table_1):
-    df = c.sql(
+def test_sort(c, user_table_1, df):
+    df_result = c.sql(
         """
     SELECT
         *
     FROM user_table_1
     ORDER BY b, user_id DESC
     """
     )
-    df = df.compute().reset_index(drop=True)
+    df_result = df_result.compute().reset_index(drop=True)
     df_expected = user_table_1.sort_values(
         ["b", "user_id"], ascending=[True, False]
     ).reset_index(drop=True)

-    assert_frame_equal(df, df_expected)
+    assert_frame_equal(df_result, df_expected)
+
+    df_result = c.sql(
+        """
+    SELECT
+        *
+    FROM df
+    ORDER BY b DESC, a DESC
+    """
+    )
+    df_result = df_result.compute()
+    df_expected = df.sort_values(["b", "a"], ascending=[False, False])
+
+    assert_frame_equal(
+        df_result.reset_index(drop=True), df_expected.reset_index(drop=True)
+    )
+
+    df_result = c.sql(
+        """
+    SELECT
+        *
+    FROM df
+    ORDER BY a DESC, b
+    """
+    )
+    df_result = df_result.compute()
+    df_expected = df.sort_values(["a", "b"], ascending=[False, True])
+
+    assert_frame_equal(
+        df_result.reset_index(drop=True), df_expected.reset_index(drop=True)
+    )
+
+    df_result = c.sql(
+        """
+    SELECT
+        *
+    FROM df
+    ORDER BY b, a
+    """
+    )
+    df_result = df_result.compute()
+    df_expected = df.sort_values(["b", "a"], ascending=[True, True])
+
+    assert_frame_equal(
+        df_result.reset_index(drop=True), df_expected.reset_index(drop=True)
+    )


 def test_sort_by_alias(c, user_table_1):
-    df = c.sql(
+    df_result = c.sql(
         """
     SELECT
         b AS my_column
     FROM user_table_1
     ORDER BY my_column, user_id DESC
     """
     )
-    df = df.compute().reset_index(drop=True).rename(columns={"my_column": "b"})
+    df_result = (
+        df_result.compute().reset_index(drop=True).rename(columns={"my_column": "b"})
+    )
     df_expected = user_table_1.sort_values(
         ["b", "user_id"], ascending=[True, False]
     ).reset_index(drop=True)[["b"]]

-    assert_frame_equal(df, df_expected)
+    assert_frame_equal(df_result, df_expected)


 def test_sort_with_nan(c):
@@ -67,52 +114,48 @@ def test_sort_strings(c):
     string_table = pd.DataFrame({"a": ["zzhsd", "öfjdf", "baba"]})
     c.create_table("string_table", string_table)

-    df = c.sql(
+    df_result = c.sql(
         """
     SELECT
         *
     FROM string_table
     ORDER BY a
     """
     )
-    df = df.compute().reset_index(drop=True)
+    df_result = df_result.compute().reset_index(drop=True)
     df_expected = string_table.sort_values(["a"], ascending=True).reset_index(drop=True)

-    assert_frame_equal(df, df_expected)
+    assert_frame_equal(df_result, df_expected)


 def test_sort_not_allowed(c):
-    # No DESC implemented for the first column
-    with pytest.raises(NotImplementedError):
-        c.sql("SELECT * FROM user_table_1 ORDER BY b DESC")
-
     # Wrong column
     with pytest.raises(Exception):
         c.sql("SELECT * FROM user_table_1 ORDER BY 42")


 def test_limit(c, long_table):
-    df = c.sql("SELECT * FROM long_table LIMIT 101")
-    df = df.compute()
+    df_result = c.sql("SELECT * FROM long_table LIMIT 101")
+    df_result = df_result.compute()

-    assert_frame_equal(df, long_table.iloc[:101])
+    assert_frame_equal(df_result, long_table.iloc[:101])

-    df = c.sql("SELECT * FROM long_table LIMIT 100")
-    df = df.compute()
+    df_result = c.sql("SELECT * FROM long_table LIMIT 100")
+    df_result = df_result.compute()

-    assert_frame_equal(df, long_table.iloc[:100])
+    assert_frame_equal(df_result, long_table.iloc[:100])

-    df = c.sql("SELECT * FROM long_table LIMIT 100 OFFSET 99")
-    df = df.compute()
+    df_result = c.sql("SELECT * FROM long_table LIMIT 100 OFFSET 99")
+    df_result = df_result.compute()

-    assert_frame_equal(df, long_table.iloc[99 : 99 + 100])
+    assert_frame_equal(df_result, long_table.iloc[99 : 99 + 100])

-    df = c.sql("SELECT * FROM long_table LIMIT 100 OFFSET 100")
-    df = df.compute()
+    df_result = c.sql("SELECT * FROM long_table LIMIT 100 OFFSET 100")
+    df_result = df_result.compute()

-    assert_frame_equal(df, long_table.iloc[100 : 100 + 100])
+    assert_frame_equal(df_result, long_table.iloc[100 : 100 + 100])

-    df = c.sql("SELECT * FROM long_table LIMIT 101 OFFSET 101")
-    df = df.compute()
+    df_result = c.sql("SELECT * FROM long_table LIMIT 101 OFFSET 101")
+    df_result = df_result.compute()

-    assert_frame_equal(df, long_table.iloc[101 : 101 + 101])
+    assert_frame_equal(df_result, long_table.iloc[101 : 101 + 101])
