136 changes: 136 additions & 0 deletions docs/agents/influxdb_publisher_v2.rst
@@ -0,0 +1,136 @@
.. highlight:: rst

.. _influxdb_publisher_v2:

===========================
InfluxDB Publisher v2 Agent
===========================

The InfluxDB Publisher v2 Agent is an upgraded version of the InfluxDB
Publisher Agent that uses the ``influxdb-client`` package instead of the
``influxdb`` package, allowing it to work with InfluxDB v2.

.. argparse::
    :module: ocs.agents.influxdb_publisher_v2.agent
    :func: make_parser
    :prog: agent.py

Configuration File Examples
---------------------------

Below are configuration examples for the ocs config file and for running the
Agent in a docker container. Also included is an example for setting up
Grafana to display data from InfluxDB.

OCS Site Config
```````````````

Add an InfluxDBAgent to your OCS configuration file::

    {'agent-class': 'InfluxDBAgentv2',
     'instance-id': 'influxagent',
     'arguments': ['--initial-state', 'record']},
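
In a full site config file this entry belongs in a host's
``agent-instances`` list. A minimal sketch (the ``ocs-docker`` host name is
an assumption; use whatever host block your config already defines)::

    hosts:
      ocs-docker:
        'agent-instances': [
          {'agent-class': 'InfluxDBAgentv2',
           'instance-id': 'influxagent',
           'arguments': ['--initial-state', 'record']},
        ]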

Docker Compose
``````````````

Add the InfluxDB Publisher v2 Agent container to your docker-compose file::

    ocs-influxdb-publisher-v2:
      image: simonsobs/ocs:latest
      hostname: ocs-docker
      environment:
        - INSTANCE_ID=influxagent
      volumes:
        - ${OCS_CONFIG_DIR}:/config:ro
      env_file:
        - .env

Your .env file should contain credentials for the InfluxDB v2 database::

    INFLUXDB_V2_URL=http://influxdb:8086
    INFLUXDB_V2_ORG=ocs
    INFLUXDB_V2_BUCKET=ocs_feeds
    INFLUXDB_V2_TOKEN=<your-token>
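
The ``.env`` file is consumed by Docker Compose, but it can be handy to load
the same ``KEY=VALUE`` pairs in a quick Python session when debugging
connectivity. A minimal stdlib sketch (``load_env_file`` is a hypothetical
helper, not part of OCS or influxdb-client):

```python
def load_env_file(path):
    """Parse simple KEY=VALUE lines from a .env-style file."""
    env = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            # Skip blank lines and comments
            if not line or line.startswith('#'):
                continue
            key, _, value = line.partition('=')
            env[key.strip()] = value.strip()
    return env


# Export into the environment so influxdb-client tooling can pick them up:
# import os; os.environ.update(load_env_file('.env'))
```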

You will also need an instance of InfluxDB v2 running somewhere on your network.
This likely should go in a separate docker-compose file so that it remains
online at all times. An example compose file would look like::

    services:
      influxdb:
        image: "influxdb:2.7"
        container_name: "influxdb"
        restart: always
        ports:
          - "8086:8086"
        environment:
          - INFLUXDB_HTTP_LOG_ENABLED=false
        volumes:
          - /srv/influxdb2:/var/lib/influxdb2

    networks:
      default:
        external:
          name: ocs-net

.. note::
    This separate docker-compose file setup depends on having a docker
    network that connects your various docker-compose files. On a
    single-node setup this can be accomplished with the network settings
    shown above in each docker-compose file.

You then need to create the docker network with::

    $ docker network create --driver bridge ocs-net

Containers on the network should then be able to communicate.

For more information about configuring Docker Compose files, see the `Compose
file reference`_.

.. _`Compose file reference`: https://docs.docker.com/compose/compose-file/

Database Migration
``````````````````

Follow instructions to `Upgrade from InfluxDB 1.x to 2.7 with Docker <upgrade_>`_.

.. _`upgrade`: https://docs.influxdata.com/influxdb/v2/install/upgrade/v1-to-v2/docker/

Grafana
```````

Once your InfluxDB v2 container and publisher are configured and running, you
will need to create an InfluxDB data source in Grafana. To do so, add an
InfluxDB data source with the URL ``http://influxdb:8086`` and the Database
(default "ocs_feeds", though this can be customized in your OCS config file).
The name of the Data Source is up to you; in this example we set it to "OCS
Feeds". Note that if you migrated from InfluxDB 1.x to 2.7, this process is
slightly different from adding an InfluxDB 1.x data source, as an auth token
is required.

Follow instructions to `Configure your InfluxDB connection <configure_>`_ for InfluxDB v2.

.. _`configure`: https://docs.influxdata.com/influxdb/v2/tools/grafana/?t=InfluxQL#configure-your-influxdb-connection

.. note::
    The "ocs_feeds" bucket (or whatever you choose to name it) will not
    exist until the first time the InfluxDB Publisher Agent has successfully
    connected to InfluxDB.

For more information about using InfluxDB in Grafana, see the `Grafana Documentation`_.

.. _`Grafana Documentation`: https://grafana.com/docs/features/datasources/influxdb/

Agent API
---------

.. autoclass:: ocs.agents.influxdb_publisher_v2.agent.InfluxDBAgentv2
    :members:


Supporting APIs
---------------

.. autoclass:: ocs.agents.influxdb_publisher_v2.agent.Publisher
    :members:
1 change: 1 addition & 0 deletions docs/index.rst
@@ -33,6 +33,7 @@ write OCS Agents or Clients.
agents/aggregator
agents/barebones
agents/influxdb_publisher
agents/influxdb_publisher_v2
agents/registry
agents/fake_data
agents/host_manager
106 changes: 3 additions & 103 deletions ocs/agents/influxdb_publisher/drivers.py
@@ -2,36 +2,17 @@
import time
import txaio

from datetime import datetime, timezone
from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError, InfluxDBServerError
from requests.exceptions import ConnectionError as RequestsConnectionError

from ocs.common.influxdb_drivers import format_data

# For logging
txaio.use_twisted()
LOG = txaio.make_logger()


def timestamp2influxtime(time, protocol):
    """Convert timestamp for influx, always in UTC.

    Args:
        time:
            ctime timestamp
        protocol:
            'json' or 'line'

    """
    if protocol == 'json':
        t_dt = datetime.fromtimestamp(time)
        # InfluxDB expects UTC by default
        t_dt = t_dt.astimezone(tz=timezone.utc)
        influx_t = t_dt.strftime("%Y-%m-%dT%H:%M:%S.%f")
    elif protocol == 'line':
        influx_t = int(time * 1e9)  # ns
    return influx_t
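
A standalone sketch of the two conversions implemented above (mirrors the
logic of ``timestamp2influxtime``; the helper names here are hypothetical):

```python
from datetime import datetime, timezone


def to_line_ns(t):
    # Line protocol: integer nanoseconds since the Unix epoch
    return int(t * 1e9)


def to_json_utc(t):
    # JSON protocol: ISO-8601 UTC string with microseconds
    t_dt = datetime.fromtimestamp(t, tz=timezone.utc)
    return t_dt.strftime("%Y-%m-%dT%H:%M:%S.%f")


print(to_line_ns(1.5))   # 1500000000
print(to_json_utc(0))    # 1970-01-01T00:00:00.000000
```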


def _get_credentials():
    """Read credentials from environment variable or file.

@@ -163,7 +144,7 @@ def process_incoming_data(self):
continue

# Formatted for writing to InfluxDB
payload.extend(self.format_data(data, feed, protocol=self.protocol))
payload.extend(format_data(data, feed, protocol=self.protocol))

# Skip trying to write if payload is empty
if not payload:
@@ -185,87 +166,6 @@ def process_incoming_data(self):
except InfluxDBServerError as err:
LOG.error("InfluxDB Server Error: {e}", e=err)

    @staticmethod
    def _format_field_line(field_key, field_value):
        """Format key-value pair for InfluxDB line protocol."""
        # Strings must be in quotes for line protocol
        if isinstance(field_value, str):
            line = f'{field_key}="{field_value}"'
        else:
            line = f"{field_key}={field_value}"
        # Don't append 'i' to bool, which is a subclass of int
        if isinstance(field_value, int) and not isinstance(field_value, bool):
            line += "i"
        return line

    @staticmethod
    def format_data(data, feed, protocol):
        """Format the data from an OCS feed into a dict for pushing to InfluxDB.

        The scheme here is as follows:
            - agent_address is the "measurement" (conceptually like an SQL
              table)
            - feed names are an indexed "tag" on the data structure
              (effectively a table column)
            - keys within an OCS block's 'data' dictionary are the field names
              (effectively a table column)

        Args:
            data (dict):
                data from the OCS Feed subscription
            feed (dict):
                feed from the OCS Feed subscription, contains feed information
                used to structure our influxdb query
            protocol (str):
                Protocol for writing data. Either 'line' or 'json'.

        Returns:
            list: Data ready to publish to influxdb, in the specified protocol.

        """
        measurement = feed['agent_address']
        feed_tag = feed['feed_name']

        json_body = []

        # Reshape data for query
        for bk, bv in data.items():
            grouped_data_points = []
            times = bv['timestamps']
            num_points = len(bv['timestamps'])
            for i in range(num_points):
                grouped_dict = {}
                for data_key, data_value in bv['data'].items():
                    grouped_dict[data_key] = data_value[i]
                grouped_data_points.append(grouped_dict)

            for fields, time_ in zip(grouped_data_points, times):
                if protocol == 'line':
                    fields_line = []
                    for mk, mv in fields.items():
                        f_line = Publisher._format_field_line(mk, mv)
                        fields_line.append(f_line)

                    measurement_line = ','.join(fields_line)
                    t_line = timestamp2influxtime(time_, protocol='line')
                    line = f"{measurement},feed={feed_tag} {measurement_line} {t_line}"
                    json_body.append(line)
                elif protocol == 'json':
                    json_body.append(
                        {
                            "measurement": measurement,
                            "time": timestamp2influxtime(time_, protocol='json'),
                            "fields": fields,
                            "tags": {
                                "feed": feed_tag
                            }
                        }
                    )
                else:
                    LOG.warn(f"Protocol '{protocol}' not supported.")

        return json_body
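
A self-contained sketch replicating the field-formatting rules above (a
hypothetical standalone function mirroring ``_format_field_line``):

```python
def format_field_line(field_key, field_value):
    # Strings must be quoted in line protocol
    if isinstance(field_value, str):
        return f'{field_key}="{field_value}"'
    line = f"{field_key}={field_value}"
    # Integers get an 'i' suffix; bools (a subclass of int) do not
    if isinstance(field_value, int) and not isinstance(field_value, bool):
        line += "i"
    return line


print(format_field_line("temp", 3.5))   # temp=3.5
print(format_field_line("count", 2))    # count=2i
print(format_field_line("ok", True))    # ok=True
print(format_field_line("mode", "on"))  # mode="on"
```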

    def run(self):
        """Main run iterator for the publisher. This processes all incoming
        data, removes stale providers, and writes active providers to disk.