22
22
TABLE = pittgoogle .Table .from_cloud (f"alerts_{ VERSIONTAG } " , survey = SURVEY , testid = TESTID )
23
23
TOPIC = pittgoogle .Topic .from_cloud ("bigquery" , survey = SURVEY , testid = TESTID , projectid = PROJECT_ID )
24
24
25
+
25
26
def run (event : dict , _context : functions_v1 .context .Context ) -> None :
26
27
"""Send alert data to various BigQuery tables.
27
28
@@ -35,25 +36,25 @@ def run(event: dict, _context: functions_v1.context.Context) -> None:
35
36
'context' is an unused argument in the function that is required
36
37
see https://cloud.google.com/functions/1stgendocs/writing/write-event-driven-functions#background-functions
37
38
"""
38
-
39
+
39
40
# decode the base64-encoded message data
40
41
decoded_data = base64 .b64decode (event ["data" ])
41
42
42
43
# create a PubsubMessage-like object with the existing event dictionary
43
44
pubsub_message = pubsub_v1 .types .PubsubMessage (
44
- data = decoded_data ,
45
- attributes = event .get ("attributes" , {})
45
+ data = decoded_data , attributes = event .get ("attributes" , {})
46
46
)
47
47
48
48
# unpack the alert
49
- alert = pittgoogle .Alert .from_msg (msg = pubsub_message ,schema_name = "default_schema" )
50
-
49
+ alert = pittgoogle .Alert .from_msg (msg = pubsub_message , schema_name = "default_schema" )
50
+
51
51
# send the alert to BigQuery table
52
52
alert_table = insert_rows_alerts (alert )
53
-
53
+
54
54
# announce what's been done
55
55
TOPIC .publish (_create_outgoing_alert (alert , alert_table ))
56
56
57
+
57
58
def insert_rows_alerts (alert : pittgoogle .alert .Alert ):
58
59
"""Insert rows into the `alerts` table via the streaming API."""
59
60
# send to bigquery
@@ -69,14 +70,17 @@ def insert_rows_alerts(alert: pittgoogle.alert.Alert):
69
70
70
71
return table_dict
71
72
72
- def _create_outgoing_alert (alert : pittgoogle .alert .Alert , table_dict : dict ) -> pittgoogle .alert .Alert :
73
+
74
+ def _create_outgoing_alert (
75
+ alert : pittgoogle .alert .Alert , table_dict : dict
76
+ ) -> pittgoogle .alert .Alert :
73
77
"""Create an announcement of the table storage operation to Pub/Sub."""
74
78
# collect attributes
75
79
attrs = {
76
80
** alert .attributes ,
77
- "alerts_table" : table_dict [' alerts_table' ],
78
- "alert_type" : alert .dict [' alert_type' ],
79
- "superevent_id" : alert .dict [' superevent_id' ],
81
+ "alerts_table" : table_dict [" alerts_table" ],
82
+ "alert_type" : alert .dict [" alert_type" ],
83
+ "superevent_id" : alert .dict [" superevent_id" ],
80
84
}
81
85
82
86
# set empty message body; everything is in the attributes
@@ -86,5 +90,5 @@ def _create_outgoing_alert(alert: pittgoogle.alert.Alert, table_dict: dict) -> p
86
90
alert_out = pittgoogle .Alert .from_dict (
87
91
payload = msg , attributes = attrs , schema_name = "default_schema"
88
92
)
89
-
90
- return alert_out
93
+
94
+ return alert_out
0 commit comments