Stage buffer sometimes sticks around and doesn't ever get queued #4662

Open
stanhu opened this issue Oct 10, 2024 · 7 comments
Labels
bug Something isn't working

Comments

stanhu commented Oct 10, 2024

Describe the bug

For the last week I've been trying to track down what looks like a memory leak, where a stage buffer doesn't get cleared out even though new data arrives. In my latest attempt to isolate the problem, I noticed a jump to 8 MB in the fluentd_output_status_buffer_stage_byte_size Prometheus metric, which reports the total size in bytes of the stage buffer:

[Screenshot: fluentd_output_status_buffer_stage_byte_size jumping to 8 MB and staying flat]

This jump appears to persist indefinitely until I restart fluentd.

To Reproduce

I'm still working on this.

Expected behavior

No memory growth over time.

Your Environment

- Fluentd version: v1.16.5
- Package version: 5.0.4-1
- Operating system: Ubuntu 20.04.6
- Kernel version: 5.15.0-1051-gcp

Your Configuration

I don't have clear reproduction steps yet. Our config looks something like this:

<source>
  @type tail
  tag postgres.postgres
  path /var/log/postgresql/postgresql.log
  pos_file /var/log/fluent/postgres.log.pos
  format /(?<time>[^G]*) GMT \[(?<pg_id>\d+), (?<xid>\d+)\]: .* user=(?<pg_user>[^,]*),db=(?<pg_db>[^,]*),app=(?<pg_application>[^,]*),client=(?<pg_client>[^ ]*) (?<pg_message>.*)/
  time_format %Y-%m-%d %H:%M:%S.%N
</source>

<filter postgres.postgres_csv>
  @type postgresql_slowlog
</filter>

<filter postgres.postgres_csv>
  @type postgresql_redactor
  max_length 200000
</filter>

<match postgres.*>
  @type copy
  <store>
    @type google_cloud
    label_map {
      "tag": "tag"
    }
    buffer_type file
    buffer_path /opt/fluent/buffers/postgres/google_cloud
    buffer_chunk_limit 8MB
    buffer_queue_limit 1000
    flush_interval 30s
    log_level info
  </store>

  <store>
    @type cloud_pubsub
    topic pubsub-postgres-inf-gprd
    project my-project
    buffer_type file
    buffer_path /opt/fluent/buffers/postgres/cloud_pubsub
    buffer_chunk_limit 8MB
    buffer_queue_limit 1000
    flush_interval 30s
  </store>
</match>

Your Error Log

The stuck 8 MB buffer seems to have coincided with an EOF error:

  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:170:in `open'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/event.rb:318:in `each'
  2024-10-08 10:40:21 +0000 [error]: #0 /etc/fluent/plugin/out_cloud_pubsub.rb:62:in `write'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/compat/output.rb:131:in `write'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1225:in `try_flush'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1538:in `flush_thread_run'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:510:in `block (2 levels) in start'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2024-10-08 10:40:21 +0000 [error]: #0 failed to purge buffer chunk chunk_id="623f4c358bd4b7cd7f63a4eb7410b459" error_class=Errno::ENOENT error=#<Errno::ENOENT: No such file or directory @ apply2files - /opt/fluent/buffers/postgres/cloud_pubsub/buffer.b623f4c358bd4b7cd7f63a4eb7410b459.log>
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:161:in `unlink'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:161:in `purge'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer.rb:601:in `block in purge_chunk'
  2024-10-08 10:40:21 +0000 [error]: #0 /opt/fluent/lib/ruby/3.2.0/monitor.rb:202:in `synchronize'
  2024-10-08 10:40:21 +0000 [error]: #0 /opt/fluent/lib/ruby/3.2.0/monitor.rb:202:in `mon_synchronize'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer.rb:592:in `purge_chunk'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1110:in `commit_write'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1229:in `try_flush'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1538:in `flush_thread_run'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:510:in `block (2 levels) in start'
  2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2024-10-08 10:40:21.470273004 +0000 fluent.error: {"chunk_id":"623f4c358bd4b7cd7f63a4eb7410b459","error_class":"Errno::ENOENT","error":"#<Errno::ENOENT: No such file or directory @ apply2files - /opt/fluent/buffers/postgres/cloud_pubsub/buffer.b623f4c358bd4b7cd7f63a4eb7410b459.log>","message":"failed to purge buffer chunk chunk_id=\"623f4c358bd4b7cd7f63a4eb7410b459\" error_class=Errno::ENOENT error=#<Errno::ENOENT: No such file or directory @ apply2files - /opt/fluent/buffers/postgres/cloud_pubsub/buffer.b623f4c358bd4b7cd7f63a4eb7410b459.log>","tag":"fluent.error","environment":"gprd","hostname":"example.com","fqdn":"example.com","stage":"main","shard":"backup","tier":"db","type":"patroni"}

Additional context

Note that previously when log messages were up to 3 MB, I would see more of these "step" jumps in memory usage. I've altered our filters to truncate the log messages to 200K, which seems to have stopped most of these stage buffer leaks. But I'm still wondering if there is a corner case here where the file buffer got cleared but the stage buffer did not.

stanhu (Author) commented Oct 15, 2024

I saw this error message again today:

2024-10-15 16:50:28 +0000 [error]: #0 unexpected error error="closed stream"
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:170:in `seek'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:170:in `open'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/event.rb:318:in `each'
  2024-10-15 16:50:28 +0000 [error]: #0 /etc/fluent/plugin/out_cloud_pubsub.rb:62:in `write'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/compat/output.rb:131:in `write'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1225:in `try_flush'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1538:in `flush_thread_run'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:510:in `block (2 levels) in start'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2024-10-15 16:50:28.302744238 +0000 fluent.error: {"error":"closed stream","message":"unexpected error error=\"closed stream\"","tag":"fluent.error","environment":"gprd","hostname":"patroni-main-v14-02-db-gprd","fqdn":"patroni-main-v14-02-db-gprd.c.gitlab-production.internal","stage":"main","shard":"backup","tier":"db","type":"patroni"}
2024-10-15 16:50:28 +0000 [error]: #0 failed to purge buffer chunk chunk_id="62486bfe620a478fb7b97c5b27980db8" error_class=Errno::ENOENT error=#<Errno::ENOENT: No such file or directory @ apply2files - /opt/fluent/buffers/postgres/cloud_pubsub/buffer.b62486bfe620a478fb7b97c5b27980db8.log>
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:161:in `unlink'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:161:in `purge'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer.rb:601:in `block in purge_chunk'
  2024-10-15 16:50:28 +0000 [error]: #0 /opt/fluent/lib/ruby/3.2.0/monitor.rb:202:in `synchronize'
  2024-10-15 16:50:28 +0000 [error]: #0 /opt/fluent/lib/ruby/3.2.0/monitor.rb:202:in `mon_synchronize'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer.rb:592:in `purge_chunk'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1110:in `commit_write'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1229:in `try_flush'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1538:in `flush_thread_run'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:510:in `block (2 levels) in start'
  2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2024-10-15 16:50:28.791864146 +0000 fluent.error: {"chunk_id":"62486bfe620a478fb7b97c5b27980db8","error_class":"Errno::ENOENT","error":"#<Errno::ENOENT: No such file or directory @ apply2files - /opt/fluent/buffers/postgres/cloud_pubsub/buffer.b62486bfe620a478fb7b97c5b27980db8.log>","message":"failed to purge buffer chunk chunk_id=\"62486bfe620a478fb7b97c5b27980db8\" error_class=Errno::ENOENT error=#<Errno::ENOENT: No such file or directory @ apply2files - /opt/fluent/buffers/postgres/cloud_pubsub/buffer.b62486bfe620a478fb7b97c5b27980db8.log>","tag":"fluent.error","environment":"gprd","hostname":"patroni-main-v14-02-db-gprd","fqdn":"patroni-main-v14-02-db-gprd.c.gitlab-production.internal","stage":"main","shard":"backup","tier":"db","type":"patroni"}

In this example, I suspect:

  1. The file buffer stream closed for some reason, resulting in the closed stream error. Even if the file had been deleted, I'm not sure why that would produce a closed stream error unless it pertained to the upstream PubSub service.
  2. Fluentd then attempted to purge the chunk, which resulted in the ENOENT error.
  3. As a result, the ENOENT error prevented the queue size metric from updating:

@queue_size_metrics.sub(bytesize)

I don't know why this closed stream error occasionally happens, but I wonder:

  1. Should ENOENT be ignored in Buffer::FileChunk#purge?
  2. Should Buffer#purge_chunk catch ENOENT and ensure that @queue_size_metrics.sub(bytesize) runs? (See the sketch below.)
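
To illustrate option 2, here's a minimal, self-contained Ruby sketch (toy class and names, not fluentd's actual Buffer/FileChunk code) showing how rescuing ENOENT around the file unlink keeps the byte-size accounting from sticking when the chunk file has already disappeared:

require "monitor"

# Toy stand-in for the buffer's purge path (not fluentd's real classes).
class ToyBuffer
  include MonitorMixin

  def initialize
    super
    @dequeued = {}          # chunk_id => { path:, bytesize: }
    @queue_size_bytes = 0   # stands in for @queue_size_metrics
  end

  attr_reader :queue_size_bytes

  def enqueue(chunk_id, path, data)
    File.write(path, data)
    synchronize do
      @dequeued[chunk_id] = { path: path, bytesize: data.bytesize }
      @queue_size_bytes += data.bytesize
    end
  end

  def purge_chunk(chunk_id)
    synchronize do
      chunk = @dequeued.delete(chunk_id)
      return unless chunk
      begin
        File.unlink(chunk[:path])
      rescue Errno::ENOENT
        # File already gone (e.g. removed out of band): treat it as purged so
        # the byte-size accounting below still runs instead of being skipped.
        warn "chunk file already removed: #{chunk[:path]}"
      end
      @queue_size_bytes -= chunk[:bytesize]
    end
  end
end

buffer = ToyBuffer.new
buffer.enqueue("abc123", "/tmp/toy_chunk.log", "x" * 1024)
File.unlink("/tmp/toy_chunk.log")   # simulate the chunk file disappearing early
buffer.purge_chunk("abc123")
puts buffer.queue_size_bytes        # => 0 instead of sticking at 1024

Without that rescue, the exception skips the final subtraction and the counter stays inflated, which would be consistent with the stuck byte-size metric above.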

stanhu (Author) commented Oct 16, 2024

It seems this intermittent closed stream error was reported a while ago: #2391

stanhu (Author) commented Oct 16, 2024

@ashie I suspect we need #4336 after all. I'm seeing what looks to be a race condition on a single worker instance.

ashie (Member) commented Oct 16, 2024

It seems that your own plugin is involved in this stack trace.

 2024-10-15 16:50:28 +0000 [error]: #0 /etc/fluent/plugin/out_cloud_pubsub.rb:62:in `write'

Since we can't read this code, I'm not sure of the cause yet.

ashie added the moreinfo and waiting-for-user labels and removed the waiting-for-triage label on Oct 16, 2024
ashie (Member) commented Oct 16, 2024

It seems this intermittent closed stream error was reported a while ago: #2391

We already found out that #2391 is caused by rollback, but rollback doesn't appear in your error log, so it doesn't seem related to this issue.

stanhu (Author) commented Oct 16, 2024

@ashie out_cloud_pubsub.rb is this:

# Originally copied from https://github.com/yosssi/fluent-plugin-cloud-pubsub
# License: MIT
require 'google/cloud/pubsub'

module Fluent
  class CloudPubSubOutput < BufferedOutput
    MAX_REQ_SIZE = 10 * 1024 * 1024 # 10 MB
    MAX_MSGS_PER_REQ = 1000

    Plugin.register_output('cloud_pubsub', self)

    config_param :project,          :string,  :default => nil
    config_param :topic,            :string,  :default => nil
    config_param :key,              :string,  :default => nil
    config_param :max_req_size,     :integer, :default => MAX_REQ_SIZE
    config_param :max_msgs_per_req, :integer, :default => MAX_MSGS_PER_REQ

    unless method_defined?(:log)
      define_method("log") { $log }
    end

    unless method_defined?(:router)
      define_method("router") { Fluent::Engine }
    end

    def configure(conf)
      super

      raise Fluent::ConfigError, "'project' must be specified." unless @project
      raise Fluent::ConfigError, "'topic' must be specified." unless @topic
    end

    def multi_workers_ready?
      true
    end

    def start
      super

      pubsub = Google::Cloud::PubSub.new(project_id: @project, credentials: @key)
      @client = pubsub.topic @topic
    end

    def format(tag, time, record)
      [tag, time, record].to_msgpack
    end

    def publish(msgs)
      log.debug "publish #{msgs.length} messages"

      @client.publish do |batch|
        msgs.each do |m|
          batch.publish m
        end
      end
    end

    def write(chunk)
      msgs = []
      msgs_size = 0

      chunk.msgpack_each do |tag, time, record|
        size = Yajl.dump(record).bytesize
        if msgs.length > 0 && (msgs_size + size > @max_req_size || msgs.length + 1 > @max_msgs_per_req)
          publish(msgs)
          msgs = []
          msgs_size = 0
        end
        msgs << record.to_json
        msgs_size += size
      end

      if msgs.length > 0
        publish(msgs)
      end
    rescue
      log.error "unexpected error", :error=>$!.to_s
      log.error_backtrace
    end
  end
end

The backtrace shows that the error happens during the seek in file_chunk.rb, so I don't think this plugin is at fault here.

ashie added the waiting-for-triage and bug labels and removed the moreinfo, waiting-for-user, and waiting-for-triage labels on Oct 17, 2024
ashie (Member) commented Oct 18, 2024

Thanks for sharing the plugin code.

The file buffer stream closed for some reason, resulting in the closed stream error.

AFAIK, closed stream means the stream was closed unexpectedly by this process, not by another process.
So it seems there is a bug in fluentd's buffer code, as you say.
But I'm not sure of the cause yet.

@ashie I suspect we need #4336 after all. I'm seeing what looks to be a race condition on a single worker instance.

If a chunk could be processed by multiple threads simultaneously, it might be effective.
But usually a queued chunk isn't processed by multiple threads, since a queued chunk is popped inside the synchronized block in the dequeue_chunk method before it is processed:

chunk = @buffer.dequeue_chunk

synchronize do
  chunk = @queue.shift
  # this buffer is dequeued by other thread just before "synchronize" in this thread
  return nil unless chunk
  @dequeued[chunk.unique_id] = chunk
  @queued_num[chunk.metadata] -= 1 # BUG if nil, 0 or subzero
  @dequeued_num[chunk.metadata] ||= 0
  @dequeued_num[chunk.metadata] += 1
  log.trace "chunk dequeued", instance: self.object_id, metadata: chunk.metadata
  chunk
end

In addition, you are using only a single flush thread (there is no flush_thread_count in your configuration), so I think #4336 wouldn't solve your issue.
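
For reference, here is a tiny standalone Ruby sketch of that pattern (toy objects, not fluentd's actual code): several threads popping from a shared queue under a Monitor, where each chunk can only ever be handed to one thread:

require "monitor"

# Toy model of the dequeue path: a shared queue guarded by a Monitor and
# popped by several "flush threads". Each chunk id should be seen exactly once.
lock = Monitor.new
queue = (1..1000).to_a        # stands in for @queue of chunks
dequeued = []

threads = 4.times.map do
  Thread.new do
    loop do
      chunk = lock.synchronize { queue.shift }
      break if chunk.nil?     # queue drained
      lock.synchronize { dequeued << chunk }
    end
  end
end
threads.each(&:join)

# Every chunk was dequeued exactly once, never by two threads.
puts dequeued.size == 1000 && dequeued.sort == (1..1000).to_a  # => true

And with flush_thread_count left at its default of 1, there is only one such thread anyway.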
