Thoth's investigator is a Kafka-based component that consumes all messages produced by Thoth components.
It has a monitoring system in place that allows the Thoth team to see what is happening in Thoth in terms of Kafka, OpenShift, and Argo for the different components, and to act when alarms are received.
This agent relies mainly on:
- thoth-messaging to handle Kafka messages.
- thoth-common to schedule Argo workflows.
- thoth-storages to set/verify content in database.
This documentation corresponds to a component called "investigator". Sources can be found on GitHub.
See thoth-station website and Thoth-Station organization on GitHub.
- Receive messages from different components and take action depending on the info about a package. (Consumer)
bold indicates required, italicized indicates optional
See thoth-messaging:
- KAFKA_BOOTSTRAP_SERVERS: a comma-separated list of Kafka bootstrap servers.
- KAFKA_SECURITY_PROTOCOL: specify what security protocol to use.
- KAFKA_SSL_CERTIFICATE_LOCATION (if security protocol is SSL).
- KAFKA_SASL_USERNAME and KAFKA_SASL_PASSWORD (if security protocol is SASL).
- KAFKA_CONSUMER_GROUP_ID: the Kafka consumer group. If two consumers share the same group, message partitions are split between them. You can run up to as many consumers as there are message partitions.
- KAFKA_CONSUMER_MAX_POLL_INTERVAL_MS: a timeout in milliseconds. If the consumer does not poll for messages within this interval, an error is raised. When blocking for workflow limits, this should be set moderately high. The default value is 300000.
- KAFKA_CONSUMER_ENABLE_AUTOCOMMIT: should be set to False so that messages which have not been fully processed are not committed. Investigator handles committing messages.
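As a rough illustration of how the variables above fit together, the sketch below collects them into a dictionary keyed by librdkafka-style configuration properties. The function name `consumer_config_from_env`, the fallback values for the bootstrap servers, security protocol, and group ID, and the mapping of each variable to a property are assumptions made for this example; thoth-messaging may wire them differently.

```python
import os
from typing import Mapping, Optional


def consumer_config_from_env(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build a librdkafka-style consumer configuration from KAFKA_* variables."""
    env = os.environ if env is None else env
    protocol = env.get("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT")  # assumed fallback
    autocommit = env.get("KAFKA_CONSUMER_ENABLE_AUTOCOMMIT", "False")
    config = {
        "bootstrap.servers": env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),  # assumed fallback
        "security.protocol": protocol,
        "group.id": env.get("KAFKA_CONSUMER_GROUP_ID", "investigator"),  # assumed fallback
        # Documented default: 300000 ms.
        "max.poll.interval.ms": int(env.get("KAFKA_CONSUMER_MAX_POLL_INTERVAL_MS", "300000")),
        # Stays off unless explicitly enabled, since investigator commits messages itself.
        "enable.auto.commit": autocommit.lower() in ("1", "true"),
    }
    if protocol.endswith("SSL"):
        # Raises KeyError if the certificate location is not provided.
        config["ssl.ca.location"] = env["KAFKA_SSL_CERTIFICATE_LOCATION"]
    if protocol.startswith("SASL"):
        config["sasl.username"] = env["KAFKA_SASL_USERNAME"]
        config["sasl.password"] = env["KAFKA_SASL_PASSWORD"]
    return config
```

Passing a plain dictionary instead of relying on `os.environ` keeps the function easy to exercise in tests.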
Git Services:
- THOTH_GITHUB_PRIVATE_TOKEN: token for authenticating actions on GitHub repositories
- THOTH_GITLAB_PRIVATE_TOKEN: token for authenticating actions on GitLab repositories
Enforcing a workflow limit:
- ARGO_PENDING_SLEEP_TIME: how long to wait between checks of the number of workflows in progress
- ARGO_PENDING_WORKFLOW_LIMIT: limit to enforce on Argo for the total number of pending workflows
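The two settings above suggest a simple polling loop: before scheduling more work, wait until the number of pending workflows falls below the limit. This is a minimal sketch, assuming a hypothetical `count_pending` callable that returns the current number of pending Argo workflows and assuming the loop waits while the count is at or above the limit; how investigator actually queries Argo is not shown here.

```python
import time
from typing import Callable


def wait_for_capacity(
    count_pending: Callable[[], int],  # hypothetical: returns pending workflow count
    pending_limit: int,                # value of ARGO_PENDING_WORKFLOW_LIMIT
    sleep_time: float,                 # value of ARGO_PENDING_SLEEP_TIME
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Block until pending workflows drop below the limit; return the number of checks made."""
    checks = 1
    while count_pending() >= pending_limit:
        sleep(sleep_time)
        checks += 1
    return checks
```

While such a loop is blocking, the consumer is not polling Kafka, which is why KAFKA_CONSUMER_MAX_POLL_INTERVAL_MS should be set moderately high.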
Retrying and Dealing with Exceptions:
- THOTH_INVESTIGATOR_MAX_RETRIES: the number of times investigator should attempt to consume a message before pausing topic consumption or acking a failed message (default = 5)
- THOTH_INVESTIGATOR_BACKOFF: how long to wait before trying to consume a failed message again. The backoff strategy is linear (default = 0.5): for 0 < i < MAX_RETRIES, the wait time before attempting to consume again is i * BACKOFF
- THOTH_INVESTIGATOR_ACK_ON_FAIL:
  * if the value is an integer: when it is != 0 and max retries is reached, the message is acked and consumption continues instead of the topic's consumption being paused
  * if the value is a list: all topics with a base name in that list are acked on failure
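The retry rules above can be sketched as follows. The helper names are hypothetical, and parsing `THOTH_INVESTIGATOR_ACK_ON_FAIL` as JSON is an assumption made for this example; the format investigator actually accepts may differ.

```python
import json
from typing import List


def backoff_waits(max_retries: int = 5, backoff: float = 0.5) -> List[float]:
    """Linear backoff: the wait before retry i is i * backoff, for 0 < i < max_retries."""
    return [i * backoff for i in range(1, max_retries)]


def ack_on_fail(raw_value: str, base_topic_name: str) -> bool:
    """Return True if a message that exhausted its retries is acked,
    False if its topic should be paused instead."""
    value = json.loads(raw_value)  # assumption: the variable holds a JSON integer or list
    if isinstance(value, list):
        # List form: ack only for topics whose base name is listed.
        return base_topic_name in value
    # Integer form: any non-zero value means ack and keep consuming.
    return int(value) != 0
```

With the documented defaults (5 retries, backoff 0.5), `backoff_waits()` yields the waits 0.5, 1.0, 1.5, and 2.0.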
- /metrics : exposes prometheus metrics to be scraped
- /_health : indicates that web server can handle requests
- /resume/{base_topic_name} : if message fails and forces topic to halt consumption then this endpoint can be used to manually resume consumption after issues have been addressed
Thoth relies on Kafka for message handling and on Argo workflows for services.
Investigator handles several types of messages and performs different types of actions. In particular, the messages in Thoth fall into the categories described in the following sections.
The following messages are sent by different Thoth components:
- PackageReleasedMessage.
- UnresolvedPackageMessage.
- UnrevsolvedPackageMessage.
- SIUnanalyzedPackageMessage.
- SolvedPackageMessage.
- CVEProvidedMessage.
The following message is sent by the advise reporter producer to show the use of recommendations across all Thoth integrations:
The following messages are sent by package update producer to keep knowledge in the database up to date:
The following message is sent by solver when Thoth acquired all missing knowledge required to provide advice to a user (human or bot):
The following messages are sent by User-API producer when users (humans or bots) interact with Thoth integrations:
- AdviserTriggerMessage.
- KebechetTriggerMessage.
- PackageExtractTriggerMessage.
- ProvenanceCheckerTriggerMessage.
The following message is triggered internally to keep user repositories fresh when new Thoth knowledge is encountered:
The image above shows how Thoth keeps learning automatically using two fundamental components that produce messages described in this section:
- package release producer to acquire knowledge on newly released package versions from a certain index.
- graph-refresh producer to allow Thoth to continuously learn and keep its internal knowledge up to date.
The image above shows how Thoth is able to self-learn and act on known errors during knowledge acquisition about security for a certain package:
- If a package version from a certain index cannot be downloaded because the source distro is missing or the package is missing, the SI workflow sends a message (UpdateProvidesSourceDistroMessage or MissingVersionMessage, respectively).
- Investigator consumes these messages and sets flags for those packages in the Thoth knowledge graph so that Thoth does not schedule security analysis for that package again. (The image below shows what the Grafana dashboard displays.)
The image above shows how Thoth is able to self-heal when knowledge needed to provide advice is missing:
- When a user requests Thoth advice, but there is missing information to provide it, the adviser Argo workflow
will send a message to Kafka (UnresolvedPackageMessage) through one of its tasks which depends on thoth-messaging library.
- Investigator consumes these event messages and schedules solver workflows accordingly so that Thoth can learn the missing information.
- During the solver workflow two Kafka messages are sent out:
  * SolvedPackageMessage, used by investigator to schedule the next information that needs to be learned by Thoth, e.g. security information.
  * AdviserTriggerMessage, which contains all information required by investigator to reschedule an adviser that previously failed.
- The loop is closed once the adviser workflow re-run is successful in providing advice.
This self-learning, data-driven pipeline with Argo and Kafka is fundamental for all Thoth integrations because it lets Thoth learn about new packages and keep its knowledge up to date with what users use in their software stacks.
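The self-healing loop described above amounts to a dispatch from message type to follow-up action. The sketch below models that with plain strings; the `FOLLOW_UPS` table, the `next_action` function, and the action descriptions are illustrative only and do not mirror investigator's internal API.

```python
from typing import Dict

# Message type -> follow-up taken by investigator, following the steps described above.
FOLLOW_UPS: Dict[str, str] = {
    "UnresolvedPackageMessage": "schedule solver workflow",
    "SolvedPackageMessage": "schedule next knowledge acquisition (e.g. security information)",
    "AdviserTriggerMessage": "reschedule adviser workflow",
}


def next_action(message_type: str) -> str:
    """Return the follow-up for a consumed message, or raise ValueError for unknown types."""
    try:
        return FOLLOW_UPS[message_type]
    except KeyError:
        raise ValueError(f"no handler registered for {message_type!r}") from None
```

Keeping the mapping in one table makes it clear which message closes which part of the loop, and adding a new message type becomes a single new entry.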
The image above explains what happens when a user of Thoth (human or bot) interacts with one of the Thoth integrations.
Most additions to this repository will entail adding new messages to process, which is what is documented here. If you feel that any information is missing, please feel free to open an issue.
For each message there are two things you should implement:
- message processing
- consumer metrics
Create a new directory in thoth/investigator which looks like this:
- message_name
  * __init__.py
  * investigate_<message_name>.py
  * metrics_<message_name>.py
  * README.md describing the message and what happens once consumed by investigator.
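If you prefer to generate this layout rather than create it by hand, a small helper along these lines would do. The function `scaffold_message_package` is hypothetical and not part of the repository; it only creates empty files with the names listed above.

```python
from pathlib import Path
from typing import List


def scaffold_message_package(base_dir: Path, message_name: str) -> List[Path]:
    """Create empty files for a new message under base_dir/<message_name>/."""
    package_dir = base_dir / message_name
    package_dir.mkdir(parents=True, exist_ok=True)
    files = [
        package_dir / "__init__.py",
        package_dir / f"investigate_{message_name}.py",
        package_dir / f"metrics_{message_name}.py",
        package_dir / "README.md",
    ]
    for path in files:
        # touch() leaves existing files untouched, so re-running is safe.
        path.touch(exist_ok=True)
    return files
```

For example, `scaffold_message_package(Path("thoth/investigator"), "foo")` would prepare the files for a message named foo.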
The implementation of this portion is highly specific to your own problem, so few general rules apply. As a best practice, name the function parse_<message_name>_message. Make sure to include the three basic metrics in your function:
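# <message_name> = foo
# import path assumes the directory layout described above
from .metrics_foo import foo_exceptions, foo_in_progress, foo_success


@foo_exceptions.count_exceptions()   # counts exceptions raised during processing
@foo_in_progress.track_inprogress()  # tracks messages currently being processed
def parse_foo_message(message):
    # do stuff
    foo_success.inc()  # reached only if processing did not raise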
For consumer metrics you should at least have the following three:
- <message_name>_exceptions (prometheus Counter)
- <message_name>_success (prometheus Counter)
- <message_name>_in_progress (prometheus Gauge)
These are extensions of the metrics in thoth/investigator/metrics.py
The following is an example of a basic metrics file for a message foo:
from ..metrics import in_progress, success, exceptions
foo_in_progress = in_progress.labels(message_type="foo")
foo_success = success.labels(message_type="foo")
foo_exceptions = exceptions.labels(message_type="foo")
You can add metrics as you see fit, but if a metric is not specific to your message, please move it to thoth/investigator/metrics.py and set the proper labels to differentiate between messages.
- In thoth/investigator/<message_name>/__init__.py, please add the function for parsing messages.