chore: refactor flow
gabriel-milan committed May 22, 2024
1 parent 2b3e3dc commit 72a5e3c
Showing 2 changed files with 124 additions and 31 deletions.
74 changes: 43 additions & 31 deletions pipelines/painel_obras/dump_data/flows.py
@@ -1,46 +1,58 @@
 # -*- coding: utf-8 -*-
-from copy import deepcopy
-
+from prefect import Parameter
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
-from prefeitura_rio.pipelines_templates.dump_to_gcs.flows import (
-    flow as dump_to_gcs_flow,
-)
-from prefeitura_rio.pipelines_utils.prefect import set_default_parameters
+from prefeitura_rio.pipelines_utils.custom import Flow
 from prefeitura_rio.pipelines_utils.state_handlers import (
     handler_initialize_sentry,
     handler_inject_bd_credentials,
 )
+from prefeitura_rio.pipelines_utils.tasks import (
+    get_project_id,
+    rename_current_flow_run_dataset_table,
+)
 
 from pipelines.constants import constants
 from pipelines.painel_obras.dump_data.schedules import painel_obras__dump_data_schedule
+from pipelines.painel_obras.dump_data.tasks import download_data_to_gcs
 
-rj_iplanrio__painel_obras__dump_data_flow = deepcopy(dump_to_gcs_flow)
-rj_iplanrio__painel_obras__dump_data_flow.state_handlers = [
-    handler_inject_bd_credentials,
-    handler_initialize_sentry,
-]
-rj_iplanrio__painel_obras__dump_data_flow.name = "IPLANRIO: Painel de obras - Dump to GCS"
-rj_iplanrio__painel_obras__dump_data_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
-
-rj_iplanrio__painel_obras__dump_data_flow.run_config = KubernetesRun(
-    image=constants.DOCKER_IMAGE.value,
-    labels=[
-        constants.RJ_IPLANRIO_AGENT_LABEL.value,  # label do agente
+with Flow(
+    name="IPLANRIO: Painel de obras - Dump to GCS",
+    state_handlers=[
+        handler_initialize_sentry,
+        handler_inject_bd_credentials,
     ],
-)
+    parallelism=10,
+    skip_if_running=False,
+) as rj_iplanrio__painel_obras__dump_data__flow:
+    project_id = Parameter("project_id", required=False)
+    dataset_id = Parameter("dataset_id")
+    table_id = Parameter("table_id")
+    query = Parameter("query")
+    billing_project_id = Parameter("billing_project_id", required=False)
+    bd_project_mode = Parameter("bd_project_mode", required=False, default="prod")
 
-painel_obras__dump_data_default_parameters = {
-    "project_id": "rj-iplanrio",
-    "dataset_id": "painel_obras",
-    # "table_id": "",  # set this in schedule
-    # "query": "",  # set this in schedule
-    "billing_project_id": "rj-iplanrio",
-}
+    rename_flow_run = rename_current_flow_run_dataset_table(
+        prefix="IPLANRIO: Painel de obras - Dump to GCS: ",
+        dataset_id=dataset_id,
+        table_id=table_id,
+    )
 
-rj_iplanrio__painel_obras__dump_data_flow = set_default_parameters(
-    rj_iplanrio__painel_obras__dump_data_flow,
-    default_parameters=painel_obras__dump_data_default_parameters,
-)
+    final_project_id = get_project_id(project_id=project_id, bd_project_mode=bd_project_mode)
+    final_project_id.set_upstream(rename_flow_run)
 
-rj_iplanrio__painel_obras__dump_data_flow.schedule = painel_obras__dump_data_schedule
+    download_task = download_data_to_gcs(
+        project_id=final_project_id,
+        dataset_id=dataset_id,
+        table_id=table_id,
+        query=query,
+        bd_project_mode=bd_project_mode,
+        billing_project_id=billing_project_id,
+    )
+
+rj_iplanrio__painel_obras__dump_data__flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+rj_iplanrio__painel_obras__dump_data__flow.run_config = KubernetesRun(
+    image=constants.DOCKER_IMAGE.value,
+    labels=[constants.RJ_IPLANRIO_AGENT_LABEL.value],
+)
+rj_iplanrio__painel_obras__dump_data__flow.schedule = painel_obras__dump_data_schedule
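
Note: the refactor drops the deep-copied dump_to_gcs template flow and its set_default_parameters call, so table_id and query are now required Parameters that have to come from the schedule (the removed defaults dict carried the same "set this in schedule" notes). As a rough illustration only, a clock in painel_obras__dump_data_schedule could pass them through Prefect 1.x parameter_defaults; the sketch below is an assumption, not the repository's actual schedules.py, and the interval, table name and query are placeholders.

# Hypothetical sketch of a schedule clock supplying the required parameters.
# The real definitions live in pipelines/painel_obras/dump_data/schedules.py;
# the interval, table_id and query below are illustrative placeholders.
from datetime import datetime, timedelta

from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

painel_obras__dump_data_schedule = Schedule(
    clocks=[
        IntervalClock(
            interval=timedelta(days=1),
            start_date=datetime(2024, 1, 1),
            parameter_defaults={
                "project_id": "rj-iplanrio",
                "dataset_id": "painel_obras",
                "table_id": "example_table",  # placeholder
                "query": "SELECT * FROM `rj-iplanrio.painel_obras.example_table`",  # placeholder
                "billing_project_id": "rj-iplanrio",
            },
        ),
    ]
)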
81 changes: 81 additions & 0 deletions pipelines/painel_obras/dump_data/tasks.py
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+from time import sleep
+
+from basedosdados.download.base import google_client
+from basedosdados.upload.base import Base
+from google.cloud import bigquery
+from prefect import task
+from prefeitura_rio.pipelines_utils.gcs import list_blobs_with_prefix
+from prefeitura_rio.pipelines_utils.logging import log
+
+
+@task
+def download_data_to_gcs(  # pylint: disable=R0912,R0913,R0914,R0915
+    project_id: str,
+    dataset_id: str,
+    table_id: str,
+    query: str,
+    bd_project_mode: str = "prod",
+    billing_project_id: str = None,
+    location: str = "US",
+):
+    """
+    Get data from BigQuery.
+    """
+    # Asserts that dataset_id and table_id are provided
+    if not dataset_id or not table_id:
+        raise ValueError("dataset_id and table_id must be provided")
+
+    # If query is not a string, raise an error
+    if not isinstance(query, str):
+        raise ValueError("query must be a string")
+    log(f"Query was provided: {query}")
+
+    # Get billing project ID
+    if not billing_project_id:
+        log("Billing project ID was not provided, trying to get it from environment variable")
+        try:
+            bd_base = Base()
+            billing_project_id = bd_base.config["gcloud-projects"][bd_project_mode]["name"]
+        except KeyError:
+            pass
+        if not billing_project_id:
+            raise ValueError(
+                "billing_project_id must be either provided or inferred from environment variables"
+            )
+        log(f"Billing project ID was inferred from environment variables: {billing_project_id}")
+
+    # Get data
+    log("Querying data from BigQuery")
+    client = google_client(project_id, billing_project_id, from_file=True, reauth=False)
+    job = client["bigquery"].query(query)
+    while not job.done():
+        sleep(1)
+    dest_table = job._properties["configuration"]["query"]["destinationTable"]
+    dest_project_id = dest_table["projectId"]
+    dest_dataset_id = dest_table["datasetId"]
+    dest_table_id = dest_table["tableId"]
+    log(f"Query results were stored in {dest_project_id}.{dest_dataset_id}.{dest_table_id}")
+
+    blob_path = f"gs://datario/share/{dataset_id}/{table_id}/data.csv.gz"
+    log(f"Loading data to {blob_path}")
+    dataset_ref = bigquery.DatasetReference(dest_project_id, dest_dataset_id)
+    table_ref = dataset_ref.table(dest_table_id)
+    job_config = bigquery.job.ExtractJobConfig(compression="GZIP")
+    extract_job = client["bigquery"].extract_table(
+        table_ref,
+        blob_path,
+        location=location,
+        job_config=job_config,
+    )
+    extract_job.result()
+    log("Data was loaded successfully")
+
+    # Get the BLOB we've just created and make it public
+    blobs = list_blobs_with_prefix("datario", f"share/{dataset_id}/{table_id}/")
+    if not blobs:
+        raise ValueError(f"No blob found at {blob_path}")
+    for blob in blobs:
+        log(f"Blob found at {blob.name}")
+        blob.make_public()
+        log("Blob was made public")

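Note: the refactored flow can in principle be smoke-tested locally with Prefect 1.x flow.run(), passing the parameters the schedule would normally supply. This is a minimal sketch under the assumption that the basedosdados / Google Cloud credentials and any environment the state handlers and the rename task expect are available; table_id and query are placeholders, not values from the repository.

# Hypothetical local smoke test (not part of this commit): run the refactored
# flow once with explicit parameters instead of the schedule. Assumes the
# project's credentials/environment are configured; values are placeholders.
from pipelines.painel_obras.dump_data.flows import (
    rj_iplanrio__painel_obras__dump_data__flow as dump_flow,
)

if __name__ == "__main__":
    state = dump_flow.run(
        parameters={
            "project_id": "rj-iplanrio",
            "dataset_id": "painel_obras",
            "table_id": "example_table",  # placeholder
            "query": "SELECT 1 AS example_column",  # placeholder
            "billing_project_id": "rj-iplanrio",
        }
    )
    print(state)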