Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions docs/agents/influxdb_publisher.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,40 @@ Add the InfluxDB Publisher Agent container to your docker compose file::
hostname: ocs-docker
environment:
- INSTANCE_ID=influxagent
- INFLUXDB_USERNAME=admin # optional
- INFLUXDB_PASSWORD=password # optional
volumes:
- ${OCS_CONFIG_DIR}:/config:ro

By default, InfluxDB HTTP authentication is not required. However, if needed,
you can optionally pass credentials to use when connecting to your InfluxDB
instance through the environment variables shown above or through a file, for
instance if you would like to use Docker Secrets. A Docker Secrets example is
shown below::

secrets:
INFLUXDB_USERNAME:
file: ${PWD}/secrets/INFLUXDB_USERNAME
INFLUXDB_PASSWORD:
file: ${PWD}/secrets/INFLUXDB_PASSWORD
services:
ocs-influxdb-publisher:
image: simonsobs/ocs:latest
hostname: ocs-docker
secrets:
- INFLUXDB_USERNAME
- INFLUXDB_PASSWORD
environment:
- INSTANCE_ID=influxagent
- INFLUXDB_USERNAME_FILE=/run/secrets/INFLUXDB_USERNAME
- INFLUXDB_PASSWORD_FILE=/run/secrets/INFLUXDB_PASSWORD
volumes:
- ${OCS_CONFIG_DIR}:/config:ro

.. note::
The ``INFLUXDB_USERNAME`` and ``INFLUXDB_PASSWORD`` variables will take
precedence if they are also present when using the ``_FILE`` variables.

You will also need an instance of InfluxDB running somewhere on your network.
This likely should go in a separate docker compose file so that it remains
online at all times. An example compose file would look like::
Expand Down
49 changes: 48 additions & 1 deletion ocs/agents/influxdb_publisher/drivers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import time
import txaio

Expand Down Expand Up @@ -31,6 +32,39 @@ def timestamp2influxtime(time, protocol):
return influx_t


def _get_credentials():
"""Read credentials from environment variable or file.

Reads from either `INFLUXDB_USERNAME`, `INFLUXDB_PASSWORD`,
`INFLUXDB_USERNAME_FILE`, or `INFLUXDB_PASSWORD_FILE`. Precedence is given
to the non-`_FILE` variables.

Returns:
A tuple of (username, password). Defaults to ('root', 'root') if none
of the environment variables are present.

"""
username_file = os.environ.get('INFLUXDB_USERNAME_FILE')
password_file = os.environ.get('INFLUXDB_PASSWORD_FILE')

username = None
password = None
if username_file is not None:
with open(username_file, 'r', encoding="utf-8") as f:
username = f.read().rstrip('\r\n')
if password_file is not None:
with open(password_file, 'r', encoding="utf-8") as f:
password = f.read().rstrip('\r\n')

username_default = 'root' if username is None else username
password_default = 'root' if password is None else password

username = os.environ.get('INFLUXDB_USERNAME', username_default)
password = os.environ.get('INFLUXDB_PASSWORD', password_default)

return username, password


class Publisher:
"""
Data publisher. This manages data to be published to the InfluxDB.
Expand Down Expand Up @@ -81,7 +115,14 @@ def __init__(self, host, database, incoming_data, port=8086, protocol='line',
print(f"gzip encoding enabled: {gzip}")
print(f"data protocol: {protocol}")

self.client = InfluxDBClient(host=self.host, port=self.port, gzip=gzip)
username, password = _get_credentials()

self.client = InfluxDBClient(
host=self.host,
port=self.port,
username=username,
password=password,
gzip=gzip)

db_list = None
# ConnectionError here is indicative of InfluxDB being down
Expand All @@ -92,6 +133,12 @@ def __init__(self, host, database, incoming_data, port=8086, protocol='line',
LOG.error("Connection error, attempting to reconnect to DB.")
self.client = InfluxDBClient(host=self.host, port=self.port, gzip=gzip)
time.sleep(1)
except InfluxDBClientError as err:
if err.code == 401:
LOG.error("Failed to authenticate. Check your credentials.")
else:
LOG.error(f"Unknown client error: {err}")
time.sleep(1)
if operate_callback and not operate_callback():
break

Expand Down
33 changes: 32 additions & 1 deletion tests/test_influxdb_publisher.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os

import pytest

from ocs.agents.influxdb_publisher.drivers import Publisher, timestamp2influxtime
from ocs.agents.influxdb_publisher.drivers import Publisher, timestamp2influxtime, _get_credentials


@pytest.mark.parametrize("t,protocol,expected",
Expand All @@ -27,3 +29,32 @@ def test_format_data():

expected = 'test_address,feed=test_feed key1=1i,key2=2.3,key3="test" 1615394417359038720'
assert Publisher.format_data(data, feed, 'line')[0] == expected


def test__get_credentials(tmp_path):
# Defaults
assert _get_credentials() == ('root', 'root')

# Set from file
d = tmp_path
username_file = d / "username"
username_file.write_text("admin", encoding="utf-8")
password_file = d / "password"
password_file.write_text("testpass", encoding="utf-8")

os.environ['INFLUXDB_USERNAME_FILE'] = str(username_file)
os.environ['INFLUXDB_PASSWORD_FILE'] = str(password_file)
assert _get_credentials() == ('admin', 'testpass')

# Set from env var
os.environ['INFLUXDB_USERNAME'] = 'user_var'
os.environ['INFLUXDB_PASSWORD'] = 'pass_var'
assert _get_credentials() == ('user_var', 'pass_var')

# Cleanup
vars_ = ['INFLUXDB_USERNAME_FILE',
'INFLUXDB_PASSWORD_FILE',
'INFLUXDB_USERNAME',
'INFLUXDB_PASSWORD']
for v in vars_:
os.environ.pop(v)