Skip to content

Commit b733f2e

Browse files
feat: Add experimental polars execution
1 parent 3ea6043 commit b733f2e

File tree

7 files changed

+139
-8
lines changed

7 files changed

+139
-8
lines changed

bigframes/_config/bigquery_options.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ def __init__(
9090
allow_large_results: bool = False,
9191
ordering_mode: Literal["strict", "partial"] = "strict",
9292
client_endpoints_override: Optional[dict] = None,
93+
enable_polars_execution: bool = False,
9394
):
9495
self._credentials = credentials
9596
self._project = project
@@ -108,6 +109,7 @@ def __init__(
108109
client_endpoints_override = {}
109110

110111
self._client_endpoints_override = client_endpoints_override
112+
self._enable_polars_execution = enable_polars_execution
111113

112114
@property
113115
def application_name(self) -> Optional[str]:
@@ -379,3 +381,17 @@ def client_endpoints_override(self, value: dict):
379381
)
380382

381383
self._client_endpoints_override = value
384+
385+
@property
386+
def enable_polars_execution(self) -> bool:
387+
"""If True, will use polars to execute some simple query plans locally."""
388+
return self._enable_polars_execution
389+
390+
@enable_polars_execution.setter
391+
def enable_polars_execution(self, value: bool):
392+
if value is True:
393+
msg = bfe.format_message(
394+
"Polars execution is an experimental feature, and may not be stable. Must have polars installed."
395+
)
396+
warnings.warn(msg, category=bfe.PreviewWarning)
397+
self._enable_polars_execution = value

bigframes/core/compile/polars/compiler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,15 +184,15 @@ class PolarsCompiler:
184184
expr_compiler = PolarsExpressionCompiler()
185185
agg_compiler = PolarsAggregateCompiler()
186186

187-
def compile(self, array_value: bigframes.core.ArrayValue) -> pl.LazyFrame:
187+
def compile(self, plan: nodes.BigFrameNode) -> pl.LazyFrame:
188188
if not polars_installed:
189189
raise ValueError(
190190
"Polars is not installed, cannot compile to polars engine."
191191
)
192192

193193
# TODO: Create standard way to configure BFET -> BFET rewrites
194194
# Polars has incomplete slice support in lazy mode
195-
node = nodes.bottom_up(array_value.node, bigframes.core.rewrite.rewrite_slice)
195+
node = nodes.bottom_up(plan, bigframes.core.rewrite.rewrite_slice)
196196
return self.compile_node(node)
197197

198198
@functools.singledispatchmethod

bigframes/session/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ def __init__(
254254
storage_manager=self._temp_storage_manager,
255255
strictly_ordered=self._strictly_ordered,
256256
metrics=self._metrics,
257+
enable_polars_execution=context.enable_polars_execution,
257258
)
258259
self._loader = bigframes.session.loader.GbqDataLoader(
259260
session=self,

bigframes/session/bq_caching_executor.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,12 @@
3838
import bigframes.dtypes
3939
import bigframes.exceptions as bfe
4040
import bigframes.features
41-
from bigframes.session import executor, local_scan_executor, read_api_execution
41+
from bigframes.session import (
42+
executor,
43+
local_scan_executor,
44+
read_api_execution,
45+
semi_executor,
46+
)
4247
import bigframes.session._io.bigquery as bq_io
4348
import bigframes.session.metrics
4449
import bigframes.session.planner
@@ -123,21 +128,29 @@ def __init__(
123128
*,
124129
strictly_ordered: bool = True,
125130
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
131+
enable_polars_execution: bool = False,
126132
):
127133
self.bqclient = bqclient
128134
self.storage_manager = storage_manager
129135
self.strictly_ordered: bool = strictly_ordered
130136
self.cache: ExecutionCache = ExecutionCache()
131137
self.metrics = metrics
132138
self.bqstoragereadclient = bqstoragereadclient
133-
# Simple left-to-right precedence for now
134-
self._semi_executors = (
139+
self._enable_polars_execution = enable_polars_execution
140+
self._semi_executors: Sequence[semi_executor.SemiExecutor] = (
135141
read_api_execution.ReadApiSemiExecutor(
136142
bqstoragereadclient=bqstoragereadclient,
137143
project=self.bqclient.project,
138144
),
139145
local_scan_executor.LocalScanExecutor(),
140146
)
147+
if enable_polars_execution:
148+
from bigframes.session import polars_executor
149+
150+
self._semi_executors = (
151+
*self._semi_executors,
152+
polars_executor.PolarsExecutor(),
153+
)
141154

142155
def to_sql(
143156
self,
@@ -542,8 +555,8 @@ def _execute_plan(
542555
"""Just execute whatever plan as is, without further caching or decomposition."""
543556
# First try to execute fast-paths
544557
if not output_spec.require_bq_table:
545-
for semi_executor in self._semi_executors:
546-
maybe_result = semi_executor.execute(plan, ordered=ordered, peek=peek)
558+
for exec in self._semi_executors:
559+
maybe_result = exec.execute(plan, ordered=ordered, peek=peek)
547560
if maybe_result:
548561
return maybe_result
549562

bigframes/session/polars_executor.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
from __future__ import annotations
15+
16+
from typing import Optional, TYPE_CHECKING
17+
18+
from bigframes.core import bigframe_node, nodes
19+
from bigframes.session import executor, semi_executor
20+
21+
if TYPE_CHECKING:
22+
import polars as pl
23+
24+
25+
_COMPATIBLE_NODES = (
26+
nodes.ReadLocalNode,
27+
nodes.OrderByNode,
28+
nodes.ReversedNode,
29+
nodes.SelectionNode,
30+
)
31+
32+
33+
class PolarsExecutor(semi_executor.SemiExecutor):
34+
def __init__(self):
35+
# This will error out if polars is not installed
36+
from bigframes.core.compile.polars import PolarsCompiler
37+
38+
self._compiler = PolarsCompiler()
39+
40+
def execute(
41+
self,
42+
plan: bigframe_node.BigFrameNode,
43+
ordered: bool,
44+
peek: Optional[int] = None,
45+
) -> Optional[executor.ExecuteResult]:
46+
if not self._can_execute(plan):
47+
return None
48+
# Note: Ignoring ordered flag, as just executing totally ordered is fine.
49+
lazy_frame: pl.LazyFrame = self._compiler.compile(plan)
50+
if peek is not None:
51+
lazy_frame = lazy_frame.limit(peek)
52+
pa_table = lazy_frame.collect().to_arrow()
53+
return executor.ExecuteResult(
54+
arrow_batches=iter(pa_table.to_batches()),
55+
schema=plan.schema,
56+
total_bytes=pa_table.nbytes,
57+
total_rows=pa_table.num_rows,
58+
)
59+
60+
def _can_execute(self, plan: bigframe_node.BigFrameNode):
61+
return all(isinstance(node, _COMPATIBLE_NODES) for node in plan.unique_nodes())
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import pytest
15+
16+
import bigframes
17+
from tests.system.utils import assert_pandas_df_equal
18+
19+
polars = pytest.importorskip("polars", reason="polars is required for this test")
20+
21+
22+
@pytest.fixture(scope="module")
23+
def session_w_polars():
24+
context = bigframes.BigQueryOptions(location="US", enable_polars_execution=True)
25+
session = bigframes.Session(context=context)
26+
yield session
27+
session.close() # close generated session at cleanup time
28+
29+
30+
def test_df_reorder_execute(session_w_polars, scalars_pandas_df_index):
31+
execution_count_before = session_w_polars._metrics.execution_count
32+
bf_df = session_w_polars.read_pandas(scalars_pandas_df_index)
33+
34+
pd_result = scalars_pandas_df_index.sort_index(ascending=False)[
35+
["int64_too", "bool_col"]
36+
]
37+
bf_result = bf_df.sort_index(ascending=False)[["int64_too", "bool_col"]].to_pandas()
38+
39+
assert session_w_polars._metrics.execution_count == execution_count_before
40+
assert_pandas_df_equal(bf_result, pd_result)

tests/unit/polars_session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def execute(
4646
"""
4747
Execute the ArrayValue, storing the result to a temporary session-owned table.
4848
"""
49-
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value)
49+
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value.node)
5050
pa_table = lazy_frame.collect().to_arrow()
5151
# Currently, pyarrow types might not quite be exactly the ones in the bigframes schema.
5252
# Nullability may be different, and might use large versions of list, string datatypes.

0 commit comments

Comments
 (0)