Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ coverage.xml
# Version file is auto-generated by setuptools_scm
flytekit/_version.py
testing
.flyte/
11 changes: 8 additions & 3 deletions flytekit/types/structured/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@


def _write_to_bq(structured_dataset: StructuredDataset):
table_id = typing.cast(str, structured_dataset.uri).split("://", 1)[1].replace(":", ".")
client = bigquery.Client()
# Structured dataset uri: bq://project:dataset.table
if structured_dataset.uri is None:
raise RuntimeError("StructuredDataset URI for BigQuery is missing. Expected format: bq://project:dataset.table")
project = structured_dataset.uri.removeprefix("bq://").split(":", 1)[0]
dst = typing.cast(str, structured_dataset.uri).split("://", 1)[1].replace(":", ".")
client = bigquery.Client(project=project)
df = structured_dataset.dataframe
if isinstance(df, pa.Table):
df = df.to_pandas()
client.load_table_from_dataframe(df, table_id)
client.load_table_from_dataframe(df, dst)


def _read_from_bq(
Expand Down Expand Up @@ -121,3 +125,4 @@ def decode(
current_task_metadata: StructuredDatasetMetadata,
) -> pa.Table:
return pa.Table.from_pandas(_read_from_bq(flyte_value, current_task_metadata))

Loading