Commit: Patches for Codacy

troyraen committed Mar 9, 2025
1 parent 65969dc commit effd10f
Showing 25 changed files with 83 additions and 69 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -1,4 +1,5 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
@@ -10,4 +11,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
## [0.6.0] - 2021-00-00

### Changed
- Creates a "Working Notes" section in RTD and moves the following directories there: notes/, version_tracking/, troy/ (from the `troy` branch).

- Creates a "Working Notes" section in RTD and moves the following directories there:
notes/, version_tracking/, troy/ (from the `troy` branch).
10 changes: 7 additions & 3 deletions README.md
@@ -4,10 +4,14 @@

# Pitt-Google Alert Broker

The Pitt-Google broker is an astronomical alert broker that is being developed for large scale surveys of the night sky, particularly the upcoming [Vera Rubin Observatory's Legacy Survey of Space and Time](https://www.lsst.org/) (LSST).
We currently process and serve the [Zwicky Transient Facility](https://www.ztf.caltech.edu/)'s (ZTF) nightly alert stream.
The Pitt-Google broker is an astronomical alert broker that is being developed
for large scale surveys of the night sky, particularly the upcoming
[Vera Rubin Observatory's Legacy Survey of Space and Time](https://www.lsst.org/) (LSST).
We currently process and serve the [Zwicky Transient Facility](https://www.ztf.caltech.edu/)'s
(ZTF) nightly alert stream.
The broker runs on the [Google Cloud Platform](https://cloud.google.com) (GCP).

Documentation is at [pitt-broker.readthedocs.io](https://pitt-broker.readthedocs.io/).

If you run into issues or need assistance, please [open an Issue](https://github.com/mwvgroup/Pitt-Google-Broker/issues).
If you run into issues or need assistance, please
[open an Issue](https://github.com/mwvgroup/Pitt-Google-Broker/issues).
1 change: 1 addition & 0 deletions broker/broker_utils/README.md
@@ -5,5 +5,6 @@

Note that the schema maps are stored in a Cloud Storage bucket and are not packaged with this module.
This has the following benefits:

1. Schema maps can be updated easily by uploading new files to the bucket.
2. Broker instances can use different schema maps; each instance loads them from their own `broker_files` bucket.
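For illustration, a minimal sketch of how an instance might read its schema map back out of its bucket. The bucket naming follows `_broker_bucket_name` in `broker_utils/schema_maps.py`; the object path and YAML format are assumptions made for this example, and the module's own loader remains the supported interface.

```python
# Hypothetical sketch: load a survey's schema map from the instance's
# `broker_files` bucket. Object naming here is an assumption for illustration.
import yaml
from google.cloud import storage


def load_schema_map(project_id: str, survey: str, testid: str = "False") -> dict:
    """Download and parse a schema map stored as YAML in Cloud Storage."""
    suffix = "" if testid in ("False", False) else f"-{testid}"
    bucket_name = f"{project_id}-{survey}-broker_files{suffix}"

    client = storage.Client(project=project_id)
    blob = client.bucket(bucket_name).blob(f"schema_maps/{survey}.yaml")
    return yaml.safe_load(blob.download_as_bytes())


# Example: schema_map = load_schema_map("my-project-id", "ztf")
```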
6 changes: 3 additions & 3 deletions broker/broker_utils/broker_utils/gcp_utils.py
@@ -139,6 +139,7 @@ def pull_pubsub(
# unpack the messages
message_list, ack_ids = [], []
for received_message in response.received_messages:
success = True

if msg_only:
# extract the message bytes and append
@@ -156,7 +157,7 @@
success = callback(received_message)

# collect ack_id, if appropriate
if (callback is None) or (success):
if success:
ack_ids.append(received_message.ack_id)

# acknowledge the messages so they will not be sent again
@@ -169,8 +170,7 @@

if not return_count:
return message_list
else:
return len(message_list)
return len(message_list)


def streamingPull_pubsub(
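The pattern the `pull_pubsub` hunks above support — a callback that returns a success flag, with only successfully handled messages acknowledged — can be sketched directly against the Pub/Sub client. The project and subscription names below are placeholders, and the callback body is purely illustrative.

```python
# Illustrative sketch of the "ack only on success" pattern, using a
# placeholder project and subscription.
from google.cloud import pubsub_v1


def handle(received_message) -> bool:
    """Process one message; return True only if it is safe to acknowledge."""
    try:
        print(received_message.message.data)
        return True
    except Exception:
        return False


subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path("my-project-id", "my-subscription")
response = subscriber.pull(request={"subscription": sub_path, "max_messages": 10})

# collect ack_ids only for the messages the callback handled successfully
ack_ids = [m.ack_id for m in response.received_messages if handle(m)]
if ack_ids:
    subscriber.acknowledge(request={"subscription": sub_path, "ack_ids": ack_ids})
```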
3 changes: 1 addition & 2 deletions broker/broker_utils/broker_utils/schema_maps.py
@@ -97,8 +97,7 @@ def _download_schema_map(survey: str, testid: str, schema: str) -> dict:
def _broker_bucket_name(survey, testid):
if testid in ["False", False]:
return f'{PROJECT_ID}-{survey}-broker_files'
else:
return f'{PROJECT_ID}-{survey}-broker_files-{testid}'
return f'{PROJECT_ID}-{survey}-broker_files-{testid}'


def _schema_object_name(survey):
1 change: 1 addition & 0 deletions broker/broker_utils/broker_utils/types.py
@@ -94,6 +94,7 @@ def extract_ids(self, alert_dict=None, attrs=None, filename=None, **kwargs):
Attempts to extract IDs from alert_dict, attrs, and filename, in that order.
kwargs are ignored and only provided for the caller's convenience.
"""
ids = _AlertIds()
if alert_dict is not None:
ids = _AlertIds(
get_value("sourceId", alert_dict, self.schema_map),
10 changes: 7 additions & 3 deletions broker/broker_utils/schema_maps/README.md
@@ -1,7 +1,11 @@
# Schema Maps

The files in this directory contain mappings between the schema of an individual survey and a PGB-standardized schema that is used within the broker code.
The files in this directory contain mappings between the schema of an individual
survey and a PGB-standardized schema that is used within the broker code.

Note: This directory is __not__ packaged with the `broker_utils` module.
In order to allow broker instances to use unique schema maps, independent of other instances, each instance uploads this directory to its [`broker_files`] Cloud Storage bucket upon setup.
The broker code loads the schema maps from the bucket of the appropriate instance at runtime.
In order to allow broker instances to use unique schema maps, independent of
other instances, each instance uploads this directory to its [`broker_files`]
Cloud Storage bucket upon setup.
The broker code loads the schema maps from the bucket of the appropriate
instance at runtime.
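As a rough sketch of the setup-time upload this describes — copying the schema maps in this directory to an instance's `broker_files` bucket — something like the following would work; the bucket name, file extension, and destination prefix are assumptions, and the broker's setup scripts may do this differently.

```python
# Hypothetical sketch: upload every schema map in this directory to the
# instance's Cloud Storage bucket. Names and paths are assumptions.
from pathlib import Path

from google.cloud import storage


def upload_schema_maps(project_id: str, bucket_name: str, local_dir: str = ".") -> None:
    """Copy each local schema map file into the instance's bucket."""
    bucket = storage.Client(project=project_id).bucket(bucket_name)
    for path in sorted(Path(local_dir).glob("*.yaml")):
        bucket.blob(f"schema_maps/{path.name}").upload_from_filename(str(path))


# Example: upload_schema_maps("my-project-id", "my-project-id-ztf-broker_files")
```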
1 change: 1 addition & 0 deletions broker/cloud_functions/ztf/README.md
@@ -6,6 +6,7 @@ The broker's setup script deploys the cloud function
to the Google Cloud Platform.

For more information on cloud functions, see:

- [Cloud Functions](https://cloud.google.com/functions)
- [Cloud Functions Execution Environment](https://cloud.google.com/functions/docs/concepts/exec)
- [Cloud Pub/Sub Tutorial](https://cloud.google.com/functions/docs/tutorials/pubsub)
@@ -1,6 +1,8 @@
# ZTF_DMAM_V19_NoC_SNIa_vs_CC_forFink

This directory contains a trained [SuperNNova](https://github.com/supernnova/SuperNNova) model (vanilla_S_0_CLF_2_R_none_photometry_DF_1.0_N_global_lstm_32x2_0.05_128_True_mean.pt) and related output/log files.
This directory contains a trained [SuperNNova](https://github.com/supernnova/SuperNNova)
model (vanilla_S_0_CLF_2_R_none_photometry_DF_1.0_N_global_lstm_32x2_0.05_128_True_mean.pt)
and related output/log files.
The model was kindly provided to us by Anais Möller.
It was trained on "ZTF sims from the old cadence" (Oct 21, 2021).
Classification performance could benefit from retraining with better data.
2 changes: 1 addition & 1 deletion broker/cloud_functions/ztf/classify_snn/main.py
@@ -37,7 +37,7 @@
model_path = Path(__file__).resolve().parent / f"{model_dir_name}/{model_file_name}"


def run(msg: dict, context) -> None:
def run(msg: dict, _context) -> None:
"""Classify alert with SuperNNova; publish and store results.
Both parameters are required by Cloud Functions, regardless of whether they are used.
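The docstring's point about the two required parameters can be shown with a minimal Pub/Sub-triggered background function. The skeleton below is generic — the base64/JSON decoding is an assumption for illustration, not the broker's actual alert handling.

```python
# Minimal illustrative skeleton of a Pub/Sub-triggered background Cloud Function.
# Cloud Functions always passes (event, context); prefixing the unused context
# argument with an underscore keeps linters such as Codacy quiet.
import base64
import json


def run(msg: dict, _context) -> None:
    """Handle one Pub/Sub message; `_context` is required but unused."""
    payload = json.loads(base64.b64decode(msg["data"]))  # assumes a JSON payload
    attrs = msg.get("attributes", {})
    print(attrs.get("kafka.topic"), len(payload))
```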
2 changes: 1 addition & 1 deletion broker/cloud_functions/ztf/lite/main.py
@@ -106,7 +106,7 @@ def semantic_compression(alert_dict, schema_map) -> dict:
return alert_lite


def run(msg: dict, context):
def run(msg: dict, _context):
"""Create a "lite" alert containing the subset of fields necessary for broker and downstream.
Both parameters are required by Cloud Functions, regardless of whether they are used.
20 changes: 13 additions & 7 deletions broker/cloud_functions/ztf/ps_to_gcs/README.md
@@ -3,7 +3,8 @@
The following are just my raw notes from when I set this up.
I will clean up this document later.

__Get an alert from PS and figure out how to parse the data/attributes:__
## Get an alert from PS and figure out how to parse the data/attributes

```python
from google.cloud import pubsub_v1
import main as mn
@@ -21,7 +22,8 @@ data = msg.data # alert packet, bytes
atrs = msg.attributes # dict of custom attributes
```

__test the fnc; fix schema and upload__
## test the fnc; fix schema and upload

```python
filename = f"{atrs['kafka.topic']}_{atrs['kafka.timestamp']}_trial.avro"
# bucket_name = 'ardent-cycling-243415_ztf_alert_avro_bucket'
@@ -53,8 +55,8 @@ with mn.TempAlertFile(max_size=max_alert_packet_size, mode='w+b') as temp_file:

Logging: [Using the Logging Client Libraries](https://cloud.google.com/logging/docs/reference/libraries)

## Deploy the Cloud Function

__Deploy the Cloud Function__
```bash
cd deploy2cloud_Aug2020/ps-to-gcs/
# topic=troy_test_topic
@@ -63,7 +65,8 @@ gcloud functions deploy upload_bytes_to_bucket \
--trigger-topic ${topic}
```

__Trigger the cloud function by publishing the msg we pulled earlier__
## Trigger the cloud function by publishing the msg we pulled earlier

```python
publisher = pubsub_v1.PublisherClient()
topic_name = 'troy_test_topic'
@@ -73,7 +76,8 @@ attrs = {'kafka.topic': msg.attributes['kafka.topic'],
future = publisher.publish(topic_path, data=msg.data, **attrs)
```

__download file from GCS and see if i can open/read it__
## download file from GCS and see if i can open/read it

```python
# download the file
gcs_fname = f"{atrs['kafka.topic']}_{atrs['kafka.timestamp']}_trial.avro"
@@ -90,7 +94,8 @@ schema, data = _load_Avro(gcs_fname)
# this works
```

__Setup PubSub notifications on GCS bucket__
## Setup PubSub notifications on GCS bucket

- [Using Pub/Sub notifications for Cloud Storage](https://cloud.google.com/storage/docs/reporting-changes#gsutil)

```bash
@@ -112,7 +117,8 @@ CONFIGURATION_NAME=11 # get from list command above
gsutil notification delete projects/_/buckets/${BUCKET_NAME}/notificationConfigs/${CONFIGURATION_NAME}
```

__count the number of objects in the bucket matching day's topic__
## count the number of objects in the bucket matching day's topic

```bash
gsutil ls gs://ardent-cycling-243415_ztf_alert_avro_bucket/ztf_20201227_programid1_*.avro > dec27.count
wc -l dec27.count
2 changes: 0 additions & 2 deletions broker/cloud_functions/ztf/ps_to_gcs/exceptions.py
@@ -7,9 +7,7 @@

class CloudConnectionError(Exception):
"""Error connecting to one or more Google Cloud services"""
pass


class SchemaParsingError(Exception):
"""Error parsing or guessing properties of an alert schema"""
pass
6 changes: 3 additions & 3 deletions broker/cloud_functions/ztf/tag/main.py
@@ -53,7 +53,7 @@ def is_pure(alert_dict):
if SURVEY == "decat":
pure = rb

elif SURVEY == "ztf":
else:
nbad = source["nbad"] == 0 # num bad pixels
fwhm = source["fwhm"] <= 5 # Full Width Half Max, SExtractor [pixels]
elong = source["elong"] <= 1.2 # major / minor axis, SExtractor
@@ -86,7 +86,7 @@ def _is_extragalactic_transient(alert_dict: dict) -> dict:
# Assume the alert should pass the filter:
is_extragalactic_transient = True

elif SURVEY == "ztf":
else:
dflc = data_utils.alert_lite_to_dataframe(alert_dict)

candidate = dflc.loc[0]
@@ -128,7 +128,7 @@ def _is_extragalactic_transient(alert_dict: dict) -> dict:
return exgalac_dict


def run(msg: dict, context):
def run(msg: dict, _context):
"""Identify basic categorizations; publish results to BigQuery and as Pub/Sub msg attributes.
Both parameters are required by Cloud Functions, regardless of whether they are used.
3 changes: 2 additions & 1 deletion broker/consumer/ztf/README.md
@@ -1,5 +1,6 @@
See [`kafka_console_connect.md`](kafka_console_connect.md)
# Consumer

See [`kafka_console_connect.md`](kafka_console_connect.md)

To start the `ztf-consumer` VM and begin ingesting a ZTF topic:

2 changes: 1 addition & 1 deletion broker/consumer/ztf/vm_install.sh
@@ -43,7 +43,7 @@ apt install -y default-jre
apt install -y default-jdk
echo 'JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64/bin/java"' >> /etc/environment
source /etc/environment
echo $JAVA_HOME
echo "$JAVA_HOME"
echo "Done installing Java."
apt update

18 changes: 9 additions & 9 deletions broker/consumer/ztf/vm_startup.sh
@@ -86,8 +86,8 @@ do
/bin/kafka-topics \
--bootstrap-server public2.alerts.ztf.uw.edu:9094 \
--list \
--command-config ${workingdir}/admin.properties \
> ${fout_topics}
--command-config "${workingdir}/admin.properties" \
> "${fout_topics}"
} || {
true
}
@@ -103,7 +103,7 @@ do
fi

# check if our topic is in the list
if grep -Fq "${KAFKA_TOPIC}" $fout_topics
if grep -Fq "${KAFKA_TOPIC}" "$fout_topics"
then
alerts_flowing=true # start consuming
else
@@ -115,12 +115,12 @@ done
if [ "${USE_AUTHENTICATION}" = true ]
then
/bin/connect-standalone \
${workingdir}/psconnect-worker-authenticated.properties \
${workingdir}/ps-connector.properties \
&>> ${fout_run}
"${workingdir}/psconnect-worker-authenticated.properties" \
"${workingdir}/ps-connector.properties" \
&>> "${fout_run}"
else
/bin/connect-standalone \
${workingdir}/psconnect-worker-unauthenticated.properties \
${workingdir}/ps-connector.properties \
&>> ${fout_run}
"${workingdir}/psconnect-worker-unauthenticated.properties" \
"${workingdir}/ps-connector.properties" \
&>> "${fout_run}"
fi
22 changes: 10 additions & 12 deletions broker/night_conductor/process_pubsub_counters.py
@@ -4,16 +4,15 @@


import argparse
from google import api_core
from google.cloud import bigquery, logging
import json
import numpy as np
import pandas as pd
import time
from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from broker_utils import gcp_utils, schema_maps

from google import api_core
from google.cloud import bigquery, logging

project_id = "ardent-cycling-243415"

@@ -450,19 +449,18 @@ def _dont_collect_extra_fields(self, metadata):
# keep everything
return metadata

else:
keep_keys = self._keep_field_names(self.requested_fields, metadata.keys())
# keep message_id so we can drop duplicates later
keep_keys = keep_keys + ["message_id"]
keep_keys = self._keep_field_names(self.requested_fields, metadata.keys())
# keep message_id so we can drop duplicates later
keep_keys = keep_keys + ["message_id"]

metadata_to_keep = {k: v for k, v in metadata.items() if k in keep_keys}
metadata_to_keep = {k: v for k, v in metadata.items() if k in keep_keys}

return metadata_to_keep
return metadata_to_keep

def _stash_dicts_to_json_file(self):
# for debugging
fname = f"metadata/{self.subscription}.json"
with open(fname, "w") as fout:
with open(fname, "w", encoding="utf-8") as fout:
json.dump(self.metadata_dicts_list, fout, allow_nan=True)

def _package_metadata_into_df(self):
3 changes: 2 additions & 1 deletion broker/setup_broker/ztf/README.md
@@ -2,7 +2,8 @@

See the broker-level [../README.md](../README.md) for instructions on setting up a broker instance.

# Helpful Links
## Helpful Links

- Cloud Storage
- [Using Pub/Sub notifications for Cloud Storage](https://cloud.google.com/storage/docs/reporting-changes#gsutil)
- Compute Engine
8 changes: 4 additions & 4 deletions broker/setup_broker/ztf/create_cron_jobs.sh
@@ -41,16 +41,16 @@ else
msgSTART='START'
msgEND='END'

gcloud scheduler jobs create pubsub $night_conductor_START \
gcloud scheduler jobs create pubsub "$night_conductor_START" \
--schedule "${scheduleSTART}" \
--topic $cue_night_conductor \
--topic "$cue_night_conductor" \
--message-body $msgSTART \
--time-zone $timezone \
--location "$region"

gcloud scheduler jobs create pubsub $night_conductor_END \
gcloud scheduler jobs create pubsub "$night_conductor_END" \
--schedule "${scheduleEND}" \
--topic $cue_night_conductor \
--topic "$cue_night_conductor" \
--message-body $msgEND \
--time-zone $timezone \
--location "$region"
2 changes: 1 addition & 1 deletion broker/setup_broker/ztf/setup_broker.sh
@@ -30,7 +30,7 @@ echo "schema_version = ${schema_version}"
echo
echo "Continue? [y/(n)]: "

read continue_with_setup
read -r continue_with_setup
continue_with_setup="${continue_with_setup:-n}"
if [ "$continue_with_setup" != "y" ]; then
echo "Exiting setup."