[Performance] Daft and Daft+Ray significantly slower than Pandas for ClickHouse query and data processing #5208
I am currently evaluating Daft for our data processing pipelines, which currently use Pandas, and I'm seeing some unexpected performance results when comparing Pandas, Daft, and Daft+Ray on a simple task. Against the same ClickHouse database, with the same SELECT query and identical data processing logic, the timings are:

- Pandas: ~0.18 seconds
Here is the main code used for benchmarking the three approaches:

```python
import daft
import ray

@time_execution
def pandas_clickhouse_test():
    from clickhouse_driver import Client

    client = Client(**CLICKHOUSE_CONFIG)
    df = client.query_dataframe(CLICKHOUSE_QUERY)
    total_reject_per_day = df.groupby('g')['a'].sum().reset_index()
    total_reject_per_day.columns = ['g', 'b']
    df_merged = df.merge(total_reject_per_day, on='g')
    df_merged['c'] = df_merged['a'] / df_merged['b']
    df_merged['d'] = df_merged['a'] / df_merged['e']
    f_df = df_merged[df_merged['g'] == "da"][['f', 'g', 'a', 'c', 'd']].copy()
    f_df.rename(columns={'g': 'date'}, inplace=True)
    f_trend_df = df_merged[['f', 'g', 'd']].copy()
    f_trend_df.rename(columns={'g': 'date'}, inplace=True)
    print("f_df head:")
    print(f_df.head())
    print("f_trend_df head:")
    print(f_trend_df.head())
    client.disconnect()
    return df

@time_execution
def daft_clickhouse_test():
    result = daft_ck_query()
    return result

@time_execution
def daft_ray_clickhouse_test():
    ray.init(address="ray://xxx:10001")
    result = daft_ck_query()
    ray.shutdown()
    return result

def daft_ck_query():
    df = daft.read_sql(
        CLICKHOUSE_QUERY,
        create_clickhouse_connection,
    )
    total_reject_per_day = (
        df
        .groupby("g")
        .agg([daft.col("a").sum().alias("b")])
        .collect()
    )
    df_merged = df.join(total_reject_per_day, on="g", how="left")
    df_merged = df_merged.with_columns({
        "c": daft.col("a") / daft.col("b"),
        "d": daft.col("a") / daft.col("e"),
    })
    f_df = (
        df_merged
        .where(daft.col("g") == "da")
        .select(
            daft.col("f"),
            daft.col("g").alias("date"),
            daft.col("a"),
            daft.col("c"),
            daft.col("d"),
        )
    )
    f_trend_df = (
        df_merged
        .select(
            daft.col("f"),
            daft.col("g").alias("date"),
            daft.col("d"),
        )
    )
    print("f_df head:")
    print(f_df.limit(5).collect())
    print("f_trend_df head:")
    print(f_trend_df.limit(5).collect())
    result = df.collect()
    return result
```
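The `@time_execution` decorator used above isn't shown in the snippet. A minimal sketch of what such a decorator typically looks like (this is an assumption, not the author's actual implementation) is:

```python
import functools
import time

def time_execution(func):
    """Hypothetical sketch of a wall-clock timing decorator:
    runs the wrapped function and prints how long it took."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} took {elapsed:.4f} seconds")
        return result
    return wrapper

@time_execution
def example():
    # Trivial workload so the decorator has something to time.
    return sum(range(1000))
```

One thing worth noting about the benchmark itself: `daft_ray_clickhouse_test` includes `ray.init()` (connecting to a remote cluster) and `ray.shutdown()` inside the timed region, so cluster connection overhead is counted in the Daft+Ray number; moving the connection outside the decorated function would give a fairer comparison.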
@tingjun-cs In your example, the reason clickhouse+pandas is much faster is that you are using the native clickhouse driver:

```python
from clickhouse_driver import Client

client = Client(**CLICKHOUSE_CONFIG)
df = client.query_dataframe(CLICKHOUSE_QUERY)
```

This path is optimized specifically for getting data from ClickHouse into pandas. More specifically, both ClickHouse and pandas are column major, so there is much less overhead when converting. The official clickhouse_connect driver is Arrow compatible, and I believe it'll use Arrow when converting to dataframes; I'm not sure what happens internally with the unofficial driver you used in the code. `daft.read_sql`, by contrast, is general purpose.

If your data is small enough to fit in memory, I'd recommend using Arrow, as this will be the fastest and most efficient:

```python
import clickhouse_connect

client = clickhouse_connect.get_client(host='localhost', username='default', password='password')
arrow_data = client.query_arrow(query)
df = daft.from_arrow(arrow_data)
```

If it can't fit in memory, your best bet would be to create a ClickhouseSource that implements …