This repository was archived by the owner on Jul 19, 2023. It is now read-only.

Avoid missed records in a high volume scenario #96

Open
wants to merge 14 commits into base: master
3 changes: 3 additions & 0 deletions .gitignore
@@ -3,3 +3,6 @@ Gemfile.lock
.bundle
vendor
coverage/

Jars.lock
.vscode
1 change: 1 addition & 0 deletions .jrubyrc
@@ -0,0 +1 @@
debug.fullTrace=true
86 changes: 86 additions & 0 deletions ARCHITECTURE.md
@@ -0,0 +1,86 @@
# Architecture

## Complexities in the source data that can cause query problems

The data source here is CloudWatch, whose `filter_log_events` method
returns log data for a single log group, possibly split across multiple
streams. It will attempt to interleave these so they are in event
time order, but it can't guarantee that.

In a high-volume situation this results in two issues that can cause
problems in queries:

- Log messages in a group's query result may appear out of order,
although they will be in order within any individual stream.
- You may get multiple messages with the same timestamp.


These cause a problem because there's no query model for tailing
a log stream (it's possible, but only with architectural changes
to your deployment), so the only way to get data is to record when
the last message happened and search again from there.

The above issues impact this as follows:

- *Out-of-order messages* - when should the last event time be set
to avoid missing messages?
- *Multiple messages with the same timestamp* - if there are
two messages in the original stream with the same timestamp,
and your last query returned only the first one, how do you query
for the second without reprocessing the first? (See the sketch below.)
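To make the second issue concrete, here is a minimal sketch of what goes wrong when you only remember "last timestamp + 1" (this mirrors the spirit of the plugin's previous sincedb logic, but the names and data shapes are simplified illustrations, not the plugin's actual code):

```ruby
# Hypothetical illustration of the "same timestamp" problem.
sincedb = {}

def record_batch(sincedb, group, events)
  events.each do |event|
    # ... hand the event to the pipeline ...
    sincedb[group] = event[:timestamp] + 1 # naive high-water mark
  end
end

# First poll returns one event at t=1000.
record_batch(sincedb, "my-group", [{ timestamp: 1000, id: "a" }])

# A second event, also stamped t=1000, arrives later from another stream.
# The next poll starts at sincedb["my-group"] == 1001, so event "b" is
# never returned again and is silently lost.
```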

## Resolution using a log event tracking window

This was resolved in the [LogEventTracker](lib/logstash/inputs/group_event_tracker.rb)
by maintaining a record of a window of log events, storing every event in that period.

**NOTE:** all times are derived from the log event timestamp and
not from the current wall-clock time. They are accurate to the millisecond.

The model is per log_group:

- `min_time`: the earliest time for which we have a log event for this group
- `max_time`: the latest time for which we have a log event for this group
- `map[log_event_time] -> set[events]`: a record of all the events
for this group in the log window.

In effect, we're keeping track of all the events we've seen in
the window (e.g. a 15-minute period). Once we have more than, say,
15 minutes' worth of events, we start dropping the oldest ones, as sketched below.
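A minimal sketch of what that per-group state might look like is shown here; the class, field, and method names are illustrative assumptions, and the real implementation lives in [group_event_tracker.rb](lib/logstash/inputs/group_event_tracker.rb):

```ruby
require 'set'

# Illustrative only: a per-group window of seen events, keyed by timestamp.
class GroupWindow
  attr_reader :min_time, :max_time

  def initialize(window_millis)
    @window_millis = window_millis # e.g. 15 * 60 * 1000
    @min_time = nil                # earliest event time in the window
    @max_time = nil                # latest event time in the window
    @events_by_time = Hash.new { |h, k| h[k] = Set.new } # time -> event identities
  end

  def record(timestamp, identity)
    @min_time = timestamp if @min_time.nil? || timestamp < @min_time
    @max_time = timestamp if @max_time.nil? || timestamp > @max_time
    @events_by_time[timestamp] << identity
  end
end
```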

The window considers a record _"new"_ (see the sketch after this list) if:

- It's an event we've never seen before, where an event is identified
as unique by its `stream` name and its `eventId`, and
- Its timestamp falls on or after the `min_time`.
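In outline, that check might look like this (a sketch built on the hypothetical `GroupWindow` above; the plugin's own logic is in `LogEventTracker#is_new_event`):

```ruby
# Illustrative only: decide whether an event should be processed.
class GroupWindow
  def new_event?(timestamp, identity)
    # Anything older than the window minimum is assumed to have been
    # handled (or deliberately dropped) already.
    return false if !@min_time.nil? && timestamp < @min_time
    # Within the window, an event is new only if we haven't recorded
    # this exact stream/eventId combination before.
    !@events_by_time[timestamp].include?(identity)
  end
end

# identity would be derived from the stream name and the event id, e.g.
#   identity = "#{log.log_stream_name}:#{log.event_id}"
```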


The process for querying the data for some group is:

```ruby
# Get the earliest time for which we've seen any data
start_time = window.get_min_time(group)

# This might contain events we've already processed
events = filter_log_events(group, start_time)

# Loop through the events, skipping any we've already
# seen, and processing the rest
events.each do |event|
  if !window.have_we_seen_this_before(group, event)
    process_event(group, event)
  end
end

# Once we've finished the search, purge any events that are too
# old (e.g. more than 15 minutes older than the maximum timestamp)
# and then save the data to a file so it's there if we restart
window.purge_events_too_old(group)
window.save_to_file(group)
```
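The purge step at the end of that loop can be pictured as follows (same hypothetical `GroupWindow` as in the earlier sketches):

```ruby
# Illustrative only: drop timestamps that have fallen out of the window,
# i.e. anything more than @window_millis older than the newest event seen.
class GroupWindow
  def purge!
    return if @max_time.nil?
    cutoff = @max_time - @window_millis
    @events_by_time.delete_if { |timestamp, _| timestamp < cutoff }
    @min_time = @events_by_time.keys.min
  end
end
```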

In experiments, I've found that a 15-minute window avoids any missed records. In our
use cases, however, we've been routing through an aggregator that holds back
data for a few minutes to make sure it has enough data to push out, so you
can probably reduce this window to suit your own needs.

2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

* Updated sincedb to track the high water mark timestamps at a stream level, rather than just at the group level. This is a fix for [#74](https://github.com/lukewaite/logstash-input-cloudwatch-logs/issues/74)

## [v1.0.3] (2018-05-28)
* Update minimum version of `logstash-mixin-aws` to support assume role ([#9a4677f](https://github.com/lukewaite/logstash-input-cloudwatch-logs/commit/9a4677fef8bcbf291bd4b357be2a9568ea4f3fc1) - Fixes [#51](https://github.com/lukewaite/logstash-input-cloudwatch-logs/issues/51), [#39](https://github.com/lukewaite/logstash-input-cloudwatch-logs/issues/39))

1 change: 1 addition & 0 deletions CONTRIBUTORS
@@ -13,6 +13,7 @@ Contributors:
* Ted Timmons (tedder)
* Ryan O'Keeffe (danielredoak)
* Luke Waite (lukewaite)
* Daniel Bray (daniel-bray-sonalake)

Note: If you've sent us patches, bug reports, or otherwise contributed to
Logstash, and you aren't on the list above and want to be, please let us know
2 changes: 2 additions & 0 deletions Gemfile
@@ -1,2 +1,4 @@
source 'https://rubygems.org'
gemspec
gem 'simplecov', require: false, group: :test
gem 'coveralls', require: false, group: :test
4 changes: 4 additions & 0 deletions README.md
@@ -15,13 +15,17 @@ Optionally, you may set the `log_group_prefix` parameter to true
which will scan for all log groups matching the specified prefix(s)
and ingest all logs available in all of the matching groups.

The optional `log_group_suffix` parameter further filters the results of the above
CloudWatch query, keeping only log groups whose names end with one of the given suffixes.

## Usage

### Parameters
| Parameter | Input Type | Required | Default |
|-----------|------------|----------|---------|
| log_group | string or Array of strings | Yes | |
| log_group_prefix | boolean | No | `false` |
| log_group_suffix | string or Array of strings | No | |
| start_position | `beginning`, `end`, or an Integer | No | `beginning` |
| sincedb_path | string | No | `$HOME/.sincedb*` |
| interval | number | No | 60 |
152 changes: 78 additions & 74 deletions lib/logstash/inputs/cloudwatch_logs.rb
@@ -8,6 +8,8 @@
require "aws-sdk"
require "logstash/inputs/cloudwatch_logs/patch"
require "fileutils"
require 'logstash/inputs/group_event_tracker'


Aws.eager_autoload!

@@ -36,6 +38,10 @@ class LogStash::Inputs::CloudWatch_Logs < LogStash::Inputs::Base
# sincedb files to some path matching "$HOME/.sincedb*"
# Should be a path with filename not just a directory.
config :sincedb_path, :validate => :string, :default => nil
# The per-stream tracking data grows over time, so we drop entries after a
# configurable period, but only when a new value comes in for a group (i.e. we
# purge one group at a time)
config :prune_since_db_stream_minutes, :validate => :number, :default => 60

# Interval to wait between to check the file list again after a run is finished.
# Value is in seconds.
@@ -44,11 +50,17 @@ class LogStash::Inputs::CloudWatch_Logs < LogStash::Inputs::Base
# Decide if log_group is a prefix or an absolute name
config :log_group_prefix, :validate => :boolean, :default => false

# If present, the results of the log group query are filtered again to only
# those ending with one of these values. Only applicable if log_group_prefix = true.
# Set negate_log_group_suffix to true to invert the match and exclude those groups.
config :log_group_suffix, :validate => :string, :list => true, :default => nil
config :negate_log_group_suffix, :validate => :boolean, :default => false

# When a new log group is encountered at initial plugin start (not already in
# sincedb), allow configuration to specify where to begin ingestion on this group.
# Valid options are: `beginning`, `end`, or an integer, representing number of
# seconds before now to read back from.
config :start_position, :default => 'beginning'



# def register
@@ -57,8 +69,6 @@ def register
require "digest/md5"
@logger.debug("Registering cloudwatch_logs input", :log_group => @log_group)
settings = defined?(LogStash::SETTINGS) ? LogStash::SETTINGS : nil
@sincedb = {}

check_start_position_validity

Aws::ConfigService::Client.new(aws_options_hash)
@@ -91,11 +101,14 @@ def register
@sincedb_path = File.join(sincedb_dir, ".sincedb_" + Digest::MD5.hexdigest(@log_group.join(",")))

@logger.info("No sincedb_path set, generating one based on the log_group setting",
:sincedb_path => @sincedb_path, :log_group => @log_group)
:sincedb_path => @sincedb_path, :log_group => @log_group)
end


@logger.info("Using sincedb_path #{@sincedb_path}")
@event_tracker = LogEventTracker.new(@sincedb_path, @prune_since_db_stream_minutes)
end #def register


public
def check_start_position_validity
raise LogStash::ConfigurationError, "No start_position specified!" unless @start_position
@@ -111,8 +124,7 @@ def check_start_position_validity
def run(queue)
@queue = queue
@priority = []
_sincedb_open
determine_start_position(find_log_groups, @sincedb)
@event_tracker.load()

while !stop?
begin
@@ -139,7 +151,12 @@ def find_log_groups
@log_group.each do |group|
loop do
log_groups = @cloudwatch.describe_log_groups(log_group_name_prefix: group, next_token: next_token)
groups += log_groups.log_groups.map {|n| n.log_group_name}
# keep the group if there's no suffix setting; otherwise keep it when the
# suffix check passes (inverted if negate_log_group_suffix is set)
groups += log_groups.log_groups
.select { |n| @log_group_suffix.nil? || (n.log_group_name.end_with?(*@log_group_suffix) ^ @negate_log_group_suffix)}
.map {|n| n.log_group_name}

next_token = log_groups.next_token
@logger.debug("found #{log_groups.log_groups.length} log groups matching prefix #{group}")
break if next_token.nil?
@@ -158,67 +175,62 @@ def priority_of(group)
@priority.index(group) || -1
end

public
def determine_start_position(groups, sincedb)
groups.each do |group|
if !sincedb.member?(group)
case @start_position
when 'beginning'
sincedb[group] = 0

when 'end'
sincedb[group] = DateTime.now.strftime('%Q')

else
sincedb[group] = DateTime.now.strftime('%Q').to_i - (@start_position * 1000)
end # case @start_position
end
end
end # def determine_start_position

private
def process_group(group)
next_token = nil
loop do
if [email protected]?(group)
@sincedb[group] = 0
end
start_time = @event_tracker.get_or_set_min_time(group, get_default_start_time)

params = {
:log_group_name => group,
:start_time => @sincedb[group],
:start_time => start_time,
:interleaved => true,
:next_token => next_token
}
}
resp = @cloudwatch.filter_log_events(params)


actually_processed_count = 0
resp.events.each do |event|
process_log(event, group)
was_processed = process_log(event, group)
actually_processed_count += 1 if was_processed
end

_sincedb_write
@logger.debug("Queried logs for #{group} from #{parse_time(start_time)} found #{resp.events.length} events, processed #{actually_processed_count}") if resp.events.length > 0
# prune old records before saving
@event_tracker.purge(group)
@event_tracker.save()

next_token = resp.next_token
break if next_token.nil?
end
@priority.delete(group)
@priority << group

end #def process_group

# def process_log
# def process_log - returns true if the message was actually processed
private
def process_log(log, group)

@codec.decode(log.message.to_str) do |event|
event.set("@timestamp", parse_time(log.timestamp))
event.set("[cloudwatch_logs][ingestion_time]", parse_time(log.ingestion_time))
event.set("[cloudwatch_logs][log_group]", group)
event.set("[cloudwatch_logs][log_stream]", log.log_stream_name)
event.set("[cloudwatch_logs][event_id]", log.event_id)
decorate(event)

@queue << event
@sincedb[group] = log.timestamp + 1
identity = identify(group, log.log_stream_name)
if @event_tracker.is_new_event(group, log)
@logger.trace? && @logger.trace("Processing event")
@codec.decode(log.message.to_str) do |event|

event.set("@timestamp", parse_time(log.timestamp))
event.set("[cloudwatch_logs][ingestion_time]", parse_time(log.ingestion_time))
event.set("[cloudwatch_logs][log_group]", group)
event.set("[cloudwatch_logs][log_stream]", log.log_stream_name)
event.set("[cloudwatch_logs][event_id]", log.event_id)
decorate(event)

@queue << event

@event_tracker.record_processed_event(group, log)
return true
end
end
return false
end # def process_log

# def parse_time
@@ -227,39 +239,31 @@ def parse_time(data)
LogStash::Timestamp.at(data.to_i / 1000, (data.to_i % 1000) * 1000)
end # def parse_time

private
def _sincedb_open
begin
File.open(@sincedb_path) do |db|
@logger.debug? && @logger.debug("_sincedb_open: reading from #{@sincedb_path}")
db.each do |line|
group, pos = line.split(" ", 2)
@logger.debug? && @logger.debug("_sincedb_open: setting #{group} to #{pos.to_i}")
@sincedb[group] = pos.to_i
end
end
rescue
#No existing sincedb to load
@logger.debug? && @logger.debug("_sincedb_open: error: #{@sincedb_path}: #{$!}")
end
end # def _sincedb_open

private
def _sincedb_write
begin
IO.write(@sincedb_path, serialize_sincedb, 0)
rescue Errno::EACCES
# probably no file handles free
# maybe it will work next time
@logger.debug? && @logger.debug("_sincedb_write: error: #{@sincedb_path}: #{$!}")
end
end # def _sincedb_write
private
def identify(group, log_stream_name)
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-logs-loggroup.html
# ':' isn't allowed in a log group name, so we can use it safely
return "#{group}:#{log_stream_name}"
end

private
def is_stream_identifier(sincedb_name)
return sincedb_name.include? ":"
end

private
def serialize_sincedb
@sincedb.map do |group, pos|
[group, pos].join(" ")
end.join("\n") + "\n"
def get_default_start_time()
# choose the start time based on the configs
case @start_position
when 'beginning'
return 0
when 'end'
return DateTime.now.strftime('%Q').to_i
else
return DateTime.now.strftime('%Q').to_i - (@start_position.to_i * 1000)
end # case @start_position
end


end # class LogStash::Inputs::CloudWatch_Logs