From 72a5e3c967554b8065beec52ef135fc6e8a4eadf Mon Sep 17 00:00:00 2001
From: Gabriel Gazola Milan
Date: Wed, 22 May 2024 08:27:37 -0300
Subject: [PATCH] chore: refactor flow

---
 pipelines/painel_obras/dump_data/flows.py | 74 ++++++++++++---------
 pipelines/painel_obras/dump_data/tasks.py | 81 +++++++++++++++++++++++
 2 files changed, 124 insertions(+), 31 deletions(-)
 create mode 100644 pipelines/painel_obras/dump_data/tasks.py

diff --git a/pipelines/painel_obras/dump_data/flows.py b/pipelines/painel_obras/dump_data/flows.py
index 86c8897..e39f842 100644
--- a/pipelines/painel_obras/dump_data/flows.py
+++ b/pipelines/painel_obras/dump_data/flows.py
@@ -1,46 +1,58 @@
 # -*- coding: utf-8 -*-
-from copy import deepcopy
-
+from prefect import Parameter
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
-from prefeitura_rio.pipelines_templates.dump_to_gcs.flows import (
-    flow as dump_to_gcs_flow,
-)
-from prefeitura_rio.pipelines_utils.prefect import set_default_parameters
+from prefeitura_rio.pipelines_utils.custom import Flow
 from prefeitura_rio.pipelines_utils.state_handlers import (
     handler_initialize_sentry,
     handler_inject_bd_credentials,
 )
+from prefeitura_rio.pipelines_utils.tasks import (
+    get_project_id,
+    rename_current_flow_run_dataset_table,
+)
 
 from pipelines.constants import constants
 from pipelines.painel_obras.dump_data.schedules import painel_obras__dump_data_schedule
+from pipelines.painel_obras.dump_data.tasks import download_data_to_gcs
 
-rj_iplanrio__painel_obras__dump_data_flow = deepcopy(dump_to_gcs_flow)
-rj_iplanrio__painel_obras__dump_data_flow.state_handlers = [
-    handler_inject_bd_credentials,
-    handler_initialize_sentry,
-]
-rj_iplanrio__painel_obras__dump_data_flow.name = "IPLANRIO: Painel de obras - Dump to GCS"
-rj_iplanrio__painel_obras__dump_data_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
-
-rj_iplanrio__painel_obras__dump_data_flow.run_config = KubernetesRun(
-    image=constants.DOCKER_IMAGE.value,
-    labels=[
-        constants.RJ_IPLANRIO_AGENT_LABEL.value,  # label do agente
+with Flow(
+    name="IPLANRIO: Painel de obras - Dump to GCS",
+    state_handlers=[
+        handler_initialize_sentry,
+        handler_inject_bd_credentials,
     ],
-)
+    parallelism=10,
+    skip_if_running=False,
+) as rj_iplanrio__painel_obras__dump_data__flow:
+    project_id = Parameter("project_id", required=False)
+    dataset_id = Parameter("dataset_id")
+    table_id = Parameter("table_id")
+    query = Parameter("query")
+    billing_project_id = Parameter("billing_project_id", required=False)
+    bd_project_mode = Parameter("bd_project_mode", required=False, default="prod")
 
-painel_obras__dump_data_default_parameters = {
-    "project_id": "rj-iplanrio",
-    "dataset_id": "painel_obras",
-    # "table_id": "",  # set this in schedule
-    # "query": "",  # set this in schedule
-    "billing_project_id": "rj-iplanrio",
-}
+    rename_flow_run = rename_current_flow_run_dataset_table(
+        prefix="IPLANRIO: Painel de obras - Dump to GCS: ",
+        dataset_id=dataset_id,
+        table_id=table_id,
+    )
 
-rj_iplanrio__painel_obras__dump_data_flow = set_default_parameters(
-    rj_iplanrio__painel_obras__dump_data_flow,
-    default_parameters=painel_obras__dump_data_default_parameters,
-)
+    final_project_id = get_project_id(project_id=project_id, bd_project_mode=bd_project_mode)
+    final_project_id.set_upstream(rename_flow_run)
 
-rj_iplanrio__painel_obras__dump_data_flow.schedule = painel_obras__dump_data_schedule
+    download_task = download_data_to_gcs(
+        project_id=final_project_id,
+        dataset_id=dataset_id,
+        table_id=table_id,
+        query=query,
+        bd_project_mode=bd_project_mode,
+        billing_project_id=billing_project_id,
+    )
+
+rj_iplanrio__painel_obras__dump_data__flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+rj_iplanrio__painel_obras__dump_data__flow.run_config = KubernetesRun(
+    image=constants.DOCKER_IMAGE.value,
+    labels=[constants.RJ_IPLANRIO_AGENT_LABEL.value],
+)
+rj_iplanrio__painel_obras__dump_data__flow.schedule = painel_obras__dump_data_schedule
diff --git a/pipelines/painel_obras/dump_data/tasks.py b/pipelines/painel_obras/dump_data/tasks.py
new file mode 100644
index 0000000..13f1971
--- /dev/null
+++ b/pipelines/painel_obras/dump_data/tasks.py
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+from time import sleep
+
+from basedosdados.download.base import google_client
+from basedosdados.upload.base import Base
+from google.cloud import bigquery
+from prefect import task
+from prefeitura_rio.pipelines_utils.gcs import list_blobs_with_prefix
+from prefeitura_rio.pipelines_utils.logging import log
+
+
+@task
+def download_data_to_gcs(  # pylint: disable=R0912,R0913,R0914,R0915
+    project_id: str,
+    dataset_id: str,
+    table_id: str,
+    query: str,
+    bd_project_mode: str = "prod",
+    billing_project_id: str = None,
+    location: str = "US",
+):
+    """
+    Get data from BigQuery.
+    """
+    # Asserts that dataset_id and table_id are provided
+    if not dataset_id or not table_id:
+        raise ValueError("dataset_id and table_id must be provided")
+
+    # If query is not a string, raise an error
+    if not isinstance(query, str):
+        raise ValueError("query must be a string")
+    log(f"Query was provided: {query}")
+
+    # Get billing project ID
+    if not billing_project_id:
+        log("Billing project ID was not provided, trying to get it from environment variable")
+        try:
+            bd_base = Base()
+            billing_project_id = bd_base.config["gcloud-projects"][bd_project_mode]["name"]
+        except KeyError:
+            pass
+        if not billing_project_id:
+            raise ValueError(
+                "billing_project_id must be either provided or inferred from environment variables"
+            )
+        log(f"Billing project ID was inferred from environment variables: {billing_project_id}")
+
+    # Get data
+    log("Querying data from BigQuery")
+    client = google_client(project_id, billing_project_id, from_file=True, reauth=False)
+    job = client["bigquery"].query(query)
+    while not job.done():
+        sleep(1)
+    dest_table = job._properties["configuration"]["query"]["destinationTable"]
+    dest_project_id = dest_table["projectId"]
+    dest_dataset_id = dest_table["datasetId"]
+    dest_table_id = dest_table["tableId"]
+    log(f"Query results were stored in {dest_project_id}.{dest_dataset_id}.{dest_table_id}")
+
+    blob_path = f"gs://datario/share/{dataset_id}/{table_id}/data.csv.gz"
+    log(f"Loading data to {blob_path}")
+    dataset_ref = bigquery.DatasetReference(dest_project_id, dest_dataset_id)
+    table_ref = dataset_ref.table(dest_table_id)
+    job_config = bigquery.job.ExtractJobConfig(compression="GZIP")
+    extract_job = client["bigquery"].extract_table(
+        table_ref,
+        blob_path,
+        location=location,
+        job_config=job_config,
+    )
+    extract_job.result()
+    log("Data was loaded successfully")
+
+    # Get the BLOB we've just created and make it public
+    blobs = list_blobs_with_prefix("datario", f"share/{dataset_id}/{table_id}/")
+    if not blobs:
+        raise ValueError(f"No blob found at {blob_path}")
+    for blob in blobs:
+        log(f"Blob found at {blob.name}")
+        blob.make_public()
+        log("Blob was made public")
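
Note on parameters: this refactor removes the painel_obras__dump_data_default_parameters dict, whose comments marked table_id and query as "set this in schedule", so every run parameter now has to come from the clocks in painel_obras__dump_data_schedule (or be passed explicitly when triggering a run, since dataset_id, table_id and query are declared as required Parameters with no defaults). A minimal sketch of what one such clock could look like, assuming the schedule module follows the usual Prefect 1.x Schedule/IntervalClock pattern; the interval, start date, table name and query below are placeholders, not the real contents of schedules.py:

# -*- coding: utf-8 -*-
# Hypothetical sketch of a clock in pipelines/painel_obras/dump_data/schedules.py.
# The interval, start date, table_id and query are placeholders; the real schedule
# file is not part of this patch.
from datetime import datetime, timedelta

from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

from pipelines.constants import constants

painel_obras__dump_data_schedule = Schedule(
    clocks=[
        IntervalClock(
            interval=timedelta(days=1),
            start_date=datetime(2024, 1, 1),
            labels=[constants.RJ_IPLANRIO_AGENT_LABEL.value],
            parameter_defaults={
                # Values carried over from the defaults dict removed in flows.py
                "project_id": "rj-iplanrio",
                "dataset_id": "painel_obras",
                "billing_project_id": "rj-iplanrio",
                # Placeholders: each clock would set its own table and query
                "table_id": "some_table",
                "query": "SELECT * FROM `rj-iplanrio.painel_obras.some_table`",
            },
        ),
    ]
)

One clock per table keeps the flow itself generic: download_data_to_gcs only sees the parameters it is handed, and the per-table query lives entirely in the schedule.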