21 changes: 21 additions & 0 deletions broker/cloud_run/lsst/oracle/Dockerfile
@@ -0,0 +1,21 @@
# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:3.12-slim

# Allow statements and log messages to immediately appear in the Knative logs
ENV PYTHONUNBUFFERED=True

# Copy local code to the container image.
ENV APP_HOME=/app
WORKDIR $APP_HOME
COPY . ./

# Install production dependencies.
RUN pip install --no-cache-dir -r requirements.txt

# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
# Timeout is set to 0 to disable worker timeouts and let Cloud Run handle instance scaling.
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
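
# Local build check (illustrative only; "oracle-classifier" is a placeholder tag; actually
# running the service also requires GCP credentials and the env vars set by deploy.sh):
#   docker build -t oracle-classifier .
#   docker run --rm -e PORT=8080 -p 8080:8080 oracle-classifier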
Binary file added broker/cloud_run/lsst/oracle/best_no_md_model.h5
Binary file not shown.
29 changes: 29 additions & 0 deletions broker/cloud_run/lsst/oracle/cloudbuild.yaml
@@ -0,0 +1,29 @@
# https://cloud.google.com/build/docs/deploying-builds/deploy-cloud-run
# containerize the module and deploy it to Cloud Run
steps:
# Build the image
- name: 'gcr.io/cloud-builders/docker'
args: ['build', '-t', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '.']
# Push the image to Artifact Registry
- name: 'gcr.io/cloud-builders/docker'
args: ['push', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}']
# Deploy image to Cloud Run
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
entrypoint: gcloud
args: ['run', 'deploy', '${_MODULE_NAME}', '--image', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '--region', '${_REGION}', '--set-env-vars', '${_ENV_VARS}']
images:
- '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}'
substitutions:
_SURVEY: 'lsst'
_TESTID: 'testid'
    _MODULE_NAME: '${_SURVEY}-oracle-${_TESTID}' # lower case, no underscores: required by Cloud Run; deploy.sh overrides this
    _MODULE_IMAGE_NAME: '${_MODULE_NAME}' # image name within the Artifact Registry repository
_REPOSITORY: 'cloud-run-services'
    # Cloud Functions automatically sets the project ID env var using the name "GCP_PROJECT";
    # use the same name here for consistency.
    # [TODO] PROJECT_ID is set in setup.sh. This is confusing and we should revisit the decision.
    # I (Raen) think I didn't make it a substitution because I didn't want to set a default for it.
_ENV_VARS: 'GCP_PROJECT=${PROJECT_ID},SURVEY=${_SURVEY},TESTID=${_TESTID}'
_REGION: 'us-central1'
options:
dynamic_substitutions: true
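
# Example manual invocation (illustrative; placeholder values, mirroring the call in deploy.sh):
#   gcloud builds submit --config=cloudbuild.yaml \
#     --substitutions="_SURVEY=lsst,_TESTID=mytest,_MODULE_NAME=lsst-oracle-mytest,_REPOSITORY=lsst-cloud-run-services-mytest" .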
89 changes: 89 additions & 0 deletions broker/cloud_run/lsst/oracle/deploy.sh
@@ -0,0 +1,89 @@
#!/bin/bash
# Deploys or deletes broker Cloud Run service
# This script will not delete Cloud Run services that are in production

# "False" uses production resources
# any other string will be appended to the names of all resources
testid="${1:-test}"
# "True" tearsdown/deletes resources, else setup
teardown="${2:-False}"
# name of the survey this broker instance will ingest
survey="${3:-lsst}"
region="${4:-us-central1}"
# get the project ID from the environment variable
PROJECT_ID=$GOOGLE_CLOUD_PROJECT

MODULE_NAME="oracle" # lower case required by cloud run
ROUTE_RUN="/" # url route that will trigger main.run()
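
# Example usage (illustrative):
#   ./deploy.sh mytest        # deploy test resources suffixed with "-mytest"
#   ./deploy.sh mytest True   # tear down those test resources
#   ./deploy.sh False         # deploy production resources (no suffix)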

define_GCP_resources() {
local base_name="$1"
local separator="${2:--}"
local testid_suffix=""

if [ "$testid" != "False" ] && [ -n "$testid" ]; then
testid_suffix="${separator}${testid}"
fi
echo "${base_name}${testid_suffix}"
}
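# For example, with testid="mytest": define_GCP_resources "lsst-oracle" => "lsst-oracle-mytest",
# and define_GCP_resources "lsst" "_" => "lsst_mytest". With testid="False" the base name is returned unchanged.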

#--- GCP resources used in this script
artifact_registry_repo=$(define_GCP_resources "${survey}-cloud-run-services")
bq_dataset=$(define_GCP_resources "${survey}" "_")
bq_table="ORACLE"
cr_module_name=$(define_GCP_resources "${survey}-${MODULE_NAME}") # lower case required by cloud run
ps_input_subscrip=$(define_GCP_resources "${survey}-${MODULE_NAME}") # pub/sub subscription used to trigger cloud run module
ps_output_topic=$(define_GCP_resources "${survey}-${MODULE_NAME}")
ps_trigger_topic=$(define_GCP_resources "${survey}-lite")
runinvoker_svcact="cloud-run-invoker@${PROJECT_ID}.iam.gserviceaccount.com"
# topics and subscriptions involved in writing data to BigQuery
ps_bigquery_subscription=$(define_GCP_resources "${survey}-${MODULE_NAME}-bigquery-import")
ps_deadletter_topic=$(define_GCP_resources "${survey}-deadletter")

if [ "${teardown}" = "True" ]; then
# ensure that we do not teardown production resources
if [ "${testid}" != "False" ]; then
echo
echo "Deleting resources for ${MODULE_NAME} module..."
gcloud pubsub topics delete "${ps_output_topic}"
gcloud pubsub subscriptions delete "${ps_bigquery_subscription}"
gcloud pubsub subscriptions delete "${ps_input_subscrip}"
gcloud run services delete "${cr_module_name}" --region "${region}"
fi
else
echo "Configuring Pub/Sub resources..."
gcloud pubsub topics create "${ps_output_topic}"
gcloud pubsub subscriptions create "${ps_bigquery_subscription}" \
--topic="${ps_output_topic}" \
--bigquery-table="${PROJECT_ID}:${bq_dataset}.${bq_table}" \
--use-table-schema \
--drop-unknown-fields \
--dead-letter-topic="${ps_deadletter_topic}" \
--max-delivery-attempts=5 \
--dead-letter-topic-project="${PROJECT_ID}"
# set IAM policies on public Pub/Sub resources
if [ "$testid" = "False" ]; then
user="allUsers"
roleid="projects/${GOOGLE_CLOUD_PROJECT}/roles/userPublic"
gcloud pubsub topics add-iam-policy-binding "${ps_output_topic}" --member="${user}" --role="${roleid}"
fi

#--- Deploy Cloud Run service
echo
echo "Creating container image for ${MODULE_NAME} module and deploying to Cloud Run..."
moduledir="." # deploys what's in our current directory
config="${moduledir}/cloudbuild.yaml"
url=$(gcloud builds submit --config="${config}" \
--substitutions="_SURVEY=${survey},_TESTID=${testid},_MODULE_NAME=${cr_module_name},_REPOSITORY=${artifact_registry_repo}" \
"${moduledir}" | sed -n 's/^Step #2: Service URL: \(.*\)$/\1/p')
echo
echo "Creating trigger subscription for ${MODULE_NAME} Cloud Run service..."
gcloud pubsub subscriptions create "${ps_input_subscrip}" \
--topic "${ps_trigger_topic}" \
--topic-project "${PROJECT_ID}" \
--ack-deadline=600 \
--push-endpoint="${url}${ROUTE_RUN}" \
--push-auth-service-account="${runinvoker_svcact}" \
--dead-letter-topic="${ps_deadletter_topic}" \
--max-delivery-attempts=5
fi
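
# Optional post-deploy sanity check (illustrative):
#   gcloud run services describe "${cr_module_name}" --region "${region}" --format="value(status.url)"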
152 changes: 152 additions & 0 deletions broker/cloud_run/lsst/oracle/main.py
@@ -0,0 +1,152 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""Classify an alert using ORACLE (Shah et al. 2025).

This code is intended to be containerized and deployed to Google Cloud Run.
Once deployed, individual alerts in the "trigger" stream will be delivered to the container as HTTP requests.
"""

import os
from pathlib import Path
import flask # Manage the HTTP request containing the alert
import pittgoogle # Manipulate the alert and interact with cloud resources

import pandas as pd
from astroOracle.pretrained_models import ORACLE_lite

import google.cloud.logging

# [FIXME] Make this helpful or else delete it.
# Connect the python logger to the google cloud logger.
# By default, this captures INFO level and above.
# pittgoogle uses the python logger.
# We don't currently use the python logger directly in this script, but we could.
google.cloud.logging.Client().setup_logging()

# These environment variables are defined when running the deploy.sh script.
PROJECT_ID = os.getenv("GCP_PROJECT")
TESTID = os.getenv("TESTID")
SURVEY = os.getenv("SURVEY")

# provenance variables
MODULE_NAME = "ORACLE"
MODULE_VERSION = 0.1

# classifier variables
model_file_name = "best_no_md_model.h5"
MODEL_PATH = Path(__file__).resolve().parent / model_file_name

# Variables for incoming data
# A url route is used in deploy.sh when the trigger subscription is created.
# It is possible to define multiple routes in a single module and trigger them using different subscriptions.
ROUTE_RUN = "/"  # HTTP route that will trigger run(). Must match deploy.sh.

# Variables for outgoing data
HTTP_204 = 204 # HTTP code: Success
HTTP_400 = 400 # HTTP code: Bad Request

# GCP resources used in this module
# pittgoogle will construct the full resource names from the MODULE_NAME, SURVEY, and TESTID
TOPIC = pittgoogle.Topic.from_cloud(
MODULE_NAME, survey=SURVEY, testid=TESTID, projectid=PROJECT_ID
)

app = flask.Flask(__name__)

@app.route(ROUTE_RUN, methods=["POST"])
def run():
"""Classify the alert; publish and store results.

This module is intended to be deployed as a Cloud Run service. It will operate as an HTTP endpoint
triggered by Pub/Sub messages. This function will be called once for every message sent to this route.
It should accept the incoming HTTP request and return a response.

Returns
-------
response : tuple(str, int)
Tuple containing the response body (string) and HTTP status code (int). Flask will convert the
tuple into a proper HTTP response. Note that the response is a status message for the web server
and should not contain the classification results.
"""
# extract the envelope from the request that triggered the endpoint
# this contains a single Pub/Sub message with the alert to be processed
envelope = flask.request.get_json()
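    # A Pub/Sub push envelope has this general shape (illustrative):
    # {"message": {"data": "<base64-encoded payload>", "attributes": {...}, "messageId": "..."},
    #  "subscription": "projects/<project>/subscriptions/<subscription>"}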

# unpack the alert. raises a `BadRequest` if the envelope does not contain a valid message
try:
alert_lite = pittgoogle.Alert.from_cloud_run(envelope, "default")
except pittgoogle.exceptions.BadRequest as exc:
return str(exc), HTTP_400

    # load the pretrained ORACLE-lite model and classify the alert's light curve
    model_lite = ORACLE_lite(MODEL_PATH)
    oracle_classification = model_lite.predict(
        [_format_for_classifier(alert_lite)]
    ).to_dict(orient='records')[0]

level_1_class = _most_likely_class(oracle_classification, ['Transient', 'Variable'])
level_2_class = _most_likely_class(oracle_classification, ['SN', 'Fast', 'Long', # Transient
'Periodic', 'AGN']) # Variable
leaf_class = _most_likely_class(oracle_classification, ['SNIa', 'SNIb/c', 'SNIax', 'SNI91bg', 'SNII', # SN
'KN', 'Dwarf Novae', 'uLens', 'M-dwarf Flare', # Fast
'SLSN', 'TDE', 'ILOT', 'CART', 'PISN', # Long
'Cepheid', 'RR Lyrae', 'Delta Scuti', 'EB', # Periodic
'AGN']) # AGN

# publish
    output_dict = {
        "output": oracle_classification,
        "predicted_level_1": level_1_class[0],
        "predicted_level_1_prob": level_1_class[1],
        "predicted_level_2": level_2_class[0],
        "predicted_level_2_prob": level_2_class[1],
        "predicted_leaf": leaf_class[0],
        "predicted_leaf_prob": leaf_class[1],
    }

    TOPIC.publish(
        pittgoogle.Alert.from_dict(
            payload={'alert_lite': alert_lite.dict['alert_lite'], 'ORACLE': output_dict},
            attributes={
                **alert_lite.attributes,
                'pg_oracle_class': output_dict['predicted_leaf'],
            },
            schema_name='default',
        )
    )

return "", HTTP_204

def _y_to_Y(band):
    """ORACLE expects the LSST y band capitalized as 'Y'."""
    if band == 'y':
        return 'Y'
    return band

# this could use improvement
def _get_photflag(flux, flux_err):
    """Heuristic SNANA-style PHOTFLAG: 1024 for very low signal-to-noise points, 0 for non-detections, 4096 for detections."""
    if flux_err > 5 * abs(flux):
        return 1024
    if abs(flux) < max(100, flux_err):
        return 0
    return 4096

def _format_for_classifier(alert_lite: pittgoogle.Alert) -> pd.DataFrame:
    """Create a DataFrame for input to ORACLE."""
    alert_lite_dict = alert_lite.dict['alert_lite']
    source_dicts = (
        [alert_lite_dict['diaSource']]
        + alert_lite_dict['prvDiaSources']
        + alert_lite_dict['prvDiaForcedSources']
    )

    MJD_min = min(source_dict['midpointMjdTai'] for source_dict in source_dicts)
    df = pd.DataFrame(
        {
            'MJD': [source_dict['midpointMjdTai'] - MJD_min for source_dict in source_dicts],  # start dates at 0
            'BAND': [_y_to_Y(source_dict['band']) for source_dict in source_dicts],  # ORACLE wants Y band to be capital
            'FLUXCAL': [source_dict['psfFlux'] for source_dict in source_dicts],
            'FLUXCALERR': [source_dict['psfFluxErr'] for source_dict in source_dicts],
            'PHOTFLAG': [_get_photflag(source_dict['psfFlux'], source_dict['psfFluxErr']) for source_dict in source_dicts],
        }
    )
    return df.sort_values(by=['MJD'])
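
# The resulting light-curve frame has one row per source, sorted by MJD, e.g. (illustrative values):
#       MJD BAND  FLUXCAL  FLUXCALERR  PHOTFLAG
#    0  0.0    r   1523.4        61.2      4096
#    1  1.9    g     88.1        95.0         0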

def _most_likely_class(probability_dict: dict, keys: list) -> tuple[str, float]:
    """Return the (class, probability) pair with the highest probability among `keys`."""
    max_val = 0
    max_class = None
    for key in keys:
        if max_val < probability_dict[key]:
            max_val = probability_dict[key]
            max_class = key
    return (max_class, max_val)
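
# For example (illustrative):
#   _most_likely_class({'Transient': 0.8, 'Variable': 0.2}, ['Transient', 'Variable']) == ('Transient', 0.8)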
15 changes: 15 additions & 0 deletions broker/cloud_run/lsst/oracle/requirements.txt
@@ -0,0 +1,15 @@
# Dependencies are specified in this `requirements.txt` file, in the same
# directory as `main.py`, and installed by the Dockerfile. This follows the
# Cloud Functions convention explained here:
# https://cloud.google.com/functions/docs/writing/specifying-dependencies-python

google-cloud-logging
pittgoogle-client>=0.3.14
git+https://github.com/uiucsn/Astro-ORACLE.git

# for Cloud Run
# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service
# following the quickstart example. [TODO] consider pinning versions
Flask
gunicorn
Werkzeug