test: Add ReadLocalNode tests #1794

Merged 5 commits on Jun 9, 2025

Changes shown are from 4 commits.
21 changes: 17 additions & 4 deletions bigframes/core/local_data.py
@@ -265,7 +265,13 @@ def _adapt_pandas_series(
) -> tuple[Union[pa.ChunkedArray, pa.Array], bigframes.dtypes.Dtype]:
# Mostly rely on pyarrow conversions, but have to convert geo without its help.
if series.dtype == bigframes.dtypes.GEO_DTYPE:
series = geopandas.GeoSeries(series).to_wkt(rounding_precision=-1)
# GeoSeries produces e.g. "POINT (1 1)", while BQ uses the style "POINT(1 1)";
# we normalize to the BQ style for consistency
series = (
geopandas.GeoSeries(series)
.to_wkt(rounding_precision=-1)
.str.replace(r"(\w+) \(", repl=r"\1(", regex=True)
)
return pa.array(series, type=pa.string()), bigframes.dtypes.GEO_DTYPE
try:
return _adapt_arrow_array(pa.array(series))
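
The normalization above can be sanity-checked with plain `re` (a standalone sketch, not code from this PR; pandas' Series.str.replace applies the same pattern element-wise):

import re

# GeoSeries.to_wkt() leaves a space after the geometry type; BigQuery does not.
wkt = "POINT (1 1)"
print(re.sub(r"(\w+) \(", r"\1(", wkt))  # POINT(1 1)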
@@ -326,7 +332,7 @@ def _adapt_arrow_array(array: pa.Array) -> tuple[pa.Array, bigframes.dtypes.Dtyp
return new_value.fill_null([]), bigframes.dtypes.list_type(values_type)
if array.type == bigframes.dtypes.JSON_ARROW_TYPE:
return _canonicalize_json(array), bigframes.dtypes.JSON_DTYPE
target_type = _logical_type_replacements(array.type)
target_type = logical_type_replacements(array.type)
if target_type != array.type:
# TODO: Maybe warn if lossy conversion?
array = array.cast(target_type)
@@ -372,6 +378,10 @@ def recursive_f(type: pa.DataType) -> pa.DataType:
if new_field_t != type.value_type:
return pa.list_(new_field_t)
return type
# polars can produce large lists, and we want to map these down to regular lists
if pa.types.is_large_list(type):
new_field_t = recursive_f(type.value_type)
return pa.list_(new_field_t)
if pa.types.is_struct(type):
struct_type = cast(pa.StructType, type)
new_fields: list[pa.Field] = []
@@ -385,7 +395,7 @@ def recursive_f(type: pa.DataType) -> pa.DataType:


@_recursive_map_types
def _logical_type_replacements(type: pa.DataType) -> pa.DataType:
def logical_type_replacements(type: pa.DataType) -> pa.DataType:
if pa.types.is_timestamp(type):
# This is potentially lossy, but BigFrames doesn't support ns
new_tz = "UTC" if (type.tz is not None) else None
@@ -403,8 +413,11 @@ def _logical_type_replacements(type: pa.DataType) -> pa.DataType:
if pa.types.is_large_string(type):
# simple string type can handle the largest strings needed
return pa.string()
if pa.types.is_large_binary(type):
# simple binary type can handle the largest binary values needed
return pa.binary()
if pa.types.is_dictionary(type):
return _logical_type_replacements(type.value_type)
return logical_type_replacements(type.value_type)
if pa.types.is_null(type):
# null is not allowed as a column type; bigframes defaults to float64
return pa.float64()
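
Based on the branches visible in these hunks (including the large-list handling above), the mappings can be illustrated as follows — a sketch of assumed behavior, not tests from this PR, assuming bigframes is importable:

import pyarrow as pa

from bigframes.core import local_data

assert local_data.logical_type_replacements(pa.large_string()) == pa.string()
assert local_data.logical_type_replacements(pa.large_binary()) == pa.binary()
assert local_data.logical_type_replacements(pa.null()) == pa.float64()
# dictionary types unwrap to their (replaced) value type
assert local_data.logical_type_replacements(
    pa.dictionary(pa.int32(), pa.large_string())
) == pa.string()
# large lists collapse to regular lists, recursively
assert local_data.logical_type_replacements(
    pa.large_list(pa.large_string())
) == pa.list_(pa.string())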
4 changes: 4 additions & 0 deletions bigframes/core/nodes.py
@@ -601,6 +601,10 @@ class ScanList:

items: typing.Tuple[ScanItem, ...]

@classmethod
def from_items(cls, items: Iterable[ScanItem]) -> ScanList:
return cls(tuple(items))

def filter_cols(
self,
ids: AbstractSet[identifiers.ColumnId],
6 changes: 6 additions & 0 deletions bigframes/core/pyarrow_utils.py
@@ -94,3 +94,9 @@ def append_offsets(
return pa_table.append_column(
offsets_col, pa.array(range(pa_table.num_rows), type=pa.int64())
)


def as_nullable(pa_table: pa.Table):
"""Normalizes schema to nullable for value-wise comparisons."""
nullable_schema = pa.schema(field.with_nullable(True) for field in pa_table.schema)
return pa_table.cast(nullable_schema)
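
A quick illustration of why this helper exists, using plain pyarrow (a minimal sketch, not code from this PR):

import pyarrow as pa

from bigframes.core.pyarrow_utils import as_nullable

strict = pa.table(
    {"a": [1, 2]},
    schema=pa.schema([pa.field("a", pa.int64(), nullable=False)]),
)
default = pa.table({"a": [1, 2]})  # fields are nullable by default

assert strict.schema != default.schema      # equal values, unequal schemas
assert as_nullable(strict).equals(default)  # normalized, now comparable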
75 changes: 75 additions & 0 deletions bigframes/session/direct_gbq_execution.py
@@ -0,0 +1,75 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

from typing import Optional, Tuple

from google.cloud import bigquery
import google.cloud.bigquery.job as bq_job
import google.cloud.bigquery.table as bq_table

from bigframes.core import compile, nodes
from bigframes.session import executor, semi_executor
import bigframes.session._io.bigquery as bq_io


# used only in testing right now; BigQueryCachingExecutor is the fully featured engine
# simplified: does not handle large (>10 GB) result queries, error handling, global
# config, or metrics recording
class DirectGbqExecutor(semi_executor.SemiExecutor):

Collaborator: I assume the purpose is to make sure caching and such don't cause any regressions / differences in behavior? Might be good to include that in the comment / docstring if so.

Contributor Author: yeah, mostly just to isolate the simplest, fastest version of bq execution, and avoid slow/complicated stuff only needed at >10gb scale or for stateful interactive flows. added comment in new revision

def __init__(self, bqclient: bigquery.Client):
self.bqclient = bqclient

def execute(
self,
plan: nodes.BigFrameNode,
ordered: bool,
peek: Optional[int] = None,
) -> executor.ExecuteResult:
"""Just execute whatever plan as is, without further caching or decomposition."""
# TODO(swast): plumb through the api_name of the user-facing api that
# caused this query.

compiled = compile.compile_sql(
compile.CompileRequest(plan, sort_rows=ordered, peek_count=peek)
)
iterator, query_job = self._run_execute_query(
sql=compiled.sql,
)

return executor.ExecuteResult(
arrow_batches=iterator.to_arrow_iterable(),
schema=plan.schema,
query_job=query_job,
total_rows=iterator.total_rows,
)

def _run_execute_query(
self,
sql: str,
job_config: Optional[bq_job.QueryJobConfig] = None,
) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
"""
Starts BigQuery query job and waits for results.
"""
return bq_io.start_query_with_client(
self.bqclient,
sql,
job_config=job_config or bq_job.QueryJobConfig(),
project=None,
location=None,
timeout=None,
metrics=None,
query_with_job=False,
)
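
A hypothetical usage sketch; the project name and `plan` (a BigFrameNode ready for compilation) are placeholders, not values from this PR:

from google.cloud import bigquery

from bigframes.session.direct_gbq_execution import DirectGbqExecutor

executor = DirectGbqExecutor(bigquery.Client(project="my-project"))  # placeholder project
result = executor.execute(plan, ordered=True, peek=100)  # `plan` is a placeholder
for batch in result.arrow_batches:
    ...  # pyarrow RecordBatches streamed from the query result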
80 changes: 80 additions & 0 deletions bigframes/session/polars_executor.py
@@ -0,0 +1,80 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

from typing import Optional, TYPE_CHECKING

import pyarrow as pa

from bigframes.core import array_value, bigframe_node, local_data, nodes
from bigframes.session import executor, semi_executor

if TYPE_CHECKING:
import polars as pl


_COMPATIBLE_NODES = (
nodes.ReadLocalNode,
nodes.OrderByNode,
nodes.ReversedNode,
nodes.SelectionNode,
nodes.FilterNode, # partial support
nodes.ProjectionNode, # partial support
)


class PolarsExecutor(semi_executor.SemiExecutor):
def __init__(self):
# This will error out if polars is not installed
from bigframes.core.compile.polars import PolarsCompiler
Comment on lines +39 to +40

Collaborator: Might be good to add a little helper to make sure folks know what package (and possibly versions) to install. See pandas helpers like this:

https://github.com/pandas-dev/pandas/blob/085e18fff3ba9e6b16f4d5fbdea1156c4c6aa195/pandas/compat/_optional.py#L151-L192

Or closer to home, the versions helpers in some of our client libraries:

https://github.com/googleapis/python-bigquery/blob/bd5aba8ba40c2f35fb672a68eed11d6baedb304f/google/cloud/bigquery/_versions_helpers.py

Contributor Author: hmm, that's a good idea. I am going to expose polars execution in another PR soon, so I'll figure out the messaging experience in that one.
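
A minimal sketch of the kind of helper the reviewer suggests; the function name, minimum version, and error wording are assumptions, not part of this PR:

import importlib

import packaging.version


def import_polars(min_version: str = "1.0.0"):
    """Import polars, raising an actionable error if missing or too old."""
    try:
        polars = importlib.import_module("polars")
    except ImportError as exc:
        raise ImportError(
            "PolarsExecutor requires polars. Install it with: pip install polars"
        ) from exc
    if packaging.version.Version(polars.__version__) < packaging.version.Version(
        min_version  # assumed floor version, for illustration only
    ):
        raise ImportError(
            f"polars>={min_version} is required; found {polars.__version__}"
        )
    return polars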


self._compiler = PolarsCompiler()

def execute(
self,
plan: bigframe_node.BigFrameNode,
ordered: bool,
peek: Optional[int] = None,
) -> Optional[executor.ExecuteResult]:
if not self._can_execute(plan):
return None
# Note: Ignoring ordered flag, as just executing totally ordered is fine.
try:
lazy_frame: pl.LazyFrame = self._compiler.compile(
array_value.ArrayValue(plan)
)
except Exception:
return None
if peek is not None:
lazy_frame = lazy_frame.limit(peek)
pa_table = lazy_frame.collect().to_arrow()
return executor.ExecuteResult(
arrow_batches=iter(map(self._adapt_batch, pa_table.to_batches())),
schema=plan.schema,
total_bytes=pa_table.nbytes,
total_rows=pa_table.num_rows,
)

def _can_execute(self, plan: bigframe_node.BigFrameNode):
return all(isinstance(node, _COMPATIBLE_NODES) for node in plan.unique_nodes())

def _adapt_array(self, array: pa.Array) -> pa.Array:
target_type = local_data.logical_type_replacements(array.type)
if target_type != array.type:
return array.cast(target_type)
return array

def _adapt_batch(self, batch: pa.RecordBatch) -> pa.RecordBatch:
new_arrays = [self._adapt_array(arr) for arr in batch.columns]
return pa.RecordBatch.from_arrays(new_arrays, names=batch.column_names)
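
Because execute returns None for unsupported plans, callers can chain SemiExecutors and fall back to a fully featured engine. A hypothetical dispatch sketch (the executor list and `plan` are illustrative, not code from this PR):

executors = [PolarsExecutor(), DirectGbqExecutor(bqclient)]
result = None
for ex in executors:
    result = ex.execute(plan, ordered=True)
    if result is not None:
        break  # first engine able to handle the plan wins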
4 changes: 2 additions & 2 deletions noxfile.py
@@ -108,8 +108,8 @@
SYSTEM_TEST_EXTRAS_BY_PYTHON: Dict[str, List[str]] = {
"3.9": ["tests"],
"3.10": ["tests"],
"3.12": ["tests", "scikit-learn"],
"3.13": ["tests"],
"3.12": ["tests", "scikit-learn", "polars"],
"3.13": ["tests", "polars"],
}

LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"
13 changes: 13 additions & 0 deletions tests/system/small/engines/__init__.py
@@ -0,0 +1,13 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
81 changes: 81 additions & 0 deletions tests/system/small/engines/conftest.py
@@ -0,0 +1,81 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pathlib
from typing import Generator

from google.cloud import bigquery
import pandas as pd
import pytest

import bigframes
from bigframes.core import local_data
from bigframes.session import (
direct_gbq_execution,
local_scan_executor,
polars_executor,
semi_executor,
)

CURRENT_DIR = pathlib.Path(__file__).parent
DATA_DIR = CURRENT_DIR.parent.parent.parent / "data"


@pytest.fixture(scope="module")
def fake_session() -> Generator[bigframes.Session, None, None]:
import bigframes.core.global_session

# it's a "polars session", but we are bypassing session-provided execution;
# we just want a minimal placeholder session without expensive setup
from bigframes.testing import polars_session

session = polars_session.TestSession()
with bigframes.core.global_session._GlobalSessionContext(session):

Collaborator: Neat! Potentially something we want to consider exposing to users? I guess after someone asks for it so as not to pollute our public surface.

Contributor Author: Maybe if we get some requests, can productionize it. I'm worried it's not robust to all the thread-local stuff however?

yield session


@pytest.fixture(scope="session", params=["pyarrow", "polars", "bq"])
def engine(request, bigquery_client: bigquery.Client) -> semi_executor.SemiExecutor:
if request.param == "pyarrow":
return local_scan_executor.LocalScanExecutor()
if request.param == "polars":
return polars_executor.PolarsExecutor()
if request.param == "bq":
return direct_gbq_execution.DirectGbqExecutor(bigquery_client)
raise ValueError(f"Unrecognized param: {request.param}")


@pytest.fixture(scope="module")
def managed_data_source(
scalars_pandas_df_index: pd.DataFrame,
) -> local_data.ManagedArrowTable:
return local_data.ManagedArrowTable.from_pandas(scalars_pandas_df_index)


@pytest.fixture(scope="module")
def zero_row_source() -> local_data.ManagedArrowTable:
return local_data.ManagedArrowTable.from_pandas(pd.DataFrame({"a": [], "b": []}))


@pytest.fixture(scope="module")
def nested_data_source(
nested_pandas_df: pd.DataFrame,
) -> local_data.ManagedArrowTable:
return local_data.ManagedArrowTable.from_pandas(nested_pandas_df)


@pytest.fixture(scope="module")
def repeated_data_source(
repeated_pandas_df: pd.DataFrame,
) -> local_data.ManagedArrowTable:
return local_data.ManagedArrowTable.from_pandas(repeated_pandas_df)