21 changes: 21 additions & 0 deletions broker/cloud_run/lsst/scone/Dockerfile
@@ -0,0 +1,21 @@
# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:3.12-slim

# Allow statements and log messages to immediately appear in the Knative logs
ENV PYTHONUNBUFFERED True

# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./

# Install production dependencies.
RUN pip install --no-cache-dir -r requirements.txt

# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
# Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling.
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
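For orientation: main.py is not included in this diff, but the CMD above expects it to expose a Flask (or other WSGI) app object named "app", and deploy.sh later points the Pub/Sub push subscription at the "/" route (main.run()). A minimal, hypothetical sketch of a compatible handler, assuming the standard Pub/Sub push envelope, could look like this:

# main.py -- hypothetical sketch only; the real module is not part of this diff
import base64

from flask import Flask, request

app = Flask(__name__)


@app.route("/", methods=["POST"])
def run():
    # Pub/Sub push subscriptions deliver a JSON envelope:
    # {"message": {"data": "<base64-encoded payload>", ...}, "subscription": "..."}
    envelope = request.get_json()
    alert_bytes = base64.b64decode(envelope["message"]["data"])
    # ... run SCONE on the alert and publish the classification (omitted) ...
    return ("", 204)  # any 2xx acks the message; a non-2xx response triggers redelivery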
83 changes: 83 additions & 0 deletions broker/cloud_run/lsst/scone/base.py
@@ -0,0 +1,83 @@
# base program for create_heatmaps

import numpy as np
from helpers import (
build_gp,
get_extinction,
get_band_to_wave
)


SIMTAG_Ia = "Ia" # from GENTYPE_TO_CLASS dict in sim readme
SIMTAG_nonIa = "nonIa"


class CreateHeatmaps():
def __init__(self, metadata, lcdata):

self.lcdata = lcdata

# heatmap parameters
self.wavelength_bins = metadata['wavelength_bins']
self.mjd_bins = metadata['mjd_bins']

self.mwebv = metadata['mwebv']
self.band_to_wave = get_band_to_wave(metadata['survey'])

def create_heatmaps(self):
mjd_range = self._get_sn_data()
        # get the mean wavelength of each observation's passband
wave = [self.band_to_wave[elem] for elem in self.lcdata['passband']]
        # fit a Gaussian process to the light curve in time and wavelength
gp = build_gp(20, self.lcdata, wave)

predictions, prediction_errs = self._get_predictions_heatmap(gp, mjd_range)
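        # np.dstack below yields shape (wavelength_bins, mjd_bins, 2):
        # channel 0 = GP uncertainties, channel 1 = extinction-corrected GP predictions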

return np.dstack((prediction_errs, predictions))

# ================================================
# HELPER FUNCTIONS
# ================================================

def _get_sn_data(self):
if len(self.lcdata) == 0 or np.all(self.lcdata['mjd'] < 0):
print("sn lcdata empty")
return None

expected_filters = list(self.band_to_wave.keys())
lclen = len(self.lcdata)
self.lcdata = self.lcdata[np.isin(self.lcdata['passband'], expected_filters)]
if lclen != len(self.lcdata):
print("missing filters")
if len(self.lcdata) == 0:
print("expected filters filtering not working")
return None

mjd_range = [np.min(self.lcdata['mjd']), np.max(self.lcdata['mjd'])]
if not mjd_range:
print("mjd range is none")
return None

        # extend the light curve to include a very early and a very late epoch with zero flux.
        # Be sure to pass flux_err > 0 to avoid a divide-by-zero in build_gp.
        mjd_early = min(self.lcdata['mjd']) - 100
        mjd_late = max(self.lcdata['mjd']) + 100
        flux = 0.0
        flux_err = 0.1
        band = expected_filters[2]
        self.lcdata.add_row([mjd_early, flux, flux_err, band])
        self.lcdata.add_row([mjd_late, flux, flux_err, band])

return mjd_range

def _get_predictions_heatmap(self, gp, mjd_range):
times = np.linspace(mjd_range[0], mjd_range[1], self.mjd_bins)

wavelengths = np.linspace(3000.0, 10100.0, self.wavelength_bins)
ext = get_extinction(self.mwebv, wavelengths)
ext = np.tile(np.expand_dims(ext, axis=1), len(times))
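        # flattened (mjd_bins * wavelength_bins, 2) array of (time, wavelength) pairs at which
        # to evaluate the GP; ordered so the predictions reshape cleanly to (wavelength_bins, mjd_bins)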
time_wavelength_grid = np.transpose([np.tile(times, len(wavelengths)), np.repeat(wavelengths, len(times))])

predictions, prediction_vars = gp(time_wavelength_grid, return_var=True)
ext_corrected_predictions = np.array(predictions).reshape(self.wavelength_bins, self.mjd_bins) + ext
prediction_uncertainties = np.sqrt(prediction_vars).reshape(self.wavelength_bins, self.mjd_bins)

return ext_corrected_predictions, prediction_uncertainties
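For readers outside this PR: CreateHeatmaps appears to expect an astropy-style table with mjd, flux, flux_err, and passband columns plus a metadata dict carrying the keys read in __init__. A hedged usage sketch follows (illustrative values; the passband encoding and the survey string must match whatever helpers.get_band_to_wave expects and returns):

# hypothetical caller -- column order and metadata keys inferred from base.py
from astropy.table import Table

from base import CreateHeatmaps

metadata = {
    "wavelength_bins": 32,   # illustrative values, not the module's defaults
    "mjd_bins": 180,
    "mwebv": 0.03,           # Milky Way E(B-V) toward the object
    "survey": "lsst",        # passed to get_band_to_wave
}
lcdata = Table(
    rows=[(60000.0, 15.2, 1.1, "g"), (60002.0, 18.7, 1.3, "r")],
    names=("mjd", "flux", "flux_err", "passband"),
)

heatmap = CreateHeatmaps(metadata, lcdata).create_heatmaps()  # shape (wavelength_bins, mjd_bins, 2)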
29 changes: 29 additions & 0 deletions broker/cloud_run/lsst/scone/cloudbuild.yaml
@@ -0,0 +1,29 @@
# https://cloud.google.com/build/docs/deploying-builds/deploy-cloud-run
# containerize the module and deploy it to Cloud Run
steps:
# Build the image
- name: 'gcr.io/cloud-builders/docker'
args: ['build', '-t', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '.']
# Push the image to Artifact Registry
- name: 'gcr.io/cloud-builders/docker'
args: ['push', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}']
# Deploy image to Cloud Run
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
entrypoint: gcloud
args: ['run', 'deploy', '${_MODULE_NAME}', '--image', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '--region', '${_REGION}', '--set-env-vars', '${_ENV_VARS}']
images:
- '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}'
substitutions:
_SURVEY: 'lsst'
_TESTID: 'testid'
_MODULE_NAME: '${_SURVEY}-classify_with_SCONE-${_TESTID}'
_MODULE_IMAGE_NAME: 'gcr.io/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_NAME}'
_REPOSITORY: 'cloud-run-services'
  # Cloud Functions automatically sets the project ID env var using the name "GCP_PROJECT";
  # use the same name here for consistency.
  # [TODO] PROJECT_ID is set in setup.sh. This is confusing and we should revisit the decision.
  # I (Raen) think I didn't make it a substitution because I didn't want to set a default for it.
_ENV_VARS: 'GCP_PROJECT=${PROJECT_ID},SURVEY=${_SURVEY},TESTID=${_TESTID}'
_REGION: 'us-central1'
options:
dynamic_substitutions: true
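The _ENV_VARS substitution means the running container sees GCP_PROJECT, SURVEY, and TESTID as environment variables. A small, hypothetical sketch of how service code could read them and rebuild the resource names used in deploy.sh:

# hypothetical snippet -- variable names match _ENV_VARS above; naming convention mirrors deploy.sh
import os

PROJECT_ID = os.getenv("GCP_PROJECT")
SURVEY = os.getenv("SURVEY", "lsst")
TESTID = os.getenv("TESTID")

# deploy.sh appends "-<testid>" to Pub/Sub resource names unless testid is the string "False" (production)
suffix = "" if TESTID in (None, "False") else f"-{TESTID}"
output_topic = f"{SURVEY}-SCONE{suffix}"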
147 changes: 147 additions & 0 deletions broker/cloud_run/lsst/scone/deploy.sh
@@ -0,0 +1,147 @@
#!/bin/bash
# Deploys or deletes broker Cloud Run service
# This script will not delete a Cloud Run service that is in production

# "False" uses production resources
# any other string will be appended to the names of all resources
testid="${1:-test}"
# "True" tears down/deletes resources; anything else sets them up
teardown="${2:-False}"
# name of the survey this broker instance will ingest
survey="${3:-lsst}"
region="${4:-us-central1}"
# get environment variables
PROJECT_ID=$GOOGLE_CLOUD_PROJECT
PROJECT_NUMBER=$(gcloud projects describe "$PROJECT_ID" --format="value(projectNumber)")

MODULE_NAME="scone" # lower case required by cloud run
ROUTE_RUN="/" # url route that will trigger main.run()

# function used to define GCP resources; appends testid if needed
define_GCP_resources() {
local base_name="$1"
local testid_suffix=""

if [ "$testid" != "False" ]; then
if [ "$base_name" = "$survey" ]; then
testid_suffix="_${testid}" # complies with BigQuery naming conventions
else
testid_suffix="-${testid}"
fi
fi

echo "${base_name}${testid_suffix}"
}

#--- GCP resources used in this script
artifact_registry_repo=$(define_GCP_resources "${survey}-cloud-run-services")
deadletter_topic_bigquery_import_scone=$(define_GCP_resources "${survey}-bigquery-import-SCONE-deadletter")
deadletter_topic_bigquery_import_classifications=$(define_GCP_resources "${survey}-bigquery-import-classifications-deadletter")
deadletter_subscription_bigquery_import_scone="${deadletter_topic_bigquery_import_scone}"
deadletter_subscription_bigquery_import_classifications="${deadletter_topic_bigquery_import_classifications}"
topic_bigquery_import_scone=$(define_GCP_resources "${survey}-bigquery-import-SCONE")
topic_bigquery_import_classifications=$(define_GCP_resources "${survey}-bigquery-import-classifications")
subscription_bigquery_import_scone="${topic_bigquery_import_scone}" # BigQuery subscription
subscription_bigquery_import_classifications="${topic_bigquery_import_classifications}" # BigQuery subscription
trigger_topic=$(define_GCP_resources "${survey}-alerts")
ps_input_subscrip="${trigger_topic}" # pub/sub subscription used to trigger cloud run module
ps_output_topic=$(define_GCP_resources "${survey}-SCONE")

# additional GCP resources & variables used in this script
bq_dataset=$(define_GCP_resources "${survey}")
scone_table="SCONE"
classifications_table="classifications"
cr_module_name=$(define_GCP_resources "${survey}-${MODULE_NAME}") # lower case required by Cloud Run
runinvoker_svcact="cloud-run-invoker@${PROJECT_ID}.iam.gserviceaccount.com"

if [ "${teardown}" = "True" ]; then
# ensure that we do not teardown production resources
if [ "${testid}" != "False" ]; then
gcloud pubsub topics delete "${ps_output_topic}"
gcloud pubsub topics delete "${topic_bigquery_import_scone}"
gcloud pubsub topics delete "${topic_bigquery_import_classifications}"
gcloud pubsub topics delete "${deadletter_topic_bigquery_import_scone}"
gcloud pubsub topics delete "${deadletter_topic_bigquery_import_classifications}"
gcloud pubsub subscriptions delete "${ps_input_subscrip}"
gcloud pubsub subscriptions delete "${subscription_bigquery_import_scone}"
gcloud pubsub subscriptions delete "${subscription_bigquery_import_classifications}"
gcloud pubsub subscriptions delete "${deadletter_subscription_bigquery_import_scone}"
gcloud pubsub subscriptions delete "${deadletter_subscription_bigquery_import_classifications}"
gcloud run services delete "${cr_module_name}" --region "${region}"
fi

else # Deploy the Cloud Run service

#--- Deploy Cloud Run service
echo "Configuring Pub/Sub resources for classify_scone Cloud Run service..."
gcloud pubsub topics create "${ps_output_topic}"
gcloud pubsub topics create "${deadletter_topic_bigquery_import_scone}"
gcloud pubsub topics create "${deadletter_topic_bigquery_import_classifications}"
gcloud pubsub topics create "${topic_bigquery_import_scone}"
gcloud pubsub topics create "${topic_bigquery_import_classifications}"
gcloud pubsub subscriptions create "${deadletter_subscription_bigquery_import_scone}" --topic="${deadletter_topic_bigquery_import_scone}"
gcloud pubsub subscriptions create "${deadletter_subscription_bigquery_import_classifications}" --topic="${deadletter_topic_bigquery_import_classifications}"

    # in order to create the BigQuery subscriptions, ensure that the service account
    # service-<project number>@gcp-sa-pubsub.iam.gserviceaccount.com has the
    # bigquery.dataEditor role on each destination table
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
roleid="roles/bigquery.dataEditor"
    bq add-iam-policy-binding \
        --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}" \
        --role="${roleid}" \
        --table=true "${PROJECT_ID}:${bq_dataset}.${scone_table}"
    bq add-iam-policy-binding \
        --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}" \
        --role="${roleid}" \
        --table=true "${PROJECT_ID}:${bq_dataset}.${classifications_table}"
gcloud pubsub subscriptions create "${subscription_bigquery_import_scone}" \
--topic="${topic_bigquery_import_scone}" \
--bigquery-table="${PROJECT_ID}:${bq_dataset}.${scone_table}" \
--use-table-schema \
--dead-letter-topic="${deadletter_topic_bigquery_import_scone}" \
--max-delivery-attempts=5 \
--dead-letter-topic-project="${PROJECT_ID}"
gcloud pubsub subscriptions create "${subscription_bigquery_import_classifications}" \
--topic="${topic_bigquery_import_classifications}" \
--bigquery-table="${PROJECT_ID}:${bq_dataset}.${classifications_table}" \
--use-table-schema \
--dead-letter-topic="${deadletter_topic_bigquery_import_classifications}" \
--max-delivery-attempts=5 \
--dead-letter-topic-project="${PROJECT_ID}"

    # grant the Pub/Sub service account permission to publish dead-lettered messages to the
    # dead-letter topics and to forward (subscribe) them from the BigQuery subscriptions
    gcloud pubsub topics add-iam-policy-binding "${deadletter_topic_bigquery_import_scone}" \
        --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}" \
        --role="roles/pubsub.publisher"
    gcloud pubsub subscriptions add-iam-policy-binding "${subscription_bigquery_import_scone}" \
        --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}" \
        --role="roles/pubsub.subscriber"
    gcloud pubsub topics add-iam-policy-binding "${deadletter_topic_bigquery_import_classifications}" \
        --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}" \
        --role="roles/pubsub.publisher"
    gcloud pubsub subscriptions add-iam-policy-binding "${subscription_bigquery_import_classifications}" \
        --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}" \
        --role="roles/pubsub.subscriber"

echo "Creating container image and deploying to Cloud Run..."
moduledir="." # deploys what's in our current directory
config="${moduledir}/cloudbuild.yaml"
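    # capture the service URL printed by the "gcloud run deploy" build step (Step #2 in cloudbuild.yaml)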
url=$(gcloud builds submit --config="${config}" \
--substitutions="_SURVEY=${survey},_TESTID=${testid},_MODULE_NAME=${cr_module_name},_REPOSITORY=${artifact_registry_repo}" \
--region="${region}" \
"${moduledir}" | sed -n 's/^Step #2: Service URL: \(.*\)$/\1/p')

    # ensure the Cloud Run service has the necessary permissions
role="roles/run.invoker"
gcloud run services add-iam-policy-binding "${cr_module_name}" \
--member="serviceAccount:${runinvoker_svcact}" \
--role="${role}"

echo "Creating trigger subscription for Cloud Run..."
    # WARNING: This subscription is set to retry failed deliveries. If there is a bug in main.py,
    # it will retry indefinitely until the message is deleted manually.
gcloud pubsub subscriptions create "${ps_input_subscrip}" \
--topic "${trigger_topic}" \
--topic-project "${PROJECT_ID}" \
--ack-deadline=600 \
--push-endpoint="${url}${ROUTE_RUN}" \
--push-auth-service-account="${runinvoker_svcact}"
fi
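One hedged way to smoke-test a test deployment (not part of this PR): attach a throwaway pull subscription to the module's output topic (<survey>-SCONE-<testid>) and pull a few messages with the Pub/Sub client library. The subscription name below is hypothetical.

# hypothetical smoke test -- assumes a throwaway pull subscription on the output topic, e.g.
#   gcloud pubsub subscriptions create lsst-SCONE-test-smoketest --topic=lsst-SCONE-test
import os

from google.cloud import pubsub_v1

project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path(project_id, "lsst-SCONE-test-smoketest")

response = subscriber.pull(request={"subscription": sub_path, "max_messages": 5}, timeout=10)
for received in response.received_messages:
    print(received.message.data)  # payload published by the module (format not shown in this diff)
if response.received_messages:
    subscriber.acknowledge(
        request={"subscription": sub_path, "ack_ids": [m.ack_id for m in response.received_messages]}
    )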