Skip to content

Commit 6bcec04

Browse files
committed
Remove sequence token
1 parent 7287d1a commit 6bcec04

File tree

1 file changed

+10
-42
lines changed

1 file changed

+10
-42
lines changed

lib/fluent/plugin/out_cloudwatch_logs.rb

Lines changed: 10 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,8 @@ def start
162162
end
163163
options[:http_proxy] = @http_proxy if @http_proxy
164164
@logs ||= Aws::CloudWatchLogs::Client.new(options)
165-
@sequence_tokens = {}
166-
@store_next_sequence_token_mutex = Mutex.new
167-
165+
@log_groups = {}
166+
168167
log.debug "Aws::CloudWatchLogs::Client initialized: log.level #{log.level} => #{options[:log_level]}"
169168

170169
@json_handler = case @json_handler
@@ -356,20 +355,6 @@ def scrub_record!(record)
356355
end
357356
end
358357

359-
def delete_sequence_token(group_name, stream_name)
360-
@sequence_tokens[group_name].delete(stream_name)
361-
end
362-
363-
def next_sequence_token(group_name, stream_name)
364-
@sequence_tokens[group_name][stream_name]
365-
end
366-
367-
def store_next_sequence_token(group_name, stream_name, token)
368-
@store_next_sequence_token_mutex.synchronize do
369-
@sequence_tokens[group_name][stream_name] = token
370-
end
371-
end
372-
373358
def put_events_by_chunk(group_name, stream_name, events)
374359
chunk = []
375360

@@ -413,9 +398,6 @@ def put_events(group_name, stream_name, events, events_bytesize)
413398
log_stream_name: stream_name,
414399
}
415400

416-
token = next_sequence_token(group_name, stream_name)
417-
args[:sequence_token] = token if token
418-
419401
begin
420402
t = Time.now
421403
response = @logs.put_log_events(args)
@@ -424,7 +406,6 @@ def put_events(group_name, stream_name, events, events_bytesize)
424406
"stream" => stream_name,
425407
"events_count" => events.size,
426408
"events_bytesize" => events_bytesize,
427-
"sequence_token" => token,
428409
"thread" => Thread.current.object_id,
429410
"request_sec" => Time.now - t,
430411
}
@@ -434,16 +415,6 @@ def put_events(group_name, stream_name, events, events_bytesize)
434415
else
435416
log.debug "Called PutLogEvents API", request
436417
end
437-
rescue Aws::CloudWatchLogs::Errors::InvalidSequenceTokenException, Aws::CloudWatchLogs::Errors::DataAlreadyAcceptedException => err
438-
sleep 1 # to avoid too many API calls
439-
store_next_sequence_token(group_name, stream_name, err.expected_sequence_token)
440-
log.warn "updating upload sequence token forcefully because unrecoverable error occured", {
441-
"error" => err,
442-
"log_group" => group_name,
443-
"log_stream" => stream_name,
444-
"new_sequence_token" => token,
445-
}
446-
retry_count += 1
447418
rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException => err
448419
if @auto_create_stream && err.message == 'The specified log stream does not exist.'
449420
log.warn 'Creating log stream because "The specified log stream does not exist." error is got', {
@@ -452,7 +423,6 @@ def put_events(group_name, stream_name, events, events_bytesize)
452423
"log_stream" => stream_name,
453424
}
454425
create_log_stream(group_name, stream_name)
455-
delete_sequence_token(group_name, stream_name)
456426
retry_count += 1
457427
else
458428
raise err
@@ -487,8 +457,6 @@ def put_events(group_name, stream_name, events, events_bytesize)
487457
if 0 < retry_count
488458
log.warn "retry succeeded"
489459
end
490-
491-
store_next_sequence_token(group_name, stream_name, response.next_sequence_token)
492460
end
493461

494462
def create_log_group(group_name, log_group_aws_tags = nil, retention_in_days = nil)
@@ -497,7 +465,7 @@ def create_log_group(group_name, log_group_aws_tags = nil, retention_in_days = n
497465
unless retention_in_days.nil?
498466
put_retention_policy(group_name, retention_in_days)
499467
end
500-
@sequence_tokens[group_name] = {}
468+
@log_groups[group_name] = []
501469
rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException
502470
log.debug "Log group '#{group_name}' already exists"
503471
end
@@ -517,18 +485,18 @@ def put_retention_policy(group_name, retention_in_days)
517485
def create_log_stream(group_name, stream_name)
518486
begin
519487
@logs.create_log_stream(log_group_name: group_name, log_stream_name: stream_name)
520-
@sequence_tokens[group_name] ||= {}
521-
@sequence_tokens[group_name][stream_name] = nil
488+
@log_groups[group_name] ||= []
489+
@log_groups[group_name].push(stream_name)
522490
rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException
523491
log.debug "Log stream '#{stream_name}' already exists"
524492
end
525493
end
526494

527495
def log_group_exists?(group_name)
528-
if @sequence_tokens[group_name]
496+
if @log_groups[group_name]
529497
true
530498
elsif check_log_group_existence(group_name)
531-
@sequence_tokens[group_name] = {}
499+
@log_groups[group_name] = []
532500
true
533501
else
534502
false
@@ -547,12 +515,12 @@ def check_log_group_existence(group_name)
547515
end
548516

549517
def log_stream_exists?(group_name, stream_name)
550-
if not @sequence_tokens[group_name]
518+
if not @log_groups[group_name]
551519
false
552-
elsif @sequence_tokens[group_name].has_key?(stream_name)
520+
elsif @log_groups[group_name].include?(stream_name)
553521
true
554522
elsif (log_stream = find_log_stream(group_name, stream_name))
555-
@sequence_tokens[group_name][stream_name] = log_stream.upload_sequence_token
523+
@log_groups[group_name].push(stream_name)
556524
true
557525
else
558526
false

0 commit comments

Comments
 (0)