Skip to content

Commit

Permalink
feat: add flood_detection pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
d116626 committed Dec 19, 2023
1 parent 6bd5097 commit d517b7c
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
cameras_geodf_url = Parameter(
"cameras_geodf_url",
required=True,
default="https://docs.google.com/spreadsheets/d/122uOaPr8YdW5PTzrxSPF-FD0tgco596HqgB7WK7cHFw/edit#gid=1580662721",
)
mocked_cameras_number = Parameter(
"mocked_cameras_number",
Expand All @@ -49,7 +50,6 @@
default="https://api.openai.com/v1/chat/completions",
)
api_key_secret_path = Parameter("api_key_secret_path", required=True)
openai_flooding_detection_prompt = Parameter("openai_flooding_detection_prompt", required=True)
rain_api_data_url = Parameter(
"rain_api_url",
default="https://api.dados.rio/v2/clima_pluviometro/precipitacao_15min/",
Expand Down Expand Up @@ -78,14 +78,12 @@
predictions_buffer_key=redis_key_predictions_buffer,
number_mock_rain_cameras=mocked_cameras_number,
)
api_key = get_api_key(secret_path=api_key_secret_path, model_name="gpt")
api_key = get_api_key(secret_path=api_key_secret_path, secret_name="GEMINI-PRO-VISION-API-KEY")
cameras_with_image = get_snapshot.map(
camera=cameras,
)
cameras_with_image_and_classification = get_prediction.map(
camera_with_image=cameras_with_image,
flooding_prompt=unmapped(openai_flooding_detection_prompt),
openai_api_key=unmapped(api_key),
openai_api_model=unmapped(openai_api_model),
openai_api_max_tokens=unmapped(openai_api_max_tokens),
openai_api_url=unmapped(openai_api_url),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,12 @@
constants.RJ_ESCRITORIO_AGENT_LABEL.value,
],
parameter_defaults={
"cameras_geodf_url": "https://prefeitura-rio.github.io/storage/cameras_geo_min_bolsao_sample.csv", # noqa
"cameras_geodf_url": "https://docs.google.com/spreadsheets/d/122uOaPr8YdW5PTzrxSPF-FD0tgco596HqgB7WK7cHFw/edit#gid=1580662721", # noqa
"mocked_cameras_number": 0,
"api_key_secret_path": "/flooding-detection",
"openai_api_max_tokens": 300,
"openai_api_model": "gpt-4-vision-preview",
"openai_api_url": "https://api.openai.com/v1/chat/completions",
"openai_flooding_detection_prompt": """You are an expert flooding detector. You are
given a image. You must detect if there is flooding in the image. The output MUST
be a JSON object with a boolean value for the key "flooding_detected". If you don't
know what to anwser, you can set the key "flooding_detect" as false. Example:
{
"flooding_detected": true
}
""",
"rain_api_update_url": "https://api.dados.rio/v2/clima_pluviometro/ultima_atualizacao_precipitacao_15min/", # noqa
"rain_api_url": "https://api.dados.rio/v2/clima_pluviometro/precipitacao_15min/",
"redis_key_flooding_detection_data": "flooding_detection_data",
Expand Down
20 changes: 10 additions & 10 deletions pipelines/deteccao_alagamento_cameras/flooding_detection/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,18 @@ def get_last_update(


@task
def get_api_key(secret_path: str, secret_name: str = "GEMINI-PRO-VISION-API-KEY") -> str:
    """
    Gets the Gemini API key from the secrets store.

    Args:
        secret_path: The secret path to look the key up under.
        secret_name: The secret name. Defaults to the Gemini Pro Vision key.

    Returns:
        The API key string stored under ``secret_name``.
    """
    # get_secret returns a mapping keyed by secret name; extract the value.
    secret = get_secret(secret_name=secret_name, path=secret_path)
    return secret[secret_name]

Expand Down Expand Up @@ -266,7 +260,12 @@ def pick_cameras(
cameras_data_path = Path("/tmp") / "cameras_geo_min.csv"
if not download_file(url=cameras_data_url, output_path=cameras_data_path):
raise RuntimeError("Failed to download the cameras data.")

cameras = pd.read_csv(cameras_data_path)

# get only selected cameras from google sheets
cameras = cameras[cameras["identificador"].notna()]

cameras = cameras.drop(columns=["geometry"])
geometry = [Point(xy) for xy in zip(cameras["longitude"], cameras["latitude"])]
df_cameras = gpd.GeoDataFrame(cameras, geometry=geometry)
Expand Down Expand Up @@ -317,6 +316,7 @@ def pick_cameras(
"latitude": row["geometry"].y,
"longitude": row["geometry"].x,
"attempt_classification": (row["status"] not in ["sem chuva", "chuva fraca"]),
"identificador": row["identificador"],
}
)
log(f"Picked cameras: {output}")
Expand Down
29 changes: 15 additions & 14 deletions pipelines/deteccao_alagamento_cameras/flooding_detection/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""
Data in: https://drive.google.com/drive/folders/1C-W_MMFAAJy5Lq_rHDzXUesEUyzke5gw
"""
from io import StringIO
from pathlib import Path
from typing import Any, Dict, List, Union

Expand All @@ -13,10 +14,12 @@
import requests
from shapely.geometry import Point, Polygon

from pipelines.utils.utils import get_redis_client, log, remove_columns_accents
from prefeitura_rio.pipelines_utils.redis_pal import get_redis_client
from prefeitura_rio.pipelines_utils.logging import log
from prefeitura_rio.pipelines_utils.pandas import remove_columns_accents


def download_file(url: str, output_path: Union[str, Path]) -> bool:
    """
    Downloads a Google Sheets document as CSV and saves it to a file.

    Args:
        url: The Google Sheets "edit" URL (e.g. ``.../edit#gid=<id>``).
            Non-Sheets URLs are requested unchanged.
        output_path: The path the CSV will be written to.

    Returns:
        Whether the file was downloaded successfully.
    """
    # Rewrite the interactive "edit" URL into the Sheets CSV export endpoint.
    request_url = url.replace("edit#gid=", "export?format=csv&gid=")
    response = requests.get(request_url)
    if response.status_code == 200:
        # Round-trip through pandas so the saved file is normalized CSV.
        dataframe = pd.read_csv(StringIO(response.content.decode("utf-8")))
        dataframe.to_csv(output_path, index=False)
        return True
    # Annotation fixed to `bool`: callers rely on the return value
    # (e.g. `if not download_file(...)` raises RuntimeError upstream).
    return False

Expand Down Expand Up @@ -127,7 +131,9 @@ def get_rain_dataframe() -> pd.DataFrame:
data = requests.get(api_url).json()
df_rain = pd.DataFrame(data)

last_update_url = "https://api.dados.rio/v2/clima_pluviometro/ultima_atualizacao_precipitacao_15min/" # noqa
last_update_url = (
"https://api.dados.rio/v2/clima_pluviometro/ultima_atualizacao_precipitacao_15min/" # noqa
)
last_update = requests.get(last_update_url).json()
df_rain["last_update"] = last_update
df_rain["last_update"] = pd.to_datetime(df_rain["last_update"])
Expand Down Expand Up @@ -190,8 +196,7 @@ def get_cameras_h3_bolsao(cameras_h3: gpd.GeoDataFrame, buffer: int = 0.002):
cameras_bolsao_h3 = gpd.sjoin(cameras_h3, bolsao_geo, how="left", op="within")

cameras_bolsao_h3["geometry_bolsao_buffer_0.002"] = [
Point(xy).buffer(buffer)
for xy in zip(cameras_bolsao_h3["long"], cameras_bolsao_h3["lat"])
Point(xy).buffer(buffer) for xy in zip(cameras_bolsao_h3["long"], cameras_bolsao_h3["lat"])
]
cameras_bolsao_h3["geometry_bolsao_buffer_0.002"] = cameras_bolsao_h3[
f"geometry_bolsao_buffer_{buffer}"
Expand All @@ -218,9 +223,7 @@ def clean_and_padronize_cameras() -> gpd.GeoDataFrame:
- gpd.GeoDataFrame: A GeoDataFrame containing the cleaned, standardized, and geographically
enriched camera data.
"""
df = pd.read_csv(
"./data/Cameras_em_2023-11-13.csv", delimiter=";", encoding="latin1"
)
df = pd.read_csv("./data/Cameras_em_2023-11-13.csv", delimiter=";", encoding="latin1")
df.columns = remove_columns_accents(df)
df["codigo"] = df["codigo"].str.replace("'", "")
df = df[df["status"] == "Online"]
Expand Down Expand Up @@ -257,9 +260,7 @@ def clean_and_padronize_cameras() -> gpd.GeoDataFrame:
"id_h3",
]
cameras_h3 = cameras_h3[cols]
cameras_h3 = cameras_h3.rename(
columns={"codigo": "id_camera", "nome_da_camera": "nome"}
)
cameras_h3 = cameras_h3.rename(columns={"codigo": "id_camera", "nome_da_camera": "nome"})

cameras_h3 = cameras_h3.reset_index(drop=True)
log("cameras_h3: ", cameras_h3.shape)
Expand Down

0 comments on commit d517b7c

Please sign in to comment.