Commit 0965827

Merge branch 'develop' into u/wmwv/update_python_312
2 parents 9be7f9b + 50e9f3d commit 0965827

11 files changed (+829, -336 lines)


broker/consumer/rubin/README.md

Lines changed: 21 additions & 330 deletions
@@ -1,344 +1,35 @@
-# Connect Pitt-Google to the Rubin alert stream testing deployment
+# Start the Rubin consumer VM
 
-December 2021 - Author: Troy Raen
+See `Pitt-Google-Broker/broker/setup_broker/rubin/README.md` for setup instructions.
 
-- [Overview](#overview)
-- [Setup](#setup)
-- [Ingest the Rubin test stream](#ingest-the-rubin-test-stream)
-- [Pull a Pub/Sub message and open it](#pull-a-pubsub-message-and-open-it)
-- [Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema)
-
-## Overview
-
-Details and access credentials were sent to us by Eric Bellm via email.
-Spencer Nelson provided some additional details specific to our Kafka Connect consumer.
-Here are some links they gave us for reference which were used to set this up:
-
-- [Rubin sample alerts: obtaining the data with Kafka](https://github.com/lsst-dm/sample_alert_info#obtaining-the-data-with-kafka)
-- [Rubin Alert Stream Integration Endpoint](https://github.com/lsst-dm/sample_alert_info/blob/main/doc/alert_stream_integration_endpoint.md)
-- Schemas are stored at: <https://alert-schemas-int.lsst.cloud/>
-- [Using schema registry with Kafka Connect](https://docs.confluent.io/platform/7.0.1/schema-registry/connect.html).
-  Spencer says, "Our stream uses Avro for the message values, not keys (we
-  don't set the key to anything in particular), so you probably want the
-  `value.converter` properties."
-- Tools and libraries for VOEvents:
-  <https://wiki.ivoa.net/twiki/bin/view/IVOA/IvoaVOEvent#Tools_and_Libraries>
-- [Rubin example: java console consumer](https://github.com/lsst-dm/sample_alert_info/tree/main/examples/alert_stream_integration_endpoint/java_console_consumer)
-
-Rubin alert packets will be Avro serialized, but the schema will not be included with the packet.
-There are several ways to handle this.
-For now, I have simply passed the alert bytes straight through from Kafka to Pub/Sub and deserialized
-alerts after pulling from the Pub/Sub stream.
-For other methods, see
-[Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema) below.
-
-Below is the code I used to set up the necessary resources in GCP, ingest the Rubin stream, pull
-messages from the resulting Pub/Sub stream and deserialize the alerts.
-
-## Setup
-
-The following assumes you have set the environment variables
-`GOOGLE_CLOUD_PROJECT` and `GOOGLE_APPLICATION_CREDENTIALS`
-to appropriate values for your GCP project and service account credentials, and that
-the service account is authenticated to make `gcloud` calls through the project.
-You may want to
-[activate a service account for `gcloud` calls](https://pitt-broker.readthedocs.io/en/u-tjr-workingnotes/working-notes/troyraen/service-account.html#switch-the-service-account-your-api-calls-use)
-or
-[set up a GCP project from scratch](https://pitt-broker.readthedocs.io/en/latest/broker/run-a-broker-instance/initial-setup.html#setup-local-environment).
-
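For reference, the environment assumed in the setup paragraph above can be established roughly as follows; a minimal sketch, with a placeholder project ID and key-file path:

```bash
# Placeholder values; substitute your own project and service account key file.
export GOOGLE_CLOUD_PROJECT="my-project-id"
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"

# Authenticate the service account for gcloud calls and point gcloud at the project.
gcloud auth activate-service-account --key-file="${GOOGLE_APPLICATION_CREDENTIALS}"
gcloud config set project "${GOOGLE_CLOUD_PROJECT}"
```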
-Clone the repo and cd into the directory:
-
-```bash
-git clone https://github.com/mwvgroup/Pitt-Google-Broker.git
-cd Pitt-Google-Broker
-```
-
-Define variables used below in multiple calls.
-The `KAFKA_USERNAME` and `KAFKA_PASSWORD` must be customized
+To start the consumer VM:
 
 ```bash
-PROJECT_ID="${GOOGLE_CLOUD_PROJECT}"
-# For reference, I ran this with:
-# PROJECT_ID="avid-heading-329016" # project name: pitt-google-broker-testing
 survey="rubin"
-broker_bucket="${PROJECT_ID}-${survey}-broker_files"
-consumerVM="${survey}-consumer"
-firewallrule="tcpport9094"
-
-# Kafka credentials for the Rubin stream
-KAFKA_USERNAME="pittgoogle-idfint" # set to correct username
-KAFKA_PASSWORD="" # set to correct password
-
-PUBSUB_TOPIC="rubin-alerts"
-PUBSUB_SUBSCRIPTION="${PUBSUB_TOPIC}"
-KAFKA_TOPIC="alerts-simulated"
-```
-
-Setup resources on Google Cloud Platform.
-
-```bash
-# Create a firewall rule to open port 9094 (only needs to be done once, per project)
-gcloud compute firewall-rules create "${firewallrule}" \
-  --allow=tcp:9094 \
-  --description="Allow incoming traffic on TCP port 9094" \
-  --direction=INGRESS \
-  --enable-logging
-
-# Create a Cloud Storage bucket to store the consumer config files
-gsutil mb "gs://${broker_bucket}"
-
-# Upload the install script and config files for the consumer
-o="GSUtil:parallel_process_count=1" # disable multiprocessing for Macs
-gsutil -m -o "$o" cp -r broker/consumer "gs://${broker_bucket}"
-
-# Create a Pub/Sub topic and subscription for Rubin alerts
-gcloud pubsub topics create "${PUBSUB_TOPIC}"
-gcloud pubsub subscriptions create "${PUBSUB_SUBSCRIPTION}" --topic="${PUBSUB_TOPIC}"
-
-# Create a Rubin Consumer VM
+testid="mytest"
+consumerVM="${survey}-consumer-${testid}"
 zone="us-central1-a"
-machinetype="e2-standard-2"
-installscript="gs://${broker_bucket}/consumer/vm_install.sh"
-gcloud compute instances create "${consumerVM}" \
-  --zone="${zone}" \
-  --machine-type="${machinetype}" \
-  --scopes=cloud-platform \
-  --metadata=google-logging-enabled=true,startup-script-url="${installscript}" \
-  --tags="${firewallrule}"
-```
-
-## Ingest the Rubin test stream
-
-### Setup Consumer VM
-
-```bash
-# start the consumer vm and ssh in
-gcloud compute instances start "${consumerVM}"
-gcloud compute ssh "${consumerVM}"
-
-# define some variables
-brokerdir=/home/broker # user's home dir on this machine
-workingdir="${brokerdir}/consumer/rubin" # consumer's working dir on this machine
-
-# We will also need the variables defined at the top of this document.
-# Go back up to the "Setup" section and define the variables given
-# in the code block under "Define variables...", in your environment.
-```
-
-### Test the connection
-
-#### Check available Kafka topics
-
-```bash
-/bin/kafka-topics \
-  --bootstrap-server alert-stream-int.lsst.cloud:9094 \
-  --list \
-  --command-config "${workingdir}/admin.properties"
-# should see output that includes the topic: alerts-simulated
-```
-
-#### Test the topic connection using the Kafka Console Consumer
-
-Set Java env variable
-
-```bash
-export JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64"
-```
-
-Make a file called 'consumer.properties' and fill it with this
-(change `KAFKA_PASSWORD` to the appropriate value):
-
-```bash
-security.protocol=SASL_SSL
-sasl.mechanism=SCRAM-SHA-512
-
-sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
-username="pittgoogle-idfint"\
-password="KAFKA_PASSWORD";
-```
-
-Run the Kafka console consumer
 
-```bash
-sudo /bin/kafka-avro-console-consumer \
-  --bootstrap-server alert-stream-int.lsst.cloud:9094 \
-  --group "${KAFKA_USERNAME}-example-javaconsole" \
-  --topic "${KAFKA_TOPIC}" \
-  --property schema.registry.url=https://alert-schemas-int.lsst.cloud \
-  --consumer.config consumer.properties \
-  --timeout-ms=60000
-# if successful, you will see a lot of JSON flood the terminal
-```
-
-### Run the Kafka -> Pub/Sub connector
-
-Setup:
-
-```bash
-# download the config files from broker_bucket
-sudo mkdir "${brokerdir}"
-sudo gsutil -m cp -r "gs://${broker_bucket}/consumer" "${brokerdir}"
-
-# set the password in two of the config files
-sudo sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" "${workingdir}/admin.properties"
-sudo sed -i "s/KAFKA_PASSWORD/${KAFKA_PASSWORD}/g" "${workingdir}/psconnect-worker.properties"
-
-# replace topic and project configs in ps-connector.properties
-fconfig="${workingdir}/ps-connector.properties"
-sudo sed -i "s/PROJECT_ID/${PROJECT_ID}/g" ${fconfig}
-sudo sed -i "s/PUBSUB_TOPIC/${PUBSUB_TOPIC}/g" ${fconfig}
-sudo sed -i "s/KAFKA_TOPIC/${KAFKA_TOPIC}/g" ${fconfig}
-```
-
-Run the connector:
-
-```bash
-mydir="/home/troyraen" # use my dir because don't have permission to write to workingdir
-fout_run="${mydir}/run-connector.out"
-sudo /bin/connect-standalone \
-  ${workingdir}/psconnect-worker.properties \
-  ${workingdir}/ps-connector.properties \
-  &> ${fout_run}
-```
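Once the connector is running, one quick way to confirm that alerts are reaching the Pub/Sub topic is to pull a few messages with the gcloud CLI; a minimal check, using the subscription defined earlier:

```bash
# Pull (without acknowledging) a handful of messages from the subscription.
gcloud pubsub subscriptions pull "${PUBSUB_SUBSCRIPTION}" --limit=5
```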
-
-## Pull a Pub/Sub message and open it
-
-In the future, we should download schemas from the Confluent Schema Registry and store them.
-Then for each alert, check the schema version in the Confluent Wire header, and load the schema file using `fastavro`.
-See [Alternative methods for handling the schema](#alternative-methods-for-handling-the-schema) below.
-
-For now, use the schema in the `lsst-alert-packet` library. Install the library:
-
-```bash
-pip install lsst-alert-packet
-```
-
-Following the deserialization example at
-<https://github.com/lsst-dm/alert_stream/blob/main/python/lsst/alert/stream/serialization.py>
-
-```python
-import io
-import fastavro
-from google.cloud import pubsub_v1
-from lsst.alert.packet import Schema
-
-# pull a message
-project_id = "avid-heading-329016"
-subscription_name = "rubin-alerts"
-max_messages = 5
-
-subscriber = pubsub_v1.SubscriberClient()
-subscription_path = subscriber.subscription_path(project_id, subscription_name)
-request = {
-    "subscription": subscription_path,
-    "max_messages": max_messages,
-}
-
-response = subscriber.pull(**request)
-
-# load the schema
-latest_schema = Schema.from_file().definition
+# Set the VM metadata
+KAFKA_TOPIC="alerts-simulated"
+PS_TOPIC="${survey}-alerts-${testid}"
+gcloud compute instances add-metadata "${consumerVM}" --zone "${zone}" \
+  --metadata="PS_TOPIC_FORCE=${PS_TOPIC},KAFKA_TOPIC_FORCE=${KAFKA_TOPIC}"
 
-# deserialize the alerts.
-# This follows the deserialization example at
-# https://github.com/lsst-dm/alert_stream/blob/main/python/lsst/alert/stream/serialization.py
-for received_message in response.received_messages:
-    alert_bytes = received_message.message.data
-    # header_bytes = alert_bytes[:5]
-    # schema_version = deserialize_confluent_wire_header(header_bytes)
-    content_bytes = io.BytesIO(alert_bytes[5:])
-    alert_dict = fastavro.schemaless_reader(content_bytes, latest_schema)
-    alertId = alert_dict['alertId']
-    diaSourceId = alert_dict['diaSource']['diaSourceId']
-    psFlux = alert_dict['diaSource']['psFlux']
-    print(f"alertId: {alertId}, diaSourceId: {diaSourceId}, psFlux: {psFlux}")
+# Start the VM
+gcloud compute instances start ${consumerVM} --zone ${zone}
+# this launches the startup script which configures and starts the
+# Kafka -> Pub/Sub connector
 ```
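The added instructions rely on the VM's startup script reading the `*_FORCE` metadata attributes at boot. As context, a startup script can read such attributes from the instance metadata server roughly like this (a sketch only; the actual logic lives in the repo's consumer install/startup scripts and may differ):

```bash
# Query the instance metadata server for the attributes set via add-metadata.
attrs="http://metadata.google.internal/computeMetadata/v1/instance/attributes"
PS_TOPIC="$(curl -s -H "Metadata-Flavor: Google" "${attrs}/PS_TOPIC_FORCE")"
KAFKA_TOPIC="$(curl -s -H "Metadata-Flavor: Google" "${attrs}/KAFKA_TOPIC_FORCE")"
echo "Connector will consume ${KAFKA_TOPIC} and publish to ${PS_TOPIC}"
```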
 
-## Alternative methods for handling the schema
-
-### Download with a `GET` request, and read the alert's schema version from the Confluent Wire header
-
-In the future, we should download schemas from the Confluent Schema Registry and store them
-(assuming we do not use the schema registry directly in the Kafka connector).
-Then for each alert, check the schema version in the Confluent Wire header, and load the schema
-file using `fastavro`.
-
-Recommendation from Spencer Nelson:
-
-> You might want to look at how Rubin's alert database ingester works. It does the same steps of
-> deserializing alert packets, but uses the schema registry instead of lsst.alert.packet:
->
-> <https://github.com/lsst-dm/alert_database_ingester/blob/main/alertingest/ingester.py#L192-L209>
-> <https://github.com/lsst-dm/alert_database_ingester/blob/main/alertingest/schema_registry.py>
-
-Pub/Sub topics can be configured with an Avro schema attached, but it cannot be changed once attached.
-We would have to create a new topic for every schema version.
-Therefore, I don't think we should do it this way.
-
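For context on why that approach is inflexible: a Pub/Sub schema is attached when the topic is created, so every new alert schema version would require another schema resource and another topic. A minimal sketch with hypothetical resource names:

```bash
# Hypothetical names; the .avsc file holds a single Avro schema version.
gcloud pubsub schemas create rubin-alert-v1 --type=avro --definition-file=rubinschema_v1.avsc
gcloud pubsub topics create rubin-alerts-v1 --schema=rubin-alert-v1 --message-encoding=binary
```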
-#### Download a schema from the Confluent Schema Registry using a `GET` request
+To stop the consumer VM:
 
 ```bash
-SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=$KAFKA_USERNAME:$KAFKA_PASSWORD
-SCHEMA_REGISTRY_URL="https://alert-schemas-int.lsst.cloud"
-schema_version=1
-fout_rubinschema="rubinschema_v${schema_version}.avsc"
-
-# get list of schema subjects
-curl --silent -X GET -u "${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO}" "${SCHEMA_REGISTRY_URL}/subjects"
-# download a particular schema
-curl --silent -X GET -u \
-  "${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO}" \
-  "${SCHEMA_REGISTRY_URL}/schemas/ids/${schema_version}" \
-  > "${fout_rubinschema}"
-```
-
-#### Read the alert's schema version from the Confluent Wire header
-
-The following is copied from
-<https://github.com/lsst-dm/alert_stream/blob/main/python/lsst/alert/stream/serialization.py>
-
-```python
-import struct
-
-_ConfluentWireFormatHeader = struct.Struct(">bi")
-
-def deserialize_confluent_wire_header(raw):
-    """Parses the byte prefix for Confluent Wire Format-style Kafka messages.
-    Parameters
-    ----------
-    raw : `bytes`
-        The 5-byte encoded message prefix.
-    Returns
-    -------
-    schema_version : `int`
-        A version number which indicates the Confluent Schema Registry ID
-        number of the Avro schema used to encode the message that follows this
-        header.
-    """
-    _, version = _ConfluentWireFormatHeader.unpack(raw)
-    return version
-
-header_bytes = alert_bytes[:5]
-schema_version = deserialize_confluent_wire_header(header_bytes)
-```
-
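To actually load a schema downloaded this way with `fastavro`, something like the following would work; a minimal sketch, assuming the registry response was saved as `rubinschema_v1.avsc` by the curl command above (the registry wraps the Avro schema as a JSON string under a "schema" key) and `alert_bytes` is a message payload as in the Pub/Sub example:

```python
import io
import json

import fastavro

# Parse the Avro schema out of the registry's JSON response.
with open("rubinschema_v1.avsc") as f:
    schema = fastavro.parse_schema(json.loads(json.load(f)["schema"]))

# Skip the 5-byte Confluent wire header, then read the schemaless Avro payload.
content_bytes = io.BytesIO(alert_bytes[5:])
alert_dict = fastavro.schemaless_reader(content_bytes, schema)
```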
-### Use the Confluent Schema Registry with the Kafka Connector
-
-Kafka Connect can use the Confluent Schema Registry directly.
-But schemas are stored under subjects and Kafka Connect is picky about how those
-subjects are named.
-See
-<https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy>
-**Rubin has set the schema subject name to “alert-packet”**, which does not conform
-to any of the name strategies that Kafka Connect uses.
-I did not find a workaround for this issue.
-Instead, I passed the alert bytes straight through into Pub/Sub and deserialized
-them after pulling the messages from Pub/Sub.
-
-If you want to try this in the future, set the following configs in the connector's psconnect-worker.properties file.
+survey="rubin"
+testid="mytest"
+consumerVM="${survey}-consumer-${testid}"
+zone="us-central1-a"
 
-```bash
-value.converter=io.confluent.connect.avro.AvroConverter
-value.converter.schema.registry.url=https://alert-schemas-int.lsst.cloud
-value.converter.enhanced.avro.schema.support=true
+# Stop the VM
+gcloud compute instances stop ${consumerVM} --zone ${zone}
 ```
Lines changed: 2 additions & 3 deletions
@@ -1,9 +1,8 @@
 # see https://kafka.apache.org/documentation/#adminclientconfigs
 
-bootstrap.servers=alert-stream-int.lsst.cloud:9094
+bootstrap.servers=usdf-alert-stream-dev.lsst.cloud:9094
 sasl.mechanism=SCRAM-SHA-512
-sasl.kerberos.service.name=kafka
-security.protocol=SASL_SSL
+security.protocol=SASL_PLAINTEXT
 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
 username="pittgoogle-idfint"\
 password="KAFKA_PASSWORD";
