5151"""
5252
5353import logging
54+ import google .cloud .logging
5455import os
5556import re
5657from pathlib import Path
6768 from google .cloud import pubsub , storage
6869 from google .cloud .pubsub_v1 .publisher .futures import Future
6970
70- log = logging .getLogger (__name__ )
71+ log = logging .getLogger (__name__ ) # python root logger
72+ client = google .cloud .logging .Client () # google cloud logger
73+ client .get_default_handler ()
74+ client .setup_logging () # this connects cloud log to root log
7175
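For orientation (not part of the diff): a minimal sketch of how the new google-cloud-logging setup ties into the standard logging module, assuming application-default credentials are available. Once `setup_logging()` runs, records emitted through any `logging.getLogger(...)` logger propagate to the root logger and on to Cloud Logging.

```python
import logging
import google.cloud.logging

client = google.cloud.logging.Client()         # picks up the active GCP project from credentials
client.setup_logging(log_level=logging.DEBUG)  # attaches a Cloud Logging handler to the root logger

log = logging.getLogger(__name__)              # module-level logger; propagates to the root logger
log.debug('recorded locally and in Cloud Logging')
```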
DEFAULT_ZTF_CONFIG = {
    'bootstrap.servers': 'public2.alerts.ztf.uw.edu:9094',
@@ -111,7 +115,7 @@ def seekable(self): # necessary so that fastavro can write to the file


def _set_config_defaults(kafka_config: dict) -> dict:
-     """Set default values for a Kafka configuration dictionary
118+ """Set default values for a Kafka configuration dictionaryk

    Default values:
        enable.auto.commit: False,
@@ -168,7 +172,7 @@ def __init__(
        self.pubsub_alert_data_topic = pubsub_alert_data_topic
        self.pubsub_in_GCS_topic = pubsub_in_GCS_topic
        self.kafka_server = kafka_config["bootstrap.servers"]
-         log.info(f'Initializing consumer: {self.__repr__()}')
+         log.debug(f'Initializing consumer: {self.__repr__()}')

        # Connect to Kafka stream
        # Enforce NO auto commit, correct log handling
@@ -178,12 +182,12 @@ def __init__(
        # Connect to Google Cloud Storage
        self.storage_client = storage.Client()
        self.bucket = self.storage_client.get_bucket(bucket_name)
-         log.info(f'Connected to bucket: {self.bucket.name}')
+         log.debug(f'Connected to bucket: {self.bucket.name}')

    def close(self) -> None:
        """Close down and terminate the Kafka Consumer"""

-         log.info(f'Closing consumer: {self.__repr__()}')
+         log.debug(f'Closing consumer: {self.__repr__()}')
        super().close()

    @staticmethod
@@ -219,9 +223,9 @@ def fix_schema(temp_file: TempAlertFile, survey: str, version: str) -> None:
            temp_file.truncate()  # removes leftover data
            temp_file.seek(0)

-         log.debug(f'Schema header reformatted for {survey} version {version}')
+         log.info(f'Schema header reformatted for {survey} version {version}')

-     def upload_bytes_to_bucket(self, data: bytes, destination_name: str) -> None:
+     def upload_bytes_to_bucket(self, data: bytes, destination_name: str) -> bytes:
        """Uploads bytes data to a GCP storage bucket. Prior to storage,
        corrects the schema header to be compliant with BigQuery's strict
        validation standards if the alert is from a survey version with an
@@ -230,15 +234,19 @@ def upload_bytes_to_bucket(self, data: bytes, destination_name: str) -> None:
        Args:
            data: Data to upload
            destination_name: Name of the file to be created
+ 
+         Returns:
+             data with a corrected schema header (if one is necessary)
        """

-         log.debug(f'Uploading {destination_name} to {self.bucket.name}')
+         log.info(f'Uploading {destination_name} to {self.bucket.name}')
        blob = self.bucket.blob(destination_name)

        # Get the survey name and version
        survey = guess_schema_survey(data)
        version = guess_schema_version(data)

+         # Correct the message schema, upload to GCS, and return it
        # By default, spool data in memory to avoid IO unless data is too big
        # LSST alerts are anticipated at 80 kB, so 150 kB should be plenty
        max_alert_packet_size = 150000
@@ -247,15 +255,19 @@ def upload_bytes_to_bucket(self, data: bytes, destination_name: str) -> None:
            temp_file.seek(0)
            self.fix_schema(temp_file, survey, version)
            blob.upload_from_file(temp_file)
+             temp_file.seek(0)
+             return temp_file.read()

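`TempAlertFile` is not shown in this diff; judging from the earlier hunk header (`def seekable(self): # necessary so that fastavro can write to the file`), it is presumably a `SpooledTemporaryFile` wrapper that exposes the file-protocol methods fastavro checks for. A minimal sketch of that assumed pattern:

```python
from tempfile import SpooledTemporaryFile

class TempAlertFile(SpooledTemporaryFile):
    """Spooled temp file exposing the file-protocol checks fastavro performs."""

    # SpooledTemporaryFile delegates to an in-memory buffer until max_size is
    # exceeded, but (before Python 3.11) does not forward these capability checks.
    def readable(self):
        return self._file.readable()

    def writable(self):
        return self._file.writable()

    def seekable(self):
        return self._file.seekable()
```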
    def run(self) -> None:
        """Ingest kafka Messages to GCS and PubSub"""

-         log.info('Starting consumer.run ...')
+         log.debug('Starting consumer.run ...')
        try:
            while True:
-                 msg = self.consume(num_messages=1, timeout=5)[0]
+                 # msg = self.consume(num_messages=1, timeout=1)
+                 msg = self.poll(timeout=1)
                if msg is None:
+                     log.info('msg is None')
                    continue

                if msg.error():
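On the switch from `consume()` to `poll()` (my reading, not stated in the diff): with confluent-kafka, `Consumer.consume(num_messages=1, timeout=...)` returns a possibly empty list, so indexing `[0]` raises `IndexError` whenever the timeout expires without a message, while `Consumer.poll(timeout=1)` returns `None` in that case, which the `if msg is None` branch already handles. A small illustration, with the broker address and topic name as placeholders:

```python
from confluent_kafka import Consumer

consumer = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'demo'})  # placeholder config
consumer.subscribe(['demo-topic'])

# poll() returns one Message, or None when the timeout expires
msg = consumer.poll(timeout=1)
if msg is None:
    pass  # nothing arrived; loop around and poll again

# consume() returns a list that may be empty, so guard before indexing
msgs = consumer.consume(num_messages=1, timeout=1)
msg = msgs[0] if msgs else None
```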
@@ -266,12 +278,13 @@ def run(self) -> None:

                else:
                    timestamp_kind, timestamp = msg.timestamp()
-                     file_name = f'{timestamp}.avro'
+                     file_name = f'{msg.topic()}_{timestamp}.avro'

-                     log.debug(f'Ingesting {file_name}')
+                     log.info(f'Ingesting {file_name}')
+                     msg_schema_fixed = self.upload_bytes_to_bucket(msg.value(), file_name)
+                     # returns msg.value() bytes object with schema corrected
+                     publish_pubsub(self.pubsub_alert_data_topic, msg_schema_fixed)
                    publish_pubsub(self.pubsub_in_GCS_topic, file_name.encode('UTF-8'))
-                     publish_pubsub(self.pubsub_alert_data_topic, msg.value())
-                     self.upload_bytes_to_bucket(msg.value(), file_name)

                if not self._debug:
                    self.commit()
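`publish_pubsub` is defined elsewhere in the module and its body is not part of this diff. Given the call sites above (a topic name plus a bytes payload) and the `Future` import at the top of the file, it plausibly wraps the Pub/Sub publisher client along these lines; the project-ID lookup and the returned future are assumptions for illustration:

```python
import os

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.publisher.futures import Future

def publish_pubsub(topic_name: str, message: bytes) -> Future:
    """Publish ``message`` to the Pub/Sub topic ``topic_name`` in the current project."""

    publisher = pubsub_v1.PublisherClient()
    project_id = os.environ['GOOGLE_CLOUD_PROJECT']  # assumed source of the project id
    topic_path = publisher.topic_path(project_id, topic_name)
    return publisher.publish(topic_path, data=message)  # future resolves to the message id
```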