Skip to content

Commit 203d3c6

Browse files
committed
Remove sequence token
1 parent 7287d1a commit 203d3c6

File tree

1 file changed

+7
-42
lines changed

1 file changed

+7
-42
lines changed

lib/fluent/plugin/out_cloudwatch_logs.rb

Lines changed: 7 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,8 @@ def start
162162
end
163163
options[:http_proxy] = @http_proxy if @http_proxy
164164
@logs ||= Aws::CloudWatchLogs::Client.new(options)
165-
@sequence_tokens = {}
166-
@store_next_sequence_token_mutex = Mutex.new
167-
165+
@log_groups = {}
166+
168167
log.debug "Aws::CloudWatchLogs::Client initialized: log.level #{log.level} => #{options[:log_level]}"
169168

170169
@json_handler = case @json_handler
@@ -356,20 +355,6 @@ def scrub_record!(record)
356355
end
357356
end
358357

359-
def delete_sequence_token(group_name, stream_name)
360-
@sequence_tokens[group_name].delete(stream_name)
361-
end
362-
363-
def next_sequence_token(group_name, stream_name)
364-
@sequence_tokens[group_name][stream_name]
365-
end
366-
367-
def store_next_sequence_token(group_name, stream_name, token)
368-
@store_next_sequence_token_mutex.synchronize do
369-
@sequence_tokens[group_name][stream_name] = token
370-
end
371-
end
372-
373358
def put_events_by_chunk(group_name, stream_name, events)
374359
chunk = []
375360

@@ -413,9 +398,6 @@ def put_events(group_name, stream_name, events, events_bytesize)
413398
log_stream_name: stream_name,
414399
}
415400

416-
token = next_sequence_token(group_name, stream_name)
417-
args[:sequence_token] = token if token
418-
419401
begin
420402
t = Time.now
421403
response = @logs.put_log_events(args)
@@ -424,7 +406,6 @@ def put_events(group_name, stream_name, events, events_bytesize)
424406
"stream" => stream_name,
425407
"events_count" => events.size,
426408
"events_bytesize" => events_bytesize,
427-
"sequence_token" => token,
428409
"thread" => Thread.current.object_id,
429410
"request_sec" => Time.now - t,
430411
}
@@ -434,16 +415,6 @@ def put_events(group_name, stream_name, events, events_bytesize)
434415
else
435416
log.debug "Called PutLogEvents API", request
436417
end
437-
rescue Aws::CloudWatchLogs::Errors::InvalidSequenceTokenException, Aws::CloudWatchLogs::Errors::DataAlreadyAcceptedException => err
438-
sleep 1 # to avoid too many API calls
439-
store_next_sequence_token(group_name, stream_name, err.expected_sequence_token)
440-
log.warn "updating upload sequence token forcefully because unrecoverable error occured", {
441-
"error" => err,
442-
"log_group" => group_name,
443-
"log_stream" => stream_name,
444-
"new_sequence_token" => token,
445-
}
446-
retry_count += 1
447418
rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException => err
448419
if @auto_create_stream && err.message == 'The specified log stream does not exist.'
449420
log.warn 'Creating log stream because "The specified log stream does not exist." error is got', {
@@ -452,7 +423,6 @@ def put_events(group_name, stream_name, events, events_bytesize)
452423
"log_stream" => stream_name,
453424
}
454425
create_log_stream(group_name, stream_name)
455-
delete_sequence_token(group_name, stream_name)
456426
retry_count += 1
457427
else
458428
raise err
@@ -487,8 +457,6 @@ def put_events(group_name, stream_name, events, events_bytesize)
487457
if 0 < retry_count
488458
log.warn "retry succeeded"
489459
end
490-
491-
store_next_sequence_token(group_name, stream_name, response.next_sequence_token)
492460
end
493461

494462
def create_log_group(group_name, log_group_aws_tags = nil, retention_in_days = nil)
@@ -497,7 +465,6 @@ def create_log_group(group_name, log_group_aws_tags = nil, retention_in_days = n
497465
unless retention_in_days.nil?
498466
put_retention_policy(group_name, retention_in_days)
499467
end
500-
@sequence_tokens[group_name] = {}
501468
rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException
502469
log.debug "Log group '#{group_name}' already exists"
503470
end
@@ -517,18 +484,16 @@ def put_retention_policy(group_name, retention_in_days)
517484
def create_log_stream(group_name, stream_name)
518485
begin
519486
@logs.create_log_stream(log_group_name: group_name, log_stream_name: stream_name)
520-
@sequence_tokens[group_name] ||= {}
521-
@sequence_tokens[group_name][stream_name] = nil
522487
rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException
523488
log.debug "Log stream '#{stream_name}' already exists"
524489
end
525490
end
526491

527492
def log_group_exists?(group_name)
528-
if @sequence_tokens[group_name]
493+
if @log_groups[group_name]
529494
true
530495
elsif check_log_group_existence(group_name)
531-
@sequence_tokens[group_name] = {}
496+
@log_groups[group_name] = []
532497
true
533498
else
534499
false
@@ -547,12 +512,12 @@ def check_log_group_existence(group_name)
547512
end
548513

549514
def log_stream_exists?(group_name, stream_name)
550-
if not @sequence_tokens[group_name]
515+
if not @log_groups[group_name]
551516
false
552-
elsif @sequence_tokens[group_name].has_key?(stream_name)
517+
elsif @log_groups[group_name].include?(stream_name)
553518
true
554519
elsif (log_stream = find_log_stream(group_name, stream_name))
555-
@sequence_tokens[group_name][stream_name] = log_stream.upload_sequence_token
520+
@log_groups[group_name].push(stream_name)
556521
true
557522
else
558523
false

0 commit comments

Comments
 (0)