diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py index 259cbd8e327b8..94ae6a478553b 100644 --- a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py +++ b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py @@ -58,6 +58,7 @@ "GOOGLE_SHEETS", "DATASTORE_BACKUP", "PARQUET", + "ORC", ] diff --git a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py index 299fc9fead54d..c9a033b16d6d2 100644 --- a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py +++ b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py @@ -1565,6 +1565,50 @@ def test_get_openlineage_facets_on_complete_full_table_multiple_gcs_uris(self, h } assert lineage.job_facets == {} + @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook")) + def test_external_table_should_accept_orc_source_format(self, hook): + hook.return_value.insert_job.side_effect = [ + MagicMock(job_id=REAL_JOB_ID, error_result=False), + REAL_JOB_ID, + ] + hook.return_value.generate_job_id.return_value = REAL_JOB_ID + hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) + + operator = GCSToBigQueryOperator( + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=TEST_SOURCE_OBJECTS, + destination_project_dataset_table=TEST_EXPLICIT_DEST, + schema_fields=SCHEMA_FIELDS, + write_disposition=WRITE_DISPOSITION, + external_table=True, + project_id=JOB_PROJECT_ID, + source_format="ORC", + ) + + operator.execute(context=MagicMock()) + + hook.return_value.create_empty_table.assert_called_once_with( + exists_ok=True, + location=None, + project_id=JOB_PROJECT_ID, + table_resource={ + "tableReference": { + "projectId": PROJECT_ID, + "datasetId": DATASET, + "tableId": TABLE, + }, + "externalDataConfiguration": { + "autodetect": True, + "sourceFormat": "ORC", + "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"], + "compression": "NONE", + "ignoreUnknownValues": False, + "schema": {"fields": SCHEMA_FIELDS}, + }, + }, + ) + @pytest.fixture def create_task_instance(create_task_instance_of_operator, session):