21 changes: 21 additions & 0 deletions broker/cloud_run/ztf/cross_match/Dockerfile
@@ -0,0 +1,21 @@
# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:3.12-slim

# Allow statements and log messages to immediately appear in the Knative logs
ENV PYTHONUNBUFFERED True

# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./

# Install production dependencies.
RUN pip install --no-cache-dir -r requirements.txt

# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
# Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling.
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
29 changes: 29 additions & 0 deletions broker/cloud_run/ztf/cross_match/cloudbuild.yaml
@@ -0,0 +1,29 @@
# https://cloud.google.com/build/docs/deploying-builds/deploy-cloud-run
# containerize the module and deploy it to Cloud Run
steps:
# Build the image
- name: 'gcr.io/cloud-builders/docker'
  args: ['build', '-t', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '.']
# Push the image to Artifact Registry
- name: 'gcr.io/cloud-builders/docker'
  args: ['push', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}']
# Deploy image to Cloud Run
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
  entrypoint: gcloud
  args: ['run', 'deploy', '${_MODULE_NAME}', '--image', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}', '--region', '${_REGION}', '--set-env-vars', '${_ENV_VARS}']
images:
- '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_IMAGE_NAME}'
substitutions:
  _SURVEY: 'ztf'
  _TESTID: 'testid'
  _MODULE_NAME: '${_SURVEY}-euclid-crossmatch-${_TESTID}'
  _MODULE_IMAGE_NAME: 'gcr.io/${PROJECT_ID}/${_REPOSITORY}/${_MODULE_NAME}'
  _REPOSITORY: 'cloud-run-services'
  # Cloud Functions automatically sets the project ID env var using the name "GCP_PROJECT";
  # use the same name here for consistency.
  # [TODO] PROJECT_ID is set in setup.sh. This is confusing and we should revisit the decision.
  # I (Raen) think I didn't make it a substitution because I didn't want to set a default for it.
  _ENV_VARS: 'GCP_PROJECT=${PROJECT_ID},SURVEY=${_SURVEY},TESTID=${_TESTID}'
  _REGION: 'us-central1'
options:
  dynamic_substitutions: true
89 changes: 89 additions & 0 deletions broker/cloud_run/ztf/cross_match/deploy.sh
@@ -0,0 +1,89 @@
#! /bin/bash
# Deploys or deletes broker Cloud Run service
# This script will not delete Cloud Run services that are in production

# "False" uses production resources
# any other string will be appended to the names of all resources
testid="${1:-test}"
# "True" tearsdown/deletes resources, else setup
teardown="${2:-False}"
# name of the survey this broker instance will ingest
survey="${3:-ztf}"
region="${4:-us-central1}"
# get the environment variable
PROJECT_ID=$GOOGLE_CLOUD_PROJECT

MODULE_NAME="euclid-crossmatch" # lower case required by cloud run
ROUTE_RUN="/" # url route that will trigger main.run()

define_GCP_resources() {
    local base_name="$1"
    local separator="${2:--}"
    local testid_suffix=""

    if [ "$testid" != "False" ] && [ -n "$testid" ]; then
        testid_suffix="${separator}${testid}"
    fi
    echo "${base_name}${testid_suffix}"
}

#--- GCP resources used in this script
artifact_registry_repo=$(define_GCP_resources "${survey}-cloud-run-services")
bq_dataset=$(define_GCP_resources "${survey}" "_")
bq_table="euclid_crossmatch"
cr_module_name=$(define_GCP_resources "${survey}-${MODULE_NAME}") # lower case required by cloud run
ps_input_subscrip=$(define_GCP_resources "${survey}-${MODULE_NAME}") # pub/sub subscription used to trigger cloud run module
ps_output_topic=$(define_GCP_resources "${survey}-${MODULE_NAME}")
ps_trigger_topic=$(define_GCP_resources "${survey}-hostless-transients")
runinvoker_svcact="cloud-run-invoker@${PROJECT_ID}.iam.gserviceaccount.com"
# topics and subscriptions involved in writing data to BigQuery
ps_bigquery_subscription=$(define_GCP_resources "${survey}-${MODULE_NAME}-bigquery-import")
ps_deadletter_topic=$(define_GCP_resources "${survey}-deadletter")

if [ "${teardown}" = "True" ]; then
# ensure that we do not teardown production resources
if [ "${testid}" != "False" ]; then
echo
echo "Deleting resources for ${MODULE_NAME} module..."
gcloud pubsub topics delete "${ps_output_topic}"
gcloud pubsub subscriptions delete "${ps_bigquery_subscription}"
gcloud pubsub subscriptions delete "${ps_input_subscrip}"
gcloud run services delete "${cr_module_name}" --region "${region}"
fi
else
echo "Configuring Pub/Sub resources..."
gcloud pubsub topics create "${ps_output_topic}"
gcloud pubsub subscriptions create "${ps_bigquery_subscription}" \
--topic="${ps_output_topic}" \
--bigquery-table="${PROJECT_ID}:${bq_dataset}.${bq_table}" \
--use-table-schema \
--drop-unknown-fields \
--dead-letter-topic="${ps_deadletter_topic}" \
--max-delivery-attempts=5 \
--dead-letter-topic-project="${PROJECT_ID}"
# set IAM policies on public Pub/Sub resources
if [ "$testid" = "False" ]; then
user="allUsers"
roleid="projects/${GOOGLE_CLOUD_PROJECT}/roles/userPublic"
gcloud pubsub topics add-iam-policy-binding "${ps_output_topic}" --member="${user}" --role="${roleid}"
fi

#--- Deploy Cloud Run service
echo
echo "Creating container image for ${MODULE_NAME} module and deploying to Cloud Run..."
moduledir="." # assumes deploying what's in our current directory
config="${moduledir}/cloudbuild.yaml"
url=$(gcloud builds submit --config="${config}" \
--substitutions="_SURVEY=${survey},_TESTID=${testid},_MODULE_NAME=${cr_module_name},_REPOSITORY=${artifact_registry_repo}" \
"${moduledir}" | sed -n 's/^Step #2: Service URL: \(.*\)$/\1/p')
echo
echo "Creating trigger subscription for ${MODULE_NAME} Cloud Run service..."
gcloud pubsub subscriptions create "${ps_input_subscrip}" \
--topic "${ps_trigger_topic}" \
--topic-project "${PROJECT_ID}" \
--ack-deadline=600 \
--push-endpoint="${url}${ROUTE_RUN}" \
--push-auth-service-account="${runinvoker_svcact}" \
--dead-letter-topic="${ps_deadletter_topic}" \
--max-delivery-attempts=5
fi
195 changes: 195 additions & 0 deletions broker/cloud_run/ztf/cross_match/main.py
@@ -0,0 +1,195 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

"""
This module uses the value added alert stream ztf-hostless-transients to crossmatch against objects in the Euclid Quick
Data Release 1.
"""

import os
import io
from typing import Dict
from astropy.io import fits
from astropy.stats import sigma_clip
import numpy as np
from google.cloud import logging
import flask
import pittgoogle

# [FIXME] Make this helpful or else delete it.
# Connect the python logger to the google cloud logger.
# By default, this captures INFO level and above.
# pittgoogle uses the python logger.
# We don't currently use the python logger directly in this script, but we could.
logging.Client().setup_logging()
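For reference, a minimal sketch of direct logger use from this script. Note that in this file the name `logging` is already bound to google.cloud.logging, so the standard-library module would need a different import name; the message text is purely illustrative.

import logging as std_logging

logger = std_logging.getLogger(__name__)
logger.info("cross_match: processed one alert")  # captured by the Cloud Logging handler at INFO and above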

PROJECT_ID = os.getenv("GCP_PROJECT")
TESTID = os.getenv("TESTID")
SURVEY = os.getenv("SURVEY")

# Variables for incoming data
# A URL route is used in deploy.sh when the trigger subscription is created.
# It is possible to define multiple routes in a single module and trigger them using different subscriptions.
ROUTE_RUN = "/" # HTTP route that will trigger run(). Must match deploy.sh

# Variables for outgoing data
HTTP_204 = 204 # HTTP code: Success
HTTP_400 = 400 # HTTP code: Bad Request

# GCP resources used in this module
TOPIC = pittgoogle.Topic.from_cloud(
    "hostless-transients", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID
)

app = flask.Flask(__name__)


@app.route(ROUTE_RUN, methods=["POST"])
def run():
"""Produces a value-added alert stream (${survey}-hostless-transients) identifying hostless transient candidates
Messages in this stream retain fields from the original alert-lite packet.

This module is intended to be deployed as a Cloud Run service. It will operate as an HTTP endpoint
triggered by Pub/Sub messages. This function will be called once for every message sent to this route.
It should accept the incoming HTTP request and return a response.

Returns
-------
response : tuple(str, int)
Tuple containing the response body (string) and HTTP status code (int). Flask will convert the
tuple into a proper HTTP response. Note that the response is a status message for the web server.
"""
# extract the envelope from the request that triggered the endpoint
# this contains a single Pub/Sub message with the alert to be processed
envelope = flask.request.get_json()
try:
alert_lite = pittgoogle.Alert.from_cloud_run(envelope, schema_name="default")
except pittgoogle.exceptions.BadRequest as exc:
return str(exc), HTTP_400

    configs = {
        "sigma_clipping_kwargs": {"sigma": 3, "maxiters": 10},
        "hostless_detection_with_clipping": {
            "crop_radius": 7,
            "max_number_of_pixels_clipped": 5,
            "min_number_of_pixels_clipped": 3,
        },
    }
    hostless_dict = {
        "likely_extragalactic_transient": int(alert_lite.attributes["is_extragalactic_transient"]),
        "likely_hostless_transient": 0,
    }

    alert_lite_dict = alert_lite.dict["alert_lite"]
    if _is_candidate(alert_lite, configs):
        hostless_dict["likely_hostless_transient"] = 1

    # publish results to Pub/Sub
    TOPIC.publish(
        pittgoogle.Alert.from_dict(
            {"alert_lite": alert_lite_dict},
            attributes={
                **alert_lite.attributes,
                **{
                    "pg_likely_hostless_transient": int(hostless_dict["likely_hostless_transient"])
                },
            },
            schema_name="default",
        )
    )
    return "", HTTP_204


def _is_candidate(alert_lite: pittgoogle.Alert, configs: Dict) -> bool:
    cutouts = ["Template", "Science"]
    template_stamp, science_stamp = [alert_lite.dict.get(f"cutout{cutout}") for cutout in cutouts]
    # apply sigma clipping to the bytes data for each stamp
    template_stamp_clipped = sigma_clip(
        _read_stamp_data(template_stamp), **configs["sigma_clipping_kwargs"]
    )
    science_stamp_clipped = sigma_clip(
        _read_stamp_data(science_stamp), **configs["sigma_clipping_kwargs"]
    )

    return _run_hostless_detection_with_clipped_data(
        science_stamp_clipped, template_stamp_clipped, configs
    )


def _read_stamp_data(cutout):
    hdul = fits.open(io.BytesIO(cutout))

    return hdul[0].data
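A small, self-contained sketch of the same read path, using an in-memory FITS file in place of a real alert cutout; the 63x63 size is illustrative.

import io

import numpy as np
from astropy.io import fits

# Serialize a toy image to FITS bytes, then read it back the same way _read_stamp_data does.
buf = io.BytesIO()
fits.HDUList([fits.PrimaryHDU(data=np.ones((63, 63)))]).writeto(buf)
stamp = fits.open(io.BytesIO(buf.getvalue()))[0].data
print(stamp.shape)  # (63, 63)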


def _run_hostless_detection_with_clipped_data(
    science_stamp: np.ndarray, template_stamp: np.ndarray, configs: Dict
) -> bool:
    """Adapted from:
    https://github.com/COINtoolbox/extragalactic_hostless/blob/main/src/pipeline_utils.py#L271

    Detects potential hostless candidates in sigma-clipped stamp images by cropping an image patch from the center of
    the image. If pixels are rejected in the science image but not in the corresponding template image, the candidate
    is flagged as potentially hostless.
    """

    science_clipped = sigma_clip(science_stamp, **configs["sigma_clipping_kwargs"])
    template_clipped = sigma_clip(template_stamp, **configs["sigma_clipping_kwargs"])
    is_hostless_candidate = _check_hostless_conditions(
        science_clipped, template_clipped, configs["hostless_detection_with_clipping"]
    )

    if is_hostless_candidate:
        return is_hostless_candidate
    science_stamp = _crop_center_patch(
        science_stamp, configs["hostless_detection_with_clipping"]["crop_radius"]
    )
    template_stamp = _crop_center_patch(
        template_stamp, configs["hostless_detection_with_clipping"]["crop_radius"]
    )
    science_clipped = sigma_clip(science_stamp, **configs["sigma_clipping_kwargs"])
    template_clipped = sigma_clip(template_stamp, **configs["sigma_clipping_kwargs"])
    is_hostless_candidate = _check_hostless_conditions(
        science_clipped, template_clipped, configs["hostless_detection_with_clipping"]
    )

    return is_hostless_candidate
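A minimal sketch of this two-stage check on synthetic data; the stamp values and the 63x63 size are made up for illustration, while the real stamps come from the alert cutouts.

import numpy as np
from astropy.stats import sigma_clip

rng = np.random.default_rng(42)

# Toy stamps: a noisy template and a science image with a bright source that has no counterpart.
template_stamp = rng.normal(100.0, 5.0, size=(63, 63))
science_stamp = template_stamp + rng.normal(0.0, 1.0, size=(63, 63))
science_stamp[30:33, 30:33] += 500.0

kwargs = {"sigma": 3, "maxiters": 10}

# Stage 1: clip the full stamps and count the rejected pixels in each.
print(np.ma.count_masked(sigma_clip(science_stamp, **kwargs)),
      np.ma.count_masked(sigma_clip(template_stamp, **kwargs)))

# Stage 2: crop the central patch (crop_radius=7 gives a 14x14 patch here) and repeat.
print(np.ma.count_masked(sigma_clip(science_stamp[24:38, 24:38], **kwargs)),
      np.ma.count_masked(sigma_clip(template_stamp[24:38, 24:38], **kwargs)))

# The counts are compared against max/min_number_of_pixels_clipped in _check_hostless_conditions below.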


def _crop_center_patch(input_image: np.ndarray, patch_radius: int = 12) -> np.ndarray:
    """Adapted from:
    https://github.com/COINtoolbox/extragalactic_hostless/blob/main/src/pipeline_utils.py#L234

    Crops a rectangular patch around the image center with a given patch radius.
    """
    image_shape = input_image.shape[0:2]
    center_coords = [image_shape[0] / 2, image_shape[1] / 2]
    center_patch_x = int(center_coords[0] - patch_radius)
    center_patch_y = int(center_coords[1] - patch_radius)

    return input_image[
        center_patch_x : center_patch_x + patch_radius * 2,
        center_patch_y : center_patch_y + patch_radius * 2,
    ]
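To make the index arithmetic concrete: for a 63x63 cutout with patch_radius=7, the center coordinate is 31.5, the slice starts at int(31.5 - 7) = 24, and the patch spans 2 * 7 = 14 pixels per axis. A quick check, with the array size chosen only for illustration:

import numpy as np

cutout = np.zeros((63, 63))
patch = cutout[24:38, 24:38]  # equivalent to _crop_center_patch(cutout, patch_radius=7)
print(patch.shape)  # (14, 14)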


def _check_hostless_conditions(
    science_clipped: np.ndarray, template_clipped: np.ndarray, detection_config: Dict
) -> bool:
    """Adapted from:
    https://github.com/COINtoolbox/extragalactic_hostless/blob/main/src/pipeline_utils.py#L253
    """

    science_only_detection = (
        np.ma.count_masked(science_clipped) > detection_config["max_number_of_pixels_clipped"]
        and np.ma.count_masked(template_clipped) < detection_config["min_number_of_pixels_clipped"]
    )
    template_only_detection = (
        np.ma.count_masked(template_clipped) > detection_config["max_number_of_pixels_clipped"]
        and np.ma.count_masked(science_clipped) < detection_config["min_number_of_pixels_clipped"]
    )

    if science_only_detection or template_only_detection:
        return True

    return False
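A deterministic sketch of this decision rule, using hand-built masked arrays in place of sigma-clipped stamps; the pixel positions are arbitrary and the thresholds mirror the configs defined in run().

import numpy as np

detection_config = {"max_number_of_pixels_clipped": 5, "min_number_of_pixels_clipped": 3}

# 9 rejected pixels in the "science" stamp, none in the "template".
science_clipped = np.ma.masked_array(np.zeros((63, 63)))
science_clipped[30:33, 30:33] = np.ma.masked
template_clipped = np.ma.masked_array(np.zeros((63, 63)))

print(np.ma.count_masked(science_clipped), np.ma.count_masked(template_clipped))  # 9 0
# 9 > 5 and 0 < 3, so science_only_detection is True and the pair is flagged as hostless.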
13 changes: 13 additions & 0 deletions broker/cloud_run/ztf/cross_match/requirements.txt
@@ -0,0 +1,13 @@
# As explained here
# https://cloud.google.com/functions/docs/writing/specifying-dependencies-python
# dependencies for a Cloud Function must be specified in a `requirements.txt`
# file (or packaged with the function) in the same directory as `main.py`

google-cloud-logging
pittgoogle-client>=0.3.15

# for Cloud Run
# https://cloud.google.com/run/docs/quickstarts/build-and-deploy/deploy-python-service
Flask
gunicorn
Werkzeug
21 changes: 21 additions & 0 deletions broker/cloud_run/ztf/hostless_transients/Dockerfile
@@ -0,0 +1,21 @@
# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:3.12-slim

# Allow statements and log messages to immediately appear in the Knative logs
ENV PYTHONUNBUFFERED True

# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./

# Install production dependencies.
RUN pip install --no-cache-dir -r requirements.txt

# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
# Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling.
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app