diff --git a/lib/fluent/plugin/in_cloudwatch_logs.rb b/lib/fluent/plugin/in_cloudwatch_logs.rb index 11d334b..07ebaa7 100644 --- a/lib/fluent/plugin/in_cloudwatch_logs.rb +++ b/lib/fluent/plugin/in_cloudwatch_logs.rb @@ -43,6 +43,7 @@ class CloudwatchLogsInput < Input config_param :end_time, :string, default: nil config_param :time_range_format, :string, default: "%Y-%m-%d %H:%M:%S" config_param :throttling_retry_seconds, :time, default: nil + config_param :max_retry_count, :integer, default: 999 #TODO config_param :include_metadata, :bool, default: false config_section :web_identity_credentials, multi: false do config_param :role_arn, :string @@ -272,31 +273,36 @@ def emit(group, stream, event, metadata) end def get_events(log_group_name, log_stream_name) - throttling_handler('get_log_events') do - request = { - log_group_name: log_group_name, - log_stream_name: log_stream_name - } - request.merge!(start_time: @start_time) if @start_time - request.merge!(end_time: @end_time) if @end_time + request = { + log_group_name: log_group_name, + log_stream_name: log_stream_name + } + request.merge!(start_time: @start_time) if @start_time + request.merge!(end_time: @end_time) if @end_time + + if @use_log_group_name_prefix + log_next_token = next_token(log_stream_name, log_group_name) + else + log_next_token = next_token(log_stream_name) + end + + request[:next_token] = log_next_token if !log_next_token.nil? && !log_next_token.empty? + request[:start_from_head] = true if read_from_head?(log_next_token) + + # Only apply throttling retry to the API call, not the whole method + response = api_call_with_throttling('get_log_events') do + @logs.get_log_events(request) + end + + if valid_next_token(log_next_token, response.next_forward_token) if @use_log_group_name_prefix - log_next_token = next_token(log_stream_name, log_group_name) + store_next_token(response.next_forward_token, log_stream_name, log_group_name) else - log_next_token = next_token(log_stream_name) + store_next_token(response.next_forward_token, log_stream_name) end - request[:next_token] = log_next_token if !log_next_token.nil? && !log_next_token.empty? - request[:start_from_head] = true if read_from_head?(log_next_token) - response = @logs.get_log_events(request) - if valid_next_token(log_next_token, response.next_forward_token) - if @use_log_group_name_prefix - store_next_token(response.next_forward_token, log_stream_name, log_group_name) - else - store_next_token(response.next_forward_token, log_stream_name) - end - end - - response.events end + + response.events end def read_from_head?(next_token) @@ -304,34 +310,45 @@ def read_from_head?(next_token) end def describe_log_streams(log_stream_name_prefix, log_streams = nil, next_token = nil, log_group_name=nil) - throttling_handler('describe_log_streams') do - request = { - log_group_name: log_group_name != nil ? log_group_name : @log_group_name - } - request[:next_token] = next_token if next_token - request[:log_stream_name_prefix] = log_stream_name_prefix if log_stream_name_prefix - response = @logs.describe_log_streams(request) - if log_streams - log_streams.concat(response.log_streams) - else - log_streams = response.log_streams - end - if response.next_token - log_streams = describe_log_streams(log_stream_name_prefix, log_streams, response.next_token, log_group_name) - end - log_streams + request = { + log_group_name: log_group_name != nil ? log_group_name : @log_group_name + } + request[:next_token] = next_token if next_token + request[:log_stream_name_prefix] = log_stream_name_prefix if log_stream_name_prefix + + # Only apply throttling retry to the API call + response = api_call_with_throttling('describe_log_streams') do + @logs.describe_log_streams(request) end + + if log_streams + log_streams.concat(response.log_streams) + else + log_streams = response.log_streams + end + if response.next_token + log_streams = describe_log_streams(log_stream_name_prefix, log_streams, response.next_token, log_group_name) + end + log_streams end - def throttling_handler(method_name) + # New method to handle API calls with throttling retry with exponential backoff + def api_call_with_throttling(method_name, retry_count = 0) yield rescue Aws::CloudWatchLogs::Errors::ThrottlingException => err - if throttling_retry_seconds - log.warn "ThrottlingException #{method_name}. Waiting #{throttling_retry_seconds} seconds to retry." - sleep throttling_retry_seconds - - throttling_handler(method_name) { yield } + if @throttling_retry_seconds && retry_count < @max_retry_count + # Calculate backoff with jitter: base_time * (2^retry_count) + random_jitter + wait_time = @throttling_retry_seconds * (2 ** retry_count) * (0.9 + 0.2 * rand) + log.warn "Haia - ThrottlingException on #{method_name}. Retry #{retry_count+1}/#{@max_retry_count}. Waiting #{wait_time.round(2)} seconds." + sleep wait_time + + # Only retry the API call itself, not recursively + api_call_with_throttling(method_name, retry_count + 1) { yield } else + error_msg = retry_count >= @max_retry_count ? + "Haia - Max retries (#{@max_retry_count}) exceeded for #{method_name}" : + "ThrottlingException for #{method_name} with no retry configured" + log.error error_msg raise err end end @@ -341,7 +358,12 @@ def describe_log_groups(log_group_name_prefix, log_groups = nil, next_token = ni log_group_name_prefix: log_group_name_prefix } request[:next_token] = next_token if next_token - response = @logs.describe_log_groups(request) + + # Apply throttling handling to describe_log_groups too + response = api_call_with_throttling('describe_log_groups') do + @logs.describe_log_groups(request) + end + if log_groups log_groups.concat(response.log_groups) else