Skip to content

Commit 18c99df

Browse files
Add ORC support to GCSToBigQueryOperator and test for external tables (#49188)
* Add ORC support to GCSToBigQueryOperator and test for external tables
* Add ORC autodetect config to src_fmt_to_configs_mapping
1 parent ae89d60 commit 18c99df

File tree

2 files changed

+46
-0
lines changed

2 files changed

+46
-0
lines changed

providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -58,6 +58,7 @@
5858
"GOOGLE_SHEETS",
5959
"DATASTORE_BACKUP",
6060
"PARQUET",
61+
"ORC",
6162
]
6263

6364

@@ -676,6 +677,7 @@ def _use_existing_table(self):
676677
"NEWLINE_DELIMITED_JSON": ["autodetect", "ignoreUnknownValues"],
677678
"PARQUET": ["autodetect", "ignoreUnknownValues"],
678679
"AVRO": ["useAvroLogicalTypes"],
680+
"ORC": ["autodetect"],
679681
}
680682

681683
valid_configs = src_fmt_to_configs_mapping[self.source_format]

providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py

Lines changed: 44 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1565,6 +1565,50 @@ def test_get_openlineage_facets_on_complete_full_table_multiple_gcs_uris(self, h
15651565
}
15661566
assert lineage.job_facets == {}
15671567

1568+
@mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
1569+
def test_external_table_should_accept_orc_source_format(self, hook):
1570+
hook.return_value.insert_job.side_effect = [
1571+
MagicMock(job_id=REAL_JOB_ID, error_result=False),
1572+
REAL_JOB_ID,
1573+
]
1574+
hook.return_value.generate_job_id.return_value = REAL_JOB_ID
1575+
hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
1576+
1577+
operator = GCSToBigQueryOperator(
1578+
task_id=TASK_ID,
1579+
bucket=TEST_BUCKET,
1580+
source_objects=TEST_SOURCE_OBJECTS,
1581+
destination_project_dataset_table=TEST_EXPLICIT_DEST,
1582+
schema_fields=SCHEMA_FIELDS,
1583+
write_disposition=WRITE_DISPOSITION,
1584+
external_table=True,
1585+
project_id=JOB_PROJECT_ID,
1586+
source_format="ORC",
1587+
)
1588+
1589+
operator.execute(context=MagicMock())
1590+
1591+
hook.return_value.create_empty_table.assert_called_once_with(
1592+
exists_ok=True,
1593+
location=None,
1594+
project_id=JOB_PROJECT_ID,
1595+
table_resource={
1596+
"tableReference": {
1597+
"projectId": PROJECT_ID,
1598+
"datasetId": DATASET,
1599+
"tableId": TABLE,
1600+
},
1601+
"externalDataConfiguration": {
1602+
"autodetect": True,
1603+
"sourceFormat": "ORC",
1604+
"sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
1605+
"compression": "NONE",
1606+
"ignoreUnknownValues": False,
1607+
"schema": {"fields": SCHEMA_FIELDS},
1608+
},
1609+
},
1610+
)
1611+
15681612

15691613
@pytest.fixture
15701614
def create_task_instance(create_task_instance_of_operator, session):

0 commit comments

Comments
 (0)