Commit 5693d52 (1 parent: 16d7925): create new README.md in broker/consumer/rubin
Showing 1 changed file with 22 additions and 331 deletions.

# Connect Pitt-Google to the Rubin alert stream testing deployment

# Start the Rubin consumer VM

December 2021 - Author: Troy Raen
See `Pitt-Google-Broker/broker/setup_broker/rubin/README.md` for setup instructions.

- [Overview](#overview)
- [Setup](#setup)
- [Ingest the Rubin test stream](#ingest-the-rubin-test-stream)
- [Pull a Pub/Sub message and open it](#pull-a-pubsub-message-and-open-it)
- [Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema)

## Overview

Details and access credentials were sent to us by Eric Bellm via email.
Spencer Nelson provided some additional details specific to our Kafka Connect consumer.
Here are some links they gave us for reference, which were used to set this up:

- [Rubin sample alerts: obtaining the data with Kafka](https://github.com/lsst-dm/sample_alert_info#obtaining-the-data-with-kafka)
- [Rubin Alert Stream Integration Endpoint](https://github.com/lsst-dm/sample_alert_info/blob/main/doc/alert_stream_integration_endpoint.md)
- Schemas are stored at <https://alert-schemas-int.lsst.cloud/>
- [Using schema registry with Kafka Connect](https://docs.confluent.io/platform/7.0.1/schema-registry/connect.html).
  Spencer says, "Our stream uses Avro for the message values, not keys (we
  don't set the key to anything in particular), so you probably want the
  `value.converter` properties."
- Tools and libraries for VOEvents:
  <https://wiki.ivoa.net/twiki/bin/view/IVOA/IvoaVOEvent#Tools_and_Libraries>
- [Rubin example: java console consumer](https://github.com/lsst-dm/sample_alert_info/tree/main/examples/alert_stream_integration_endpoint/java_console_consumer)

Rubin alert packets will be Avro serialized, but the schema will not be included with the packet.
There are several ways to handle this.
For now, I have simply passed the alert bytes straight through from Kafka to Pub/Sub and deserialized
the alerts after pulling them from the Pub/Sub stream.
For other methods, see
[Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema) below.

Below is the code I used to set up the necessary resources in GCP, ingest the Rubin stream, pull
messages from the resulting Pub/Sub stream, and deserialize the alerts.

## Setup

The following assumes you have set the environment variables
`GOOGLE_CLOUD_PROJECT` and `GOOGLE_APPLICATION_CREDENTIALS`
to appropriate values for your GCP project and service account credentials, and that
the service account is authenticated to make `gcloud` calls through the project.
You may want to
[activate a service account for `gcloud` calls](https://pitt-broker.readthedocs.io/en/u-tjr-workingnotes/working-notes/troyraen/service-account.html#switch-the-service-account-your-api-calls-use)
or
[set up a GCP project from scratch](https://pitt-broker.readthedocs.io/en/latest/broker/run-a-broker-instance/initial-setup.html#setup-local-environment).

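For example, with hypothetical values for the project ID and key path:

```bash
# Hypothetical values -- substitute your own project ID and service-account key path.
export GOOGLE_CLOUD_PROJECT="my-project-id"
export GOOGLE_APPLICATION_CREDENTIALS="${HOME}/keys/my-service-account.json"

# Authenticate the service account for gcloud calls and point gcloud at the project.
gcloud auth activate-service-account --key-file="${GOOGLE_APPLICATION_CREDENTIALS}"
gcloud config set project "${GOOGLE_CLOUD_PROJECT}"
```
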
Clone the repo and cd into the directory:

```bash
git clone https://github.com/mwvgroup/Pitt-Google-Broker.git
cd Pitt-Google-Broker
```

Define variables used below in multiple calls.
The `KAFKA_USERNAME` and `KAFKA_PASSWORD` must be customized.

```bash
PROJECT_ID="${GOOGLE_CLOUD_PROJECT}"
# For reference, I ran this with:
# PROJECT_ID="avid-heading-329016"  # project name: pitt-google-broker-testing
survey="rubin"
broker_bucket="${PROJECT_ID}-${survey}-broker_files"
consumerVM="${survey}-consumer"
firewallrule="tcpport9094"

# Kafka credentials for the Rubin stream
KAFKA_USERNAME="pittgoogle-idfint"  # set to correct username
KAFKA_PASSWORD=""  # set to correct password

PUBSUB_TOPIC="rubin-alerts"
PUBSUB_SUBSCRIPTION="${PUBSUB_TOPIC}"
KAFKA_TOPIC="alerts-simulated"
```

Set up resources on Google Cloud Platform:

```bash
# Create a firewall rule to open port 9094 (only needs to be done once per project)
gcloud compute firewall-rules create "${firewallrule}" \
    --allow=tcp:9094 \
    --description="Allow incoming traffic on TCP port 9094" \
    --direction=INGRESS \
    --enable-logging

# Create a Cloud Storage bucket to store the consumer config files
gsutil mb "gs://${broker_bucket}"

# Upload the install script and config files for the consumer
o="GSUtil:parallel_process_count=1"  # disable multiprocessing for Macs
gsutil -m -o "$o" cp -r broker/consumer "gs://${broker_bucket}"

# Create a Pub/Sub topic and subscription for Rubin alerts
gcloud pubsub topics create "${PUBSUB_TOPIC}"
gcloud pubsub subscriptions create "${PUBSUB_SUBSCRIPTION}" --topic="${PUBSUB_TOPIC}"

# Create a Rubin consumer VM
testid="mytest"
consumerVM="${survey}-consumer-${testid}"
zone="us-central1-a"
machinetype="e2-standard-2"
installscript="gs://${broker_bucket}/consumer/vm_install.sh"
gcloud compute instances create "${consumerVM}" \
    --zone="${zone}" \
    --machine-type="${machinetype}" \
    --scopes=cloud-platform \
    --metadata=google-logging-enabled=true,startup-script-url="${installscript}" \
    --tags="${firewallrule}"
```

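As a quick sanity check (optional; not part of the original instructions), you can confirm the resources exist before moving on:

```bash
# Optional checks that the setup calls above succeeded
gcloud compute firewall-rules describe "${firewallrule}" --format="value(name)"
gsutil ls -b "gs://${broker_bucket}"
gcloud pubsub topics describe "${PUBSUB_TOPIC}"
gcloud compute instances describe "${consumerVM}" --zone="${zone}" --format="value(status)"
```
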
## Ingest the Rubin test stream

### Set up the consumer VM

```bash
# start the consumer VM and ssh in
gcloud compute instances start "${consumerVM}"
gcloud compute ssh "${consumerVM}"

# define some variables
brokerdir=/home/broker  # user's home dir on this machine
workingdir="${brokerdir}/consumer/rubin"  # consumer's working dir on this machine

# We will also need the variables defined at the top of this document.
# Go back up to the "Setup" section and define the variables given
# in the code block under "Define variables...", in your environment.
```

### Test the connection

#### Check available Kafka topics

```bash
/bin/kafka-topics \
    --bootstrap-server alert-stream-int.lsst.cloud:9094 \
    --list \
    --command-config "${workingdir}/admin.properties"
# should see output that includes the topic: alerts-simulated
```

#### Test the topic connection using the Kafka Console Consumer

Set the Java environment variable:

```bash
export JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64"
```

Make a file called `consumer.properties` and fill it with the following
(change `KAFKA_PASSWORD` to the appropriate value):

```bash
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="pittgoogle-idfint" \
    password="KAFKA_PASSWORD";
```

Run the Kafka console consumer:

```bash
sudo /bin/kafka-avro-console-consumer \
    --bootstrap-server alert-stream-int.lsst.cloud:9094 \
    --group "${KAFKA_USERNAME}-example-javaconsole" \
    --topic "${KAFKA_TOPIC}" \
    --property schema.registry.url=https://alert-schemas-int.lsst.cloud \
    --consumer.config consumer.properties \
    --timeout-ms=60000
# if successful, you will see a lot of JSON flood the terminal
```

### Run the Kafka -> Pub/Sub connector

Setup:

```bash
# download the config files from broker_bucket
sudo mkdir "${brokerdir}"
sudo gsutil -m cp -r "gs://${broker_bucket}/consumer" "${brokerdir}"

# set the password in two of the config files
sudo sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" "${workingdir}/admin.properties"
sudo sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" "${workingdir}/psconnect-worker.properties"

# replace topic and project configs in ps-connector.properties
fconfig="${workingdir}/ps-connector.properties"
sudo sed -i "s/PROJECT_ID/${PROJECT_ID}/g" ${fconfig}
sudo sed -i "s/PUBSUB_TOPIC/${PUBSUB_TOPIC}/g" ${fconfig}
sudo sed -i "s/KAFKA_TOPIC/${KAFKA_TOPIC}/g" ${fconfig}
```

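For reference, `ps-connector.properties` (before the `sed` substitutions) is roughly of the following form. This is an illustrative sketch only; the real file ships in the repo under `broker/consumer/rubin`, and its exact contents may differ:

```bash
# Illustrative sketch -- see broker/consumer/rubin/ps-connector.properties for the real file.
name=ps-sink-connector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=1
topics=KAFKA_TOPIC
cps.project=PROJECT_ID
cps.topic=PUBSUB_TOPIC
```
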
Run the connector:

```bash
mydir="/home/troyraen"  # use my home dir because I don't have permission to write to workingdir
fout_run="${mydir}/run-connector.out"
sudo /bin/connect-standalone \
    ${workingdir}/psconnect-worker.properties \
    ${workingdir}/ps-connector.properties \
    &> ${fout_run}
```

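To check that alerts are flowing into Pub/Sub, you can pull a single message with the `gcloud` CLI (a quick check added here; not part of the original instructions):

```bash
# Pull and immediately acknowledge one message to confirm the connector is publishing
gcloud pubsub subscriptions pull "${PUBSUB_SUBSCRIPTION}" --limit=1 --auto-ack
```
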
## Pull a Pub/Sub message and open it

In the future, we should download schemas from the Confluent Schema Registry and store them.
Then, for each alert, check the schema version in the Confluent Wire header and load the schema file using `fastavro`.
See [Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema) below.

For now, use the schema in the `lsst-alert-packet` library. Install the library:

```bash
pip install lsst-alert-packet
```

This follows the deserialization example at
<https://github.com/lsst-dm/alert_stream/blob/main/python/lsst/alert/stream/serialization.py>:

```python
import io
import fastavro
from google.cloud import pubsub_v1
from lsst.alert.packet import Schema

# pull a message
project_id = "avid-heading-329016"
subscription_name = "rubin-alerts"
max_messages = 5

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_name)
request = {
    "subscription": subscription_path,
    "max_messages": max_messages,
}

response = subscriber.pull(**request)

# load the schema
latest_schema = Schema.from_file().definition

# deserialize the alerts.
# This follows the deserialization example at
# https://github.com/lsst-dm/alert_stream/blob/main/python/lsst/alert/stream/serialization.py
for received_message in response.received_messages:
    alert_bytes = received_message.message.data
    # header_bytes = alert_bytes[:5]
    # schema_version = deserialize_confluent_wire_header(header_bytes)
    content_bytes = io.BytesIO(alert_bytes[5:])
    alert_dict = fastavro.schemaless_reader(content_bytes, latest_schema)
    alertId = alert_dict['alertId']
    diaSourceId = alert_dict['diaSource']['diaSourceId']
    psFlux = alert_dict['diaSource']['psFlux']
    print(f"alertId: {alertId}, diaSourceId: {diaSourceId}, psFlux: {psFlux}")
```

To start the consumer VM:

```bash
# Set the VM metadata
KAFKA_TOPIC="alerts-simulated"
PS_TOPIC="${survey}-alerts-${testid}"
gcloud compute instances add-metadata "${consumerVM}" --zone "${zone}" \
    --metadata="PS_TOPIC_FORCE=${PS_TOPIC},KAFKA_TOPIC_FORCE=${KAFKA_TOPIC}"

# Start the VM
gcloud compute instances start ${consumerVM} --zone ${zone}
# this launches the startup script which configures and starts the
# Kafka -> Pub/Sub connector
```

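Note that the Python pull example above never acknowledges the messages, so Pub/Sub will redeliver them after the ack deadline expires. To mark them as processed, a minimal sketch (reusing the `subscriber`, `subscription_path`, and `response` objects from that example):

```python
# Acknowledge the pulled messages so Pub/Sub does not redeliver them
ack_ids = [msg.ack_id for msg in response.received_messages]
if ack_ids:
    subscriber.acknowledge(subscription=subscription_path, ack_ids=ack_ids)
```
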
## Alternative methods for handling the schema

### Download with a `GET` request, and read the alert's schema version from the Confluent Wire header

In the future, we should download schemas from the Confluent Schema Registry and store them
(assuming we do not use the schema registry directly in the Kafka connector).
Then, for each alert, check the schema version in the Confluent Wire header and load the schema
file using `fastavro`.

Recommendation from Spencer Nelson:

> You might want to look at how Rubin's alert database ingester works. It does the same steps of
> deserializing alert packets, but uses the schema registry instead of lsst.alert.packet:
>
> <https://github.com/lsst-dm/alert_database_ingester/blob/main/alertingest/ingester.py#L192-L209>
> <https://github.com/lsst-dm/alert_database_ingester/blob/main/alertingest/schema_registry.py>

Pub/Sub topics can be configured with an Avro schema attached, but it cannot be changed once attached.
We would have to create a new topic for every schema version.
Therefore, I don't think we should do it this way.

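For reference, attaching an Avro schema to a Pub/Sub topic looks roughly like the following (hypothetical resource names; as noted above, we are not taking this approach):

```bash
# Hypothetical example of a schema-attached topic; not used in this setup
gcloud pubsub schemas create rubin-alert-schema-v1 \
    --type=avro \
    --definition-file=alert_schema_v1.avsc  # a plain Avro schema file
gcloud pubsub topics create rubin-alerts-v1 \
    --schema=rubin-alert-schema-v1 \
    --message-encoding=binary
```
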
#### Download a schema from the Confluent Schema Registry using a `GET` request

```bash
SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=$KAFKA_USERNAME:$KAFKA_PASSWORD
SCHEMA_REGISTRY_URL="https://alert-schemas-int.lsst.cloud"
schema_version=1
fout_rubinschema="rubinschema_v${schema_version}.avsc"

# get the list of schema subjects
curl --silent -X GET -u "${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO}" "${SCHEMA_REGISTRY_URL}/subjects"
# download a particular schema
curl --silent -X GET -u \
    "${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO}" \
    "${SCHEMA_REGISTRY_URL}/schemas/ids/${schema_version}" \
    > "${fout_rubinschema}"
```

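The registry returns a JSON object with the schema escaped inside a `schema` field, so the downloaded file is not a bare `.avsc`. A minimal sketch of turning it into a `fastavro` schema (assuming the file downloaded above):

```python
import json

import fastavro

# The registry response looks like {"schema": "{\"type\": \"record\", ...}"}
with open("rubinschema_v1.avsc") as f:
    registry_response = json.load(f)

rubin_schema = fastavro.parse_schema(json.loads(registry_response["schema"]))
# rubin_schema can now be passed to fastavro.schemaless_reader()
```
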
#### Read the alert's schema version from the Confluent Wire header

The following is copied from
<https://github.com/lsst-dm/alert_stream/blob/main/python/lsst/alert/stream/serialization.py>:

```python
import struct

_ConfluentWireFormatHeader = struct.Struct(">bi")


def deserialize_confluent_wire_header(raw):
    """Parses the byte prefix for Confluent Wire Format-style Kafka messages.

    Parameters
    ----------
    raw : `bytes`
        The 5-byte encoded message prefix.

    Returns
    -------
    schema_version : `int`
        A version number which indicates the Confluent Schema Registry ID
        number of the Avro schema used to encode the message that follows this
        header.
    """
    _, version = _ConfluentWireFormatHeader.unpack(raw)
    return version


header_bytes = alert_bytes[:5]
schema_version = deserialize_confluent_wire_header(header_bytes)
```

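Putting the pieces together, a hedged sketch of version-aware deserialization (assuming `schemas_by_version` is a dict mapping registry IDs to schemas parsed as in the previous subsection, and `alert_bytes` is a message payload):

```python
import io

import fastavro

# schemas_by_version: {schema_version: parsed_fastavro_schema}, built ahead of time
schema_version = deserialize_confluent_wire_header(alert_bytes[:5])
schema = schemas_by_version[schema_version]
alert_dict = fastavro.schemaless_reader(io.BytesIO(alert_bytes[5:]), schema)
```
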
### Use the Confluent Schema Registry with the Kafka Connector

Kafka Connect can use the Confluent Schema Registry directly, but schemas are stored under
subjects and Kafka Connect is picky about how those subjects are named; see
<https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy>.
**Rubin has set the schema subject name to "alert-packet"**, which does not conform
to any of the subject name strategies that Kafka Connect uses.
I did not find a workaround for this issue.
Instead, I passed the alert bytes straight through into Pub/Sub and deserialized
them after pulling the messages from Pub/Sub.

If you want to try this in the future, set the following configs in the connector's
psconnect-worker.properties file:

```bash
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://alert-schemas-int.lsst.cloud
value.converter.enhanced.avro.schema.support=true
```

To stop the consumer VM:

```bash
survey="rubin"
testid="mytest"
consumerVM="${survey}-consumer-${testid}"
zone="us-central1-a"

# Stop the VM
gcloud compute instances stop ${consumerVM} --zone ${zone}
```