Skip to content

Commit

Permalink
Specify optional docker-compose variables, ensure kafka is ready (#40)
Browse files Browse the repository at this point in the history
Need to explicitly tell docker-compose that we want these variables
provided to the container. If they exist in the host environment,
they'll be passed to the container. If not, they won't be.
This ensures that the environment variables are correctly picked
up by the python app if set.

Set dependencies on docker-compose services to start up in the right order

Ensure that kafka is ready to receive events from producers before
kicking off the connector. Otherwise kafka may be in a transient state
that errors out.

Run CI on all commits and allow manual runs
  • Loading branch information
polastre authored Apr 13, 2022
1 parent 7292d1b commit 9899a49
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 17 deletions.
11 changes: 7 additions & 4 deletions .github/workflows/dockerimage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: Docker Image CI

on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
branches: '**'
workflow_dispatch:
# Allow manually triggered builds too.

env:
KEEPALIVE: 60
Expand All @@ -22,6 +22,10 @@ jobs:
node-version: [12.16.3]
python-version: [3.9]
steps:
- name: Add secrets mask
run: |
echo "::add-mask::${{ secrets.FH_USERNAME }}"
echo "::add-mask::${{ secrets.FH_APIKEY }}"
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
Expand Down Expand Up @@ -75,4 +79,3 @@ jobs:
run: |
docker-compose up -d
./ci_performance_regression_test.sh
2 changes: 2 additions & 0 deletions ci_performance_regression_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ echo "Positions Count: ${positions_count}"

if [[ $flights_count -lt 45000 ]]; then
echo "Flight count lower than threshold 45000"
docker-compose logs
exit 1
fi

if [[ $positions_count -lt 200000 ]]; then
echo "Position count lower than threshold 200000"
docker-compose logs
exit 1
fi

Expand Down
8 changes: 7 additions & 1 deletion connector/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import warnings
import zlib
from typing import Optional, Tuple
from confluent_kafka import KafkaException, Producer # type: ignore
from confluent_kafka import KafkaException, KafkaError, Producer # type: ignore

CONNECTION_ERROR_LIMIT = 3

Expand Down Expand Up @@ -263,6 +263,7 @@ def delivery_report(err, _):
time.sleep(1)
except KafkaException as e:
if not e.args[0].retriable():
print(f"Kafka exception occurred that cannot be retried: {e}")
raise
print(
f"Encountered retriable kafka error ({e.args[0].str()}), "
Expand All @@ -285,6 +286,11 @@ async def main():
while producer is None:
try:
producer = Producer({"bootstrap.servers": "kafka:9092", "linger.ms": 500})
producer.produce(
"test",
key="noop",
value="",
)
except KafkaException as error:
print(f"Kafka isn't available ({error}), trying again in a few seconds")
time.sleep(3)
Expand Down
13 changes: 8 additions & 5 deletions connector/test/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ def reconnect_after_error(
],
)
# verify expect output to kafka
mock_kafkaproducer.return_value.produce.assert_not_called()
# the only call should be to the test topic, signaling that the producer is ready
self.assertEqual(mock_kafkaproducer.return_value.produce.call_count, 1)
else:
# verify expected init cmds
self.assertEqual(
Expand All @@ -87,14 +88,15 @@ def reconnect_after_error(
)
# verify expect output to kafka
if len(error) == 1:
mock_kafkaproducer.return_value.produce.assert_called_once_with(
mock_kafkaproducer.return_value.produce.assert_any_call(
"topic1",
key=b"KPVD-1588929046-hexid-ADF994",
value=b'{"pitr":"1584126630","type":"arrival","id":"KPVD-1588929046-hexid-ADF994"}',
callback=ANY,
)
self.assertEqual(mock_kafkaproducer.return_value.produce.call_count, 2)
else:
self.assertEqual(mock_kafkaproducer.return_value.produce.call_count, len(error))
self.assertEqual(mock_kafkaproducer.return_value.produce.call_count, len(error)+1)

@patch("main.open_connection", new_callable=AsyncMock)
@patch("main.Producer", new_callable=Mock)
Expand Down Expand Up @@ -203,8 +205,9 @@ async def wrap_open_connection(self, *args, **kwargs):
return self.fh_reader, self.fh_writer

def save_line_stop_test(self, topic, value=None, key=None, callback=None):
self.emitted_msg.append(value)
raise EndTestNow()
if topic != "test":
self.emitted_msg.append(value)
raise EndTestNow()

def compression(self, mock_kafkaproducer, mock_openconnection, compression):
mock_kafkaproducer.return_value.produce.side_effect = self.save_line_stop_test
Expand Down
21 changes: 14 additions & 7 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,31 @@ services:
# OPTIONAL environment variables
# Firehose URL, defaults to firehose-test.flightaware.com.
# firehose.flightaware.com can also be used
# - SERVER=${SERVER:-}
- SERVER
# Streaming compression of incoming Firehose data. Valid values are gzip,
# deflate, or compress. Leave blank to disable compression.
# - COMPRESSION=${COMPRESSION:-}
- COMPRESSION
# Frequency in seconds to print stats about connection (messages/bytes
# per second). Set to 0 to disable.
# - PRINT_STATS_PERIOD=${PRINT_STATS_PERIOD:-}
- PRINT_STATS_PERIOD
# Frequency in seconds that Firehose should send a synthetic "keepalive"
# message to help connector ensure the connection is still alive. If no
# such message is received within roughly $keepalive seconds, connector
# will automatically reconnect to Firehose.
# - KEEPALIVE=${KEEPALIVE:-}
- KEEPALIVE
# The number of times that the same pitr seen in consecutive keepalive
# messages should trigger an error and a restart of the connection
# - KEEPALIVE_STALE_PITRS=${KEEPALIVE_STALE_PITRS:-}
- KEEPALIVE_STALE_PITRS
# "Time mode" of Firehose init command. Can be "live" or "pitr <pitr>";
# range is currently not supported.
# See https://flightaware.com/commercial/firehose/documentation/commands
# for more details.
# - INIT_CMD_TIME=${INIT_CMD_TIME:-}
- INIT_CMD_TIME
# The "optional" section of the Firehose init command. Mostly consists of
# filters for the data. Do not put username, password, keepalive, or
# compression commands here. Documentation at
# https://flightaware.com/commercial/firehose/documentation/commands
# - INIT_CMD_ARGS=${INIT_CMD_ARGS:-}
- INIT_CMD_ARGS

# PYTHON settings
- PYTHONUNBUFFERED=1
Expand All @@ -58,6 +58,8 @@ services:
options:
max-size: "10mb"
max-file: "5"
depends_on:
- kafka

db-updater:
image: "ghcr.io/flightaware/firestarter/firestarter_db-updater:${FS_VERSION:-latest}"
Expand Down Expand Up @@ -86,6 +88,8 @@ services:
options:
max-size: "10mb"
max-file: "5"
depends_on:
- kafka

position-db-updater:
image: "ghcr.io/flightaware/firestarter/firestarter_db-updater:${FS_VERSION:-latest}"
Expand Down Expand Up @@ -113,6 +117,7 @@ services:
max-size: "10mb"
max-file: "5"
depends_on:
- kafka
- timescaledb

fids-backend:
Expand Down Expand Up @@ -174,6 +179,8 @@ services:
options:
max-size: "10mb"
max-file: "5"
depends_on:
- kafka

zookeeper:
image: "bitnami/zookeeper:3.6.2"
Expand Down

0 comments on commit 9899a49

Please sign in to comment.