Add Datafusion solution [updated] #240
Open
matthewmturner wants to merge 21 commits into h2oai:master from matthewmturner:datafusion
Commits (21)
3a983fd  Datafusion solution (Dandandan)
b1f613e  Datafusion solution (Dandandan)
51ce127  Query fix (Dandandan)
3343428  Undo change (Dandandan)
d1e7ff3  Increase batch size (Dandandan)
58be012  Rename to ans (Dandandan)
d87c92d  Fix (Dandandan)
2b67e2a  Add q7/q10 (Dandandan)
d217e37  Use multiple threads better (Dandandan)
5a3e5ec  Add exec script (Dandandan)
f839050  Some cleanup (Dandandan)
6cb14f5  Rename (Dandandan)
63fe38b  Fix disabled snmalloc (Dandandan)
cbecfbc  Use arrow master again (Dandandan)
88ba391  Update benchmark code (Dandandan)
fbb50dc  Make queries work again (matthewmturner)
20978b7  Add join queries (matthewmturner)
4042b3c  group by q8 (matthewmturner)
2cca309  Add python bindings (matthewmturner)
c345446  Fix join and utils (matthewmturner)
82f34fb  Remove rust impl and update utilities (matthewmturner)
Files changed
Binary file not shown.
@@ -0,0 +1,18 @@
#V2
\\h
\\h Create table as
\\h create table
CREATE EXTERNAL TABLE x STORED AS CSV LOCATION "data/J1_1e7_NA_0_0.csv";
SELECT * FROM x LIMIT 5;
\\q
CREATE EXTERNAL TABLE x STORED AS CSV WITH HEADER ROW LOCATION "data/J1_1e7_NA_0_0.csv"\n;
select * from x limit 5;
show x\n;
show columns from x;
\\q
select 2^2;
select 2 * 2;
\\q
select 2^2;
select power(2,2);
\\q
groupby-datafusion.py
@@ -0,0 +1,187 @@
#!/usr/bin/env python

print("# groupby-datafusion.py", flush=True)

import os
import gc
import timeit
import datafusion as df
from datafusion import functions as f
from datafusion import col
import pyarrow as pa
from pyarrow import csv as pacsv

exec(open("./_helpers/helpers.py").read())

def ans_shape(batches):
    # Total row count and (consistent) column count across the collected record batches.
    rows, cols = 0, 0
    for batch in batches:
        rows += batch.num_rows
        if cols == 0:
            cols = batch.num_columns
        else:
            assert cols == batch.num_columns

    return rows, cols

# ver = df.__version__
ver = "6.0.0"
git = ""
task = "groupby"
solution = "datafusion"
fun = ".groupby"
cache = "TRUE"
on_disk = "FALSE"

data_name = os.environ["SRC_DATANAME"]
src_grp = os.path.join("data", data_name + ".csv")
print("loading dataset %s" % data_name, flush=True)

data = pacsv.read_csv(src_grp)

ctx = df.ExecutionContext()
ctx.register_record_batches("x", [data.to_batches()])

in_rows = data.num_rows

task_init = timeit.default_timer()

question = "sum v1 by id1"  # q1
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1").collect()
t = timeit.default_timer() - t_start
print(t)
shape = ans_shape(ans)
print(shape)
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])  # note: rebinding `df` here shadows the `datafusion as df` module alias
chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
m = memory_usage()
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()

question = "sum v1 by id1:id2"  # q2
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2").collect()
t = timeit.default_timer() - t_start
print(t)
shape = ans_shape(ans)
print(shape)
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
m = memory_usage()
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()

question = "sum v1 mean v3 by id3"  # q3
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3").collect()
t = timeit.default_timer() - t_start
print(t)
shape = ans_shape(ans)
print(shape)
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v3"))]).collect()[0].to_pandas().to_numpy()[0]
chkt = timeit.default_timer() - t_start
m = memory_usage()
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()

question = "mean v1:v3 by id4"  # q4
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM x GROUP BY id4").collect()
t = timeit.default_timer() - t_start
print(t)
shape = ans_shape(ans)
print(shape)
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2")), f.sum(col("v3"))]).collect()[0].to_pandas().to_numpy()[0]
chkt = timeit.default_timer() - t_start
m = memory_usage()
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()

question = "sum v1:v3 by id6"  # q5
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM x GROUP BY id6").collect()
t = timeit.default_timer() - t_start
print(t)
shape = ans_shape(ans)
print(shape)
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2")), f.sum(col("v3"))]).collect()[0].to_pandas().to_numpy()[0]
chkt = timeit.default_timer() - t_start
m = memory_usage()
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()

question = "max v1 - min v2 by id3"  # q7
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM x GROUP BY id3").collect()
t = timeit.default_timer() - t_start
print(t)
shape = ans_shape(ans)
print(shape)
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("range_v1_v2"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
m = memory_usage()
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()

question = "largest two v3 by id6"  # q8
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT id6, v3 from (SELECT id6, v3, row_number() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS row FROM x) t WHERE row <= 2").collect()
t = timeit.default_timer() - t_start
print(t)
shape = ans_shape(ans)
print(shape)
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("v3"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
m = memory_usage()
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()

question = "sum v3 count by id1:id6"  # q10
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT id1, id2, id3, id4, id5, id6, SUM(v3) as v3, COUNT(*) AS cnt FROM x GROUP BY id1, id2, id3, id4, id5, id6").collect()
t = timeit.default_timer() - t_start
print(t)
shape = ans_shape(ans)
print(shape)
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("v3")), f.sum(col("cnt"))]).collect()[0].to_pandas().to_numpy()[0]
chkt = timeit.default_timer() - t_start
m = memory_usage()
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()

print("grouping finished, took %0.fs" % (timeit.default_timer() - task_init), flush=True)

exit(0)
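For reference, a minimal sketch of how this script might be driven (the dataset name below is illustrative only; the script just reads SRC_DATANAME from the environment and loads the matching CSV from data/):

import os
import subprocess

# Illustrative dataset name; any file at data/<SRC_DATANAME>.csv would work.
env = dict(os.environ, SRC_DATANAME="G1_1e7_1e2_0_0")
subprocess.run(["python", "groupby-datafusion.py"], env=env, check=True)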
Reviewer: For every solution in this benchmark, checking the shape of the answer is part of the timing, to ensure no laziness happens. I can imagine DataFusion is not lazy, yet it seems unfair to skip this step in the timing.
Reply: Makes sense. I'll update!
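A minimal sketch of what that update could look like for q1 (assuming the same pattern is then applied to every query): move the ans_shape() call inside the timed region, so the timer only stops after the result has been materialized and counted.

question = "sum v1 by id1"  # q1
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1").collect()
shape = ans_shape(ans)  # shape check now counted in the timing, as in the other solutions
t = timeit.default_timer() - t_start
print(t)
print(shape)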