Update in_cloudwatch_logs.rb for better throttling handling #264

Open · wants to merge 1 commit into master

110 changes: 66 additions & 44 deletions lib/fluent/plugin/in_cloudwatch_logs.rb
@@ -43,6 +43,7 @@ class CloudwatchLogsInput < Input
     config_param :end_time, :string, default: nil
     config_param :time_range_format, :string, default: "%Y-%m-%d %H:%M:%S"
     config_param :throttling_retry_seconds, :time, default: nil
+    config_param :max_retry_count, :integer, default: 999 #TODO
Contributor (review comment on the added max_retry_count line):

Please consider an appropriate default value and remove the TODO comment.

I'm not familiar with CloudWatch. Can there be a case where a user wants to retry an unlimited number of times, like the current version does?

If so, retries should be unlimited when this value is nil, and the default might be better left as nil for compatibility.
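If nil-as-unlimited is adopted, a minimal standalone sketch of the check could look like the following; retries_remaining? is a hypothetical helper, not part of this PR or the plugin:

# Hypothetical sketch (not in this PR's diff): treat a nil max_retry_count
# as "retry forever", preserving the current unlimited-retry behavior.
def retries_remaining?(max_retry_count, retry_count)
  max_retry_count.nil? || retry_count < max_retry_count
end

puts retries_remaining?(nil, 1_000_000) # => true  (unlimited, as in the current version)
puts retries_remaining?(5, 5)           # => false (configured limit reached)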

     config_param :include_metadata, :bool, default: false
     config_section :web_identity_credentials, multi: false do
       config_param :role_arn, :string

@@ -272,66 +273,82 @@ def emit(group, stream, event, metadata)
     end

     def get_events(log_group_name, log_stream_name)
-      throttling_handler('get_log_events') do
-        request = {
-          log_group_name: log_group_name,
-          log_stream_name: log_stream_name
-        }
-        request.merge!(start_time: @start_time) if @start_time
-        request.merge!(end_time: @end_time) if @end_time
-        if @use_log_group_name_prefix
-          log_next_token = next_token(log_stream_name, log_group_name)
-        else
-          log_next_token = next_token(log_stream_name)
-        end
-        request[:next_token] = log_next_token if !log_next_token.nil? && !log_next_token.empty?
-        request[:start_from_head] = true if read_from_head?(log_next_token)
-        response = @logs.get_log_events(request)
-        if valid_next_token(log_next_token, response.next_forward_token)
-          if @use_log_group_name_prefix
-            store_next_token(response.next_forward_token, log_stream_name, log_group_name)
-          else
-            store_next_token(response.next_forward_token, log_stream_name)
-          end
-        end
-
-        response.events
-      end
+      request = {
+        log_group_name: log_group_name,
+        log_stream_name: log_stream_name
+      }
+      request.merge!(start_time: @start_time) if @start_time
+      request.merge!(end_time: @end_time) if @end_time
+
+      if @use_log_group_name_prefix
+        log_next_token = next_token(log_stream_name, log_group_name)
+      else
+        log_next_token = next_token(log_stream_name)
+      end
+
+      request[:next_token] = log_next_token if !log_next_token.nil? && !log_next_token.empty?
+      request[:start_from_head] = true if read_from_head?(log_next_token)
+
+      # Only apply throttling retry to the API call, not the whole method
Contributor (review comment):

Suggested change:
-      # Only apply throttling retry to the API call, not the whole method

Information that can be read from the code doesn't need to be repeated in comments.
(Maybe those comments were meant for the PR review. Thanks, I understand the code. Let's remove them now.)

+      response = api_call_with_throttling('get_log_events') do
+        @logs.get_log_events(request)
+      end
+
+      if valid_next_token(log_next_token, response.next_forward_token)
+        if @use_log_group_name_prefix
+          store_next_token(response.next_forward_token, log_stream_name, log_group_name)
+        else
+          store_next_token(response.next_forward_token, log_stream_name)
+        end
+      end
+
+      response.events
     end

     def read_from_head?(next_token)
       (!next_token.nil? && !next_token.empty?) || @start_time || @end_time
     end

     def describe_log_streams(log_stream_name_prefix, log_streams = nil, next_token = nil, log_group_name=nil)
-      throttling_handler('describe_log_streams') do
-        request = {
-          log_group_name: log_group_name != nil ? log_group_name : @log_group_name
-        }
-        request[:next_token] = next_token if next_token
-        request[:log_stream_name_prefix] = log_stream_name_prefix if log_stream_name_prefix
-        response = @logs.describe_log_streams(request)
-        if log_streams
-          log_streams.concat(response.log_streams)
-        else
-          log_streams = response.log_streams
-        end
-        if response.next_token
-          log_streams = describe_log_streams(log_stream_name_prefix, log_streams, response.next_token, log_group_name)
-        end
-        log_streams
-      end
+      request = {
+        log_group_name: log_group_name != nil ? log_group_name : @log_group_name
+      }
+      request[:next_token] = next_token if next_token
+      request[:log_stream_name_prefix] = log_stream_name_prefix if log_stream_name_prefix
+
+      # Only apply throttling retry to the API call
Contributor (review comment):

Suggested change:
-      # Only apply throttling retry to the API call
+      response = api_call_with_throttling('describe_log_streams') do
+        @logs.describe_log_streams(request)
+      end
+
+      if log_streams
+        log_streams.concat(response.log_streams)
+      else
+        log_streams = response.log_streams
+      end
+      if response.next_token
+        log_streams = describe_log_streams(log_stream_name_prefix, log_streams, response.next_token, log_group_name)
+      end
+      log_streams
     end
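Purely as an illustration of the pagination flow above, a self-contained stub; Page, PAGES, and describe_pages are invented for this sketch and are not part of the plugin:

# Fake two-page result set standing in for the CloudWatch Logs API.
Page = Struct.new(:log_streams, :next_token)
PAGES = {
  nil => Page.new(%w[stream-a stream-b], "token-1"),
  "token-1" => Page.new(%w[stream-c], nil)
}

def describe_pages(streams = nil, next_token = nil)
  response = PAGES[next_token] # stands in for @logs.describe_log_streams(request)
  streams = streams ? streams.concat(response.log_streams) : response.log_streams
  # Recurse while the API reports another page, exactly like the method above.
  streams = describe_pages(streams, response.next_token) if response.next_token
  streams
end

puts describe_pages.inspect # => ["stream-a", "stream-b", "stream-c"]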

-    def throttling_handler(method_name)
+    # New method to handle API calls with throttling retry with exponential backoff
Contributor (review comment):

Suggested change:
-    # New method to handle API calls with throttling retry with exponential backoff
+    def api_call_with_throttling(method_name, retry_count = 0)
       yield
     rescue Aws::CloudWatchLogs::Errors::ThrottlingException => err
-      if throttling_retry_seconds
-        log.warn "ThrottlingException #{method_name}. Waiting #{throttling_retry_seconds} seconds to retry."
-        sleep throttling_retry_seconds
-
-        throttling_handler(method_name) { yield }
+      if @throttling_retry_seconds && retry_count < @max_retry_count
+        # Calculate backoff with jitter: base_time * (2^retry_count) + random_jitter
+        wait_time = @throttling_retry_seconds * (2 ** retry_count) * (0.9 + 0.2 * rand)
+        log.warn "Haia - ThrottlingException on #{method_name}. Retry #{retry_count+1}/#{@max_retry_count}. Waiting #{wait_time.round(2)} seconds."
Contributor (review comment):

Could you tell me what "Haia" means?
+        sleep wait_time
+
+        # Only retry the API call itself, not recursively
Contributor (review comment):

Suggested change:
-        # Only retry the API call itself, not recursively
+        api_call_with_throttling(method_name, retry_count + 1) { yield }
       else
+        error_msg = retry_count >= @max_retry_count ?
+          "Haia - Max retries (#{@max_retry_count}) exceeded for #{method_name}" :
+          "ThrottlingException for #{method_name} with no retry configured"
+        log.error error_msg
         raise err
       end
     end
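For reference, a standalone sketch of the backoff calculation used in api_call_with_throttling above (multiplicative jitter, as in the code); the 5-second base is only an assumed example value for throttling_retry_seconds, not a plugin default:

# Illustration of: wait_time = base * (2 ** retry_count) * (0.9 + 0.2 * rand)
base = 5 # assumed example value for throttling_retry_seconds, in seconds

(0..4).each do |retry_count|
  jitter = 0.9 + 0.2 * rand # multiplier between 0.9 and 1.1
  wait_time = base * (2**retry_count) * jitter
  puts "retry #{retry_count + 1}: ~#{wait_time.round(2)}s (nominal #{base * 2**retry_count}s)"
end
# Roughly 5s, 10s, 20s, 40s, 80s before jitter: each retry waits about
# twice as long as the previous one, plus or minus 10%.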
@@ -341,7 +358,12 @@ def describe_log_groups(log_group_name_prefix, log_groups = nil, next_token = ni
         log_group_name_prefix: log_group_name_prefix
       }
       request[:next_token] = next_token if next_token
-      response = @logs.describe_log_groups(request)

+      # Apply throttling handling to describe_log_groups too
Contributor (review comment):

Suggested change:
-      # Apply throttling handling to describe_log_groups too
+      response = api_call_with_throttling('describe_log_groups') do
+        @logs.describe_log_groups(request)
+      end
+
       if log_groups
         log_groups.concat(response.log_groups)
       else