|
| 1 | +#!/usr/bin/env python3 |
| 2 | +# -*- coding: UTF-8 -*- |
| 3 | + |
| 4 | +"""This module publishes alert data to various Pub/Sub topics.""" |
| 5 | + |
| 6 | +import base64 |
| 7 | +import os |
| 8 | +import pittgoogle |
| 9 | +from google.cloud import functions_v1, pubsub_v1, logging |
| 10 | + |
| 11 | +PROJECT_ID = os.getenv("GCP_PROJECT") |
| 12 | +SURVEY = os.getenv("SURVEY") |
| 13 | +TESTID = os.getenv("TESTID") |
| 14 | +VERSIONTAG = os.getenv("VERSIONTAG") |
| 15 | + |
| 16 | +# connect to the cloud logger |
| 17 | +log_name = "store-bigquery-cloudfnc" # same log for all broker instances |
| 18 | +logging_client = logging.Client() |
| 19 | +logger = logging_client.logger(log_name) |
| 20 | + |
| 21 | +# GCP resources used in this module |
| 22 | +ALERT_DATA_TOPIC = pittgoogle.Topic.from_cloud( |
| 23 | + "alert-data", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID |
| 24 | +) |
| 25 | +DIASOURCE_TOPIC = pittgoogle.Topic.from_cloud( |
| 26 | + "diasource", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID |
| 27 | +) |
| 28 | + |
| 29 | + |
| 30 | +def run(event: dict, _context: functions_v1.context.Context) -> None: |
| 31 | + """Send alert data to various Pub/Sub topics. |
| 32 | +
|
| 33 | + Args: |
| 34 | + event: Pub/Sub message data and attributes. |
| 35 | + `data` field contains the message data in a base64-encoded string. |
| 36 | + `attributes` field contains the message's custom attributes in a dict. |
| 37 | +
|
| 38 | + context: Metadata describing the Cloud Function's trigging event. |
| 39 | +
|
| 40 | + 'context' is an unused argument in the function that is required |
| 41 | + see https://cloud.google.com/functions/1stgendocs/writing/write-event-driven-functions#background-functions |
| 42 | + """ |
| 43 | + |
| 44 | + # decode the base64-encoded message data |
| 45 | + decoded_data = base64.b64decode(event["data"]) |
| 46 | + |
| 47 | + # create a PubsubMessage-like object with the existing event dictionary |
| 48 | + pubsub_message = pubsub_v1.types.PubsubMessage( |
| 49 | + data=decoded_data, attributes=event.get("attributes", {}) |
| 50 | + ) |
| 51 | + |
| 52 | + # unpack the alert |
| 53 | + alert = pittgoogle.Alert.from_msg(msg=pubsub_message, schema_name="ztf") |
| 54 | + |
| 55 | + # send the alert to BigQuery table |
| 56 | + alert_table = insert_rows_alerts(alert) |
| 57 | + |
| 58 | + # announce what's been done |
| 59 | + ALERT_DATA_TOPIC.publish(_create_outgoing_alert(alert, alert_table)) |
| 60 | + DIASOURCE_TOPIC.publish(_create_outgoing_alert(alert, alert_table)) |
| 61 | + |
| 62 | + |
| 63 | +def _create_outgoing_alert( |
| 64 | + alert: pittgoogle.alert.Alert, table_dict: dict |
| 65 | +) -> pittgoogle.alert.Alert: |
| 66 | + """Create an announcement of the table storage operation to Pub/Sub.""" |
| 67 | + # collect attributes |
| 68 | + attrs = { |
| 69 | + **alert.attributes, |
| 70 | + "alerts_table": table_dict["alerts_table"], |
| 71 | + "alert_type": alert.dict["alert_type"], |
| 72 | + "superevent_id": alert.dict["superevent_id"], |
| 73 | + } |
| 74 | + |
| 75 | + # set empty message body; everything is in the attributes |
| 76 | + msg = {} |
| 77 | + |
| 78 | + # create outgoing alert |
| 79 | + alert_out = pittgoogle.Alert.from_dict( |
| 80 | + payload=msg, attributes=attrs, schema_name="default_schema" |
| 81 | + ) |
| 82 | + |
| 83 | + return alert_out |
0 commit comments