21 changes: 21 additions & 0 deletions broker/cloud_run/lsst/classify_vitune/Dockerfile
@@ -0,0 +1,21 @@
# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:3.11-slim

# Allow statements and log messages to immediately appear in the Knative logs
ENV PYTHONUNBUFFERED True

# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./

# Install production dependencies.
RUN pip install --no-cache-dir -r requirements.txt

# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
# Timeout is set to 0 to disable worker timeouts and let Cloud Run handle instance scaling.
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
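
# Illustrative local smoke test (not part of the deployment; the tag and port are arbitrary,
# and the container expects application-default credentials plus the GCP_PROJECT, SURVEY,
# and TESTID environment variables in order to start):
#   docker build -t vitune-local .
#   docker run --rm -p 8080:8080 -e PORT=8080 vitune-local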
29 changes: 29 additions & 0 deletions broker/cloud_run/lsst/classify_vitune/cloudbuild.yaml
@@ -0,0 +1,29 @@
# https://cloud.google.com/build/docs/deploying-builds/deploy-cloud-run
# containerize the module and deploy it to Cloud Run
steps:
# Build the image
- name: 'gcr.io/cloud-builders/docker'
args: ['build', '-t', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '.']
# Push the image to Artifact Registry
- name: 'gcr.io/cloud-builders/docker'
args: ['push', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}']
# Deploy image to Cloud Run
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
entrypoint: gcloud
args: ['run', 'deploy', '${_MODULE_NAME}', '--image', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '--region', '${_REGION}', '--set-env-vars', '${_ENV_VARS}']
images:
- '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}'
substitutions:
_SURVEY: 'lsst'
_TESTID: 'testid'
_MODULE_NAME: '${_SURVEY}-vitune-${_TESTID}'
_MODULE_IMAGE_NAME: 'gcr.io/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_NAME}'
_REPOSITORY: 'cloud-run-services'
# Cloud Functions automatically sets the project ID env var under the name "GCP_PROJECT";
# use the same name here for consistency.
# [TODO] PROJECT_ID is set in setup.sh. This is confusing and we should revisit the decision.
# I (Raen) think I didn't make it a substitution because I didn't want to set a default for it.
_ENV_VARS: 'GCP_PROJECT=${PROJECT_ID},SURVEY=${_SURVEY},TESTID=${_TESTID}'
_REGION: 'us-central1'
options:
dynamic_substitutions: true
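
# Illustrative manual build submission (deploy.sh normally supplies these substitutions;
# the values shown are examples only):
#   gcloud builds submit --config=cloudbuild.yaml --region=us-central1 \
#     --substitutions=_SURVEY=lsst,_TESTID=mytest,_MODULE_NAME=lsst-vitune-mytest,_REPOSITORY=cloud-run-services .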
91 changes: 91 additions & 0 deletions broker/cloud_run/lsst/classify_vitune/deploy.sh
@@ -0,0 +1,91 @@
#! /bin/bash
# Deploys or deletes broker Cloud Run service
# This script will not delete Cloud Run services that are in production

# "False" uses production resources
# any other string will be appended to the names of all resources
testid="${1:-test}"
# "True" tearsdown/deletes resources, else setup
teardown="${2:-False}"
# name of the survey this broker instance will ingest
survey="${3:-lsst}"
region="${4:-us-central1}"
# get the environment variable
PROJECT_ID=$GOOGLE_CLOUD_PROJECT
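# Illustrative usage ("mytest" is an example testid, not a default):
#   ./deploy.sh mytest False lsst us-central1   # set up a "mytest" broker instance
#   ./deploy.sh mytest True lsst us-central1    # tear that instance down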

MODULE_NAME="vitune" # lower case required by cloud run
ROUTE_RUN="/" # url route that will trigger main.run()

define_GCP_resources() {
local base_name="$1"
local separator="${2:--}"
local testid_suffix=""

if [ "$testid" != "False" ] && [ -n "$testid" ]; then
testid_suffix="${separator}${testid}"
fi
echo "${base_name}${testid_suffix}"
}
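
# For example (illustrative): with testid="mytest", define_GCP_resources "lsst-vitune" returns
# "lsst-vitune-mytest" and define_GCP_resources "lsst" "_" returns "lsst_mytest";
# with testid="False" (production) the base names are returned unchanged.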

#--- GCP resources used in this script
artifact_registry_repo=$(define_GCP_resources "${survey}-cloud-run-services")
bq_dataset=$(define_GCP_resources "${survey}" "_")
bq_table="vitune"
cr_module_name=$(define_GCP_resources "${survey}-${MODULE_NAME}") # lower case required by cloud run
ps_input_subscrip=$(define_GCP_resources "${survey}-vitune") # pub/sub subscription used to trigger cloud run module
ps_output_topic=$(define_GCP_resources "${survey}-vitune")
ps_trigger_topic=$(define_GCP_resources "${survey}-lite")
runinvoker_svcact="cloud-run-invoker@${PROJECT_ID}.iam.gserviceaccount.com"
# topics and subscriptions involved in writing data to BigQuery
ps_bigquery_subscription=$(define_GCP_resources "${survey}-${MODULE_NAME}-bigquery-import")
ps_deadletter_topic=$(define_GCP_resources "${survey}-deadletter")

if [ "${teardown}" = "True" ]; then
# ensure that we do not teardown production resources
if [ "${testid}" != "False" ]; then
echo
echo "Deleting resources for ${MODULE_NAME} module..."
gcloud pubsub topics delete "${ps_output_topic}"
gcloud pubsub subscriptions delete "${ps_bigquery_subscription}"
gcloud pubsub subscriptions delete "${ps_input_subscrip}"
gcloud run services delete "${cr_module_name}" --region "${region}"
fi

else
echo "Configuring Pub/Sub resources..."
gcloud pubsub topics create "${ps_output_topic}"
gcloud pubsub subscriptions create "${ps_bigquery_subscription}" \
--topic="${ps_output_topic}" \
--bigquery-table="${PROJECT_ID}:${bq_dataset}.${bq_table}" \
--use-table-schema \
--drop-unknown-fields \
--dead-letter-topic="${ps_deadletter_topic}" \
--max-delivery-attempts=5 \
--dead-letter-topic-project="${PROJECT_ID}"
# set IAM policies on public Pub/Sub resources
if [ "$testid" = "False" ]; then
user="allUsers"
roleid="projects/${GOOGLE_CLOUD_PROJECT}/roles/userPublic"
gcloud pubsub topics add-iam-policy-binding "${ps_output_topic}" --member="${user}" --role="${roleid}"
fi

#--- Deploy Cloud Run
echo
echo "Creating container image for ${MODULE_NAME} module and deploying to Cloud Run..."
moduledir="." # deploys what's in our current directory
config="${moduledir}/cloudbuild.yaml"
url=$(gcloud builds submit --config="${config}" \
--substitutions="_SURVEY=${survey},_TESTID=${testid},_MODULE_NAME=${cr_module_name},_REPOSITORY=${artifact_registry_repo}" \
--region="${region}" \
"${moduledir}" | sed -n 's/^Step #2: Service URL: \(.*\)$/\1/p')
echo
echo "Creating trigger subscription for ${MODULE_NAME} Cloud Run service..."
gcloud pubsub subscriptions create "${ps_input_subscrip}" \
--topic "${ps_trigger_topic}" \
--topic-project "${PROJECT_ID}" \
--ack-deadline=600 \
--push-endpoint="${url}${ROUTE_RUN}" \
--push-auth-service-account="${runinvoker_svcact}" \
--dead-letter-topic="${ps_deadletter_topic}" \
--max-delivery-attempts=5
fi
162 changes: 162 additions & 0 deletions broker/cloud_run/lsst/classify_vitune/main.py
@@ -0,0 +1,162 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

"""Classify alerts using UPSILoN (Kim & Bailer-Jones 2015)."""

import os
import flask
import pandas as pd
import numpy as np
import pittgoogle
import upsilon
from google.cloud import logging

# [FIXME] Make this helpful or else delete it.
# Connect the python logger to the google cloud logger.
# By default, this captures INFO level and above.
# pittgoogle uses the python logger.
# We don't currently use the python logger directly in this script, but we could.
logging.Client().setup_logging()

PROJECT_ID = os.getenv("GCP_PROJECT")
TESTID = os.getenv("TESTID")
SURVEY = os.getenv("SURVEY")

# ---Variables for incoming data
# A URL route is used in deploy.sh when the trigger subscription is created.
# It is possible to define multiple routes in a single module and trigger them using different subscriptions.
ROUTE_RUN = "/" # HTTP route that will trigger run(). Must match deploy.sh
SURVEY_BANDS = ["u", "g", "r", "i", "z", "y"]

# ---Variables for outgoing data
HTTP_204 = 204 # HTTP code: Success
HTTP_400 = 400 # HTTP code: Bad Request

# ---GCP resources used in this module
TOPIC = pittgoogle.Topic.from_cloud("vitune", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID)  # matches the output topic created in deploy.sh

app = flask.Flask(__name__)
rf_model = upsilon.load_rf_model() # load UPSILoN's classification model


@app.route(ROUTE_RUN, methods=["POST"])
def run() -> tuple[str, int]:
"""Classify alert with UPSILoN; publish and store results.

This module is intended to be deployed as a Cloud Run service. It will operate as an HTTP endpoint
triggered by Pub/Sub messages. This function will be called once for every message sent to this route.
It should accept the incoming HTTP request and return a response.

Returns
-------
response : tuple(str, int)
Tuple containing the response body (string) and HTTP status code (int). Flask will convert the
tuple into a proper HTTP response. Note that the response is a status message for the web server.
"""
# extract the envelope from the request that triggered the endpoint
# this contains a single Pub/Sub message with the alert to be processed
envelope = flask.request.get_json()
try:
alert_lite = pittgoogle.Alert.from_cloud_run(envelope, "default")
except pittgoogle.exceptions.BadRequest as exc:
return str(exc), HTTP_400

alert_lite_df = _create_lite_dataframe(alert_lite.dict["alert_lite"])
upsilon_dict = _classify_with_upsilon(alert_lite_df)
has_min_detections_in_any_band = any(
upsilon_dict.get(f"n_data_points_{band}_band") >= 80 for band in SURVEY_BANDS
)
TOPIC.publish(
pittgoogle.Alert.from_dict(
{"alert_lite": alert_lite.dict["alert_lite"], "upsilon": upsilon_dict},
attributes={
**alert_lite.attributes,
"pg_upsilon_u_label": upsilon_dict["u_label"],
"pg_upsilon_u_flag": upsilon_dict["u_flag"],
"pg_upsilon_g_label": upsilon_dict["g_label"],
"pg_upsilon_g_flag": upsilon_dict["g_flag"],
"pg_upsilon_r_label": upsilon_dict["r_label"],
"pg_upsilon_r_flag": upsilon_dict["r_flag"],
"pg_upsilon_i_label": upsilon_dict["i_label"],
"pg_upsilon_i_flag": upsilon_dict["i_flag"],
"pg_upsilon_z_label": upsilon_dict["z_label"],
"pg_upsilon_z_flag": upsilon_dict["z_flag"],
"pg_upsilon_y_label": upsilon_dict["y_label"],
"pg_upsilon_y_flag": upsilon_dict["y_flag"],
"pg_has_min_detections": int(has_min_detections_in_any_band),
},
schema_name="default",
)
)

return "", HTTP_204


def _classify_with_upsilon(alert_lite_df: pd.DataFrame) -> dict:
upsilon_dict = {}
for band in SURVEY_BANDS:
# ---Extract data
filter_diaSources = alert_lite_df[alert_lite_df["band"] == band]
flux_gt_zero = filter_diaSources["psfFlux"].to_numpy() > 0
upsilon_dict[f"n_data_points_{band}_band"] = flux_gt_zero.sum().item()
        # Skip this band if there are no detections or too few valid data points.
        # To avoid scipy's leastsq error ("input vector length N=7 must not exceed output length M"),
        # we require flux_gt_zero.sum() > 7.
if filter_diaSources.empty or flux_gt_zero.sum() <= 7:
upsilon_dict[f"{band}_label"] = None
upsilon_dict[f"{band}_probability"] = None
upsilon_dict[f"{band}_flag"] = None
continue
# ---Extract features
flux = filter_diaSources["psfFlux"].to_numpy()[flux_gt_zero]
flux_err = filter_diaSources["psfFluxErr"].to_numpy()[flux_gt_zero]
date = filter_diaSources["midpointMjdTai"].to_numpy()[flux_gt_zero]
mag = _convert_flux_to_mag(flux)
mag_err = _calculate_mag_err(flux, flux_err)
e_features = upsilon.ExtractFeatures(date, mag, mag_err)
e_features.run()
features = e_features.get_features()
# ---Classify
label, probability, flag = upsilon.predict(rf_model, features)
upsilon_dict[f"{band}_label"] = label
upsilon_dict[f"{band}_probability"] = probability
upsilon_dict[f"{band}_flag"] = flag

return upsilon_dict


def _create_lite_dataframe(alert_dict: dict) -> pd.DataFrame:
"""Return a pandas DataFrame containing the source detections."""

# sources and previous sources are expected to have the same fields
sources_df = pd.DataFrame(
[alert_dict.get("diaSource")] + (alert_dict.get("prvDiaSources") or [])
)
# sources and forced sources may have different fields
forced_df = pd.DataFrame(alert_dict.get("prvDiaForcedSources") or [])

# use nullable integer data type to avoid converting ints to floats
# for columns in one dataframe but not the other
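    # (e.g., an integer ID column present only in one frame would otherwise be upcast to float
    # after concat, because rows from the other frame fill it with NaN; "Int64" keeps it integer
    # with <NA> for the missing rows)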
sources_ints = [c for c, v in sources_df.dtypes.items() if v == int]
sources_df = sources_df.astype(
{c: "Int64" for c in set(sources_ints) - set(forced_df.columns)}
)
forced_ints = [c for c, v in forced_df.dtypes.items() if v == int]
forced_df = forced_df.astype({c: "Int64" for c in set(forced_ints) - set(sources_df.columns)})

_dataframe = pd.concat([sources_df, forced_df], ignore_index=True)
return _dataframe


def _convert_flux_to_mag(flux: np.ndarray) -> np.ndarray:
"""Adapted from:
https://github.com/lsst/tutorial-notebooks/blob/044219c9ae5521edcc816af88e4b341e19326dbf/DP0.2/01_Introduction_to_DP02.ipynb#L511

Converts flux [nJy] to AB magnitude.
"""
return -2.5 * np.log10(flux) + 31.4


def _calculate_mag_err(flux: np.ndarray, flux_err: np.ndarray) -> np.ndarray:
"""Calculates magnitude uncertainty."""
return abs(-2.5 / (flux * np.log(10))) * flux_err
14 changes: 14 additions & 0 deletions broker/cloud_run/lsst/classify_vitune/requirements.txt
@@ -0,0 +1,14 @@
# As explained here
# https://cloud.google.com/functions/docs/writing/specifying-dependencies-python
# dependencies for a Cloud Function must be specified in a `requirements.txt`
# file (or packaged with the function) in the same directory as `main.py`

google-cloud-logging
pittgoogle-client>=0.3.17
# imported directly by main.py; `upsilon` is assumed to be the PyPI package for UPSILoN (Kim & Bailer-Jones 2015)
numpy
pandas
upsilon

# for Cloud Run
# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service
# following the quickstart example. [TODO] consider pinning versions
Flask
gunicorn
Werkzeug
6 changes: 6 additions & 0 deletions broker/setup_broker/lsst/setup_broker.sh
@@ -54,6 +54,7 @@ bq_table_alerts="alerts_${versiontag}"
bq_table_supernnova="supernnova"
bq_table_upsilon="upsilon"
bq_table_variability="variability"
bq_table_vitune="vitune"
gcs_broker_bucket=$(define_GCP_resources "${PROJECT_ID}-${survey}-broker_files")
ps_subscription_reservoir=$(define_GCP_resources "${survey}-alerts-reservoir")
ps_topic_alerts_raw=$(define_GCP_resources "${survey}-alerts_raw")
@@ -90,6 +91,7 @@ manage_resources() {
(cd templates && bq mk --table --time_partitioning_field=kafkaPublishTimestamp --time_partitioning_type=DAY "${PROJECT_ID}:${bq_dataset}.${bq_table_supernnova}" "bq_${survey}_${bq_table_supernnova}_schema.json") || exit 5
(cd templates && bq mk --table --time_partitioning_field=kafkaPublishTimestamp --time_partitioning_type=DAY "${PROJECT_ID}:${bq_dataset}.${bq_table_variability}" "bq_${survey}_${bq_table_variability}_schema.json") || exit 5
(cd templates && bq mk --table --time_partitioning_field=kafkaPublishTimestamp --time_partitioning_type=DAY "${PROJECT_ID}:${bq_dataset}.${bq_table_upsilon}" "bq_${survey}_${bq_table_upsilon}_schema.json") || exit 5
(cd templates && bq mk --table --time_partitioning_field=kafkaPublishTimestamp --time_partitioning_type=DAY "${PROJECT_ID}:${bq_dataset}.${bq_table_vitune}" "bq_${survey}_${bq_table_vitune}_schema.json") || exit 5
bq update --description "Alert data from LSST. This table is an archive of the lsst-alerts Pub/Sub stream. It has the same schema as the original alert bytes, including nested and repeated fields." "${PROJECT_ID}:${bq_dataset}.${bq_table_alerts}"
bq update --description "Binary classification results from SuperNNova." "${PROJECT_ID}:${bq_dataset}.${bq_table_supernnova}"

@@ -229,4 +231,8 @@ echo "Configuring Cloud Run services..."
#--- upsilon Cloud Run service
cd .. && cd classify_upsilon
./deploy.sh "${testid}" "${teardown}" "${survey}" "${region}"

#--- vitune Cloud Run service
cd .. && cd classify_vitune
./deploy.sh "${testid}" "${teardown}" "${survey}" "${region}"
) || exit
44 changes: 44 additions & 0 deletions broker/setup_broker/lsst/templates/bq_lsst_vitune_schema.json
@@ -0,0 +1,44 @@
[
{
"description": "Identifier of the triggering DiaSource.",
"mode": "REQUIRED",
"name": "diaSourceId",
"type": "INTEGER"
},
{
"description": "Unique identifier of this DiaObject.",
"mode": "NULLABLE",
"name": "diaObjectId",
"type": "INTEGER"
},
{
"description": "Unique identifier of the object.",
"mode": "NULLABLE",
"name": "ssObjectId",
"type": "INTEGER"
},
{
"description": "probability object is a type Ia supernova",
"mode": "NULLABLE",
"name": "prob_class0",
"type": "FLOAT"
},
{
"description": "probability object is non-Ia",
"mode": "NULLABLE",
"name": "prob_class1",
"type": "FLOAT"
},
{
"description": "predicted class; 0 = Ia, 1 = non-Ia",
"mode": "NULLABLE",
"name": "predicted_class",
"type": "INTEGER"
},
{
"description": "Kafka timestamp from originating LSST alert.",
"mode": "REQUIRED",
"name": "kafkaPublishTimestamp",
"type": "TIMESTAMP"
}
]