
in_unix: causes "[error]: #0 unexpected error in json payload error" or "[warn]: #0 incoming data is broken" with huge data #4692

Open
Watson1978 opened this issue Nov 2, 2024 · 2 comments
Labels
bug Something isn't working

Comments

@Watson1978
Contributor

Watson1978 commented Nov 2, 2024

Describe the bug

When the client sends very long data at once, in_unix logs an error for JSON data or a warning for MessagePack data.
In my environment, this always happens when I send more than 256 KB at once.
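For reference, here is the arithmetic on the JSON event the client script under "To Reproduce" sends. This small sketch fixes the timestamp to the one in the error log (1730508247) so the length is deterministic, and assumes the 16384-byte read size reported later in this thread. The computed size matches the `[client] sent size = 262184` line in that later output, so a single event needs at least 17 reads to arrive.

```ruby
require "json"

DATA_LENGTH = 1024 * 256  # 262144 bytes of "a", as in the client script
CHUNK_SIZE  = 16_384      # assumed per-read size, taken from the cool.io test later in the thread

# Same [tag, time, record] shape as the client script; the timestamp is fixed
# (copied from the error log) so the serialized length does not vary.
record  = ["test", 1730508247, { "length_#{DATA_LENGTH}" => "a" * DATA_LENGTH }]
payload = record.to_json

full_chunks, remainder = payload.bytesize.divmod(CHUNK_SIZE)
puts "payload bytes: #{payload.bytesize}"                             # prints "payload bytes: 262184"
puts "full #{CHUNK_SIZE}-byte chunks: #{full_chunks}, remainder: #{remainder}" # 16 chunks, remainder 40
```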

To Reproduce

The client simply sends data using the following code:

# frozen_string_literal: true
require "socket"
require "json"
require "msgpack"

FORMAT = :json # or :msgpack
UNIX_SOCKET_PATH = "/tmp/fluentd-unix.sock"

# 256 KiB string value per event; the problem reproduces at this size
DATA_LENGTH = 1024 * 256

# Build one [tag, time, record] event serialized in FORMAT
def data_generator
  d = ['test', Time.now.to_i, {"length_#{DATA_LENGTH}": "a" * DATA_LENGTH}]
  case FORMAT
  when :json
    d.to_json
  when :msgpack
    MessagePack.pack(d)
  else
    raise "unknown format: #{FORMAT}"
  end
end

begin
  s = UNIXSocket.new(UNIX_SOCKET_PATH)
  loop do
    # Send the whole event in a single call, once per second
    data = data_generator
    s.send(data, 0)
    sleep 1
  end
rescue Errno::EPIPE => e
  # The peer (Fluentd) closed the connection
  p e
ensure
  s&.close
end

Client side

ruby unix_client.rb

Fluentd side

bundle exec bin/fluentd -c ~/socket_unix.conf --workers 1

Expected behavior

No errors

Your Environment

- Fluentd version: git master
- Package version:
- Operating system:
- Kernel version:

Your Configuration

<source>
  @type unix
  path /tmp/fluentd-unix.sock
</source>

<match **>
  @type file
  path /tmp/fluentd.log
</match>

Your Error Log

JSON

Sending JSON data from the client:

$ bundle exec bin/fluentd -c ~/socket_unix.conf --workers 1
2024-11-02 09:44:04 +0900 [info]: init supervisor logger path=nil rotate_age=nil rotate_size=nil
2024-11-02 09:44:04 +0900 [info]: parsing config file is succeeded path="/home/watson/socket_unix.conf"
2024-11-02 09:44:04 +0900 [info]: gem 'fluentd' version '1.17.1'
2024-11-02 09:44:04 +0900 [warn]: define <match fluent.**> to capture fluentd logs in top level is deprecated. Use <label @FLUENT_LOG> instead
2024-11-02 09:44:04 +0900 [info]: using configuration file: <ROOT>
  <source>
    @type unix
    path "/tmp/fluentd-unix.sock"
  </source>
  <match **>
    @type file
    path "/tmp/fluentd.log"
    <buffer time>
      path "/tmp/fluentd.log"
    </buffer>
  </match>
</ROOT>
2024-11-02 09:44:04 +0900 [info]: starting fluentd-1.17.1 pid=16001 ruby="3.3.5"
2024-11-02 09:44:04 +0900 [info]: spawn command to main:  cmdline=["/home/watson/.rbenv/versions/3.3.5/bin/ruby", "-r/home/watson/.rbenv/versions/3.3.5/lib/ruby/site_ruby/3.3.0/bundler/setup", "-Eascii-8bit:ascii-8bit", "bin/fluentd", "-c", "/home/watson/socket_unix.conf", "--workers", "1", "--under-supervisor"]
2024-11-02 09:44:04 +0900 [info]: #0 init worker0 logger path=nil rotate_age=nil rotate_size=nil
2024-11-02 09:44:04 +0900 [info]: adding match pattern="**" type="file"
2024-11-02 09:44:04 +0900 [info]: adding source type="unix"
2024-11-02 09:44:04 +0900 [warn]: #0 define <match fluent.**> to capture fluentd logs in top level is deprecated. Use <label @FLUENT_LOG> instead
2024-11-02 09:44:04 +0900 [info]: #0 starting fluentd worker pid=16025 ppid=16001 worker=0
2024-11-02 09:44:04 +0900 [warn]: #0 Found existing '/tmp/fluentd-unix.sock'. Remove this file for in_unix plugin
2024-11-02 09:44:04 +0900 [info]: #0 listening fluent socket on /tmp/fluentd-unix.sock
2024-11-02 09:44:04 +0900 [info]: #0 fluentd worker is now running worker=0
2024-11-02 09:44:07 +0900 [error]: #0 unexpected error in json payload error="lexical error: invalid string in json text.\n                                     [\"test\",1730508247,{\"length_26214\n                     (right here) ------^\n"
  2024-11-02 09:44:07 +0900 [error]: #0 /home/watson/src/fluentd/lib/fluent/plugin/in_unix.rb:175:in `<<'
  2024-11-02 09:44:07 +0900 [error]: #0 /home/watson/src/fluentd/lib/fluent/plugin/in_unix.rb:175:in `on_read_json'
  2024-11-02 09:44:07 +0900 [error]: #0 /home/watson/.rbenv/versions/3.3.5/lib/ruby/gems/3.3.0/gems/cool.io-1.9.0/lib/cool.io/io.rb:123:in `on_readable'
  2024-11-02 09:44:07 +0900 [error]: #0 /home/watson/.rbenv/versions/3.3.5/lib/ruby/gems/3.3.0/gems/cool.io-1.9.0/lib/cool.io/io.rb:186:in `on_readable'
  2024-11-02 09:44:07 +0900 [error]: #0 /home/watson/.rbenv/versions/3.3.5/lib/ruby/gems/3.3.0/gems/cool.io-1.9.0/lib/cool.io/loop.rb:88:in `run_once'
  2024-11-02 09:44:07 +0900 [error]: #0 /home/watson/.rbenv/versions/3.3.5/lib/ruby/gems/3.3.0/gems/cool.io-1.9.0/lib/cool.io/loop.rb:88:in `run'
  2024-11-02 09:44:07 +0900 [error]: #0 /home/watson/src/fluentd/lib/fluent/plugin_helper/event_loop.rb:93:in `block in start'
  2024-11-02 09:44:07 +0900 [error]: #0 /home/watson/src/fluentd/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'

MessagePack

Sending MessagePack data from the client:

$ bundle exec bin/fluentd -c ~/socket_unix.conf --workers 1
2024-11-02 09:45:01 +0900 [info]: init supervisor logger path=nil rotate_age=nil rotate_size=nil
2024-11-02 09:45:01 +0900 [info]: parsing config file is succeeded path="/home/watson/socket_unix.conf"
2024-11-02 09:45:01 +0900 [info]: gem 'fluentd' version '1.17.1'
2024-11-02 09:45:01 +0900 [warn]: define <match fluent.**> to capture fluentd logs in top level is deprecated. Use <label @FLUENT_LOG> instead
2024-11-02 09:45:01 +0900 [info]: using configuration file: <ROOT>
  <source>
    @type unix
    path "/tmp/fluentd-unix.sock"
  </source>
  <match **>
    @type file
    path "/tmp/fluentd.log"
    <buffer time>
      path "/tmp/fluentd.log"
    </buffer>
  </match>
</ROOT>
2024-11-02 09:45:01 +0900 [info]: starting fluentd-1.17.1 pid=16161 ruby="3.3.5"
2024-11-02 09:45:01 +0900 [info]: spawn command to main:  cmdline=["/home/watson/.rbenv/versions/3.3.5/bin/ruby", "-r/home/watson/.rbenv/versions/3.3.5/lib/ruby/site_ruby/3.3.0/bundler/setup", "-Eascii-8bit:ascii-8bit", "bin/fluentd", "-c", "/home/watson/socket_unix.conf", "--workers", "1", "--under-supervisor"]
2024-11-02 09:45:01 +0900 [info]: #0 init worker0 logger path=nil rotate_age=nil rotate_size=nil
2024-11-02 09:45:01 +0900 [info]: adding match pattern="**" type="file"
2024-11-02 09:45:01 +0900 [info]: adding source type="unix"
2024-11-02 09:45:01 +0900 [warn]: #0 define <match fluent.**> to capture fluentd logs in top level is deprecated. Use <label @FLUENT_LOG> instead
2024-11-02 09:45:01 +0900 [info]: #0 starting fluentd worker pid=16185 ppid=16161 worker=0
2024-11-02 09:45:01 +0900 [warn]: #0 Found existing '/tmp/fluentd-unix.sock'. Remove this file for in_unix plugin
2024-11-02 09:45:01 +0900 [info]: #0 listening fluent socket on /tmp/fluentd-unix.sock
2024-11-02 09:45:01 +0900 [info]: #0 fluentd worker is now running worker=0
2024-11-02 09:45:04 +0900 [warn]: #0 incoming data is broken: msg=97
2024-11-02 09:45:04 +0900 [warn]: #0 incoming data is broken: msg=97
2024-11-02 09:45:04 +0900 [warn]: #0 incoming data is broken: msg=97
2024-11-02 09:45:04 +0900 [warn]: #0 incoming data is broken: msg=97
2024-11-02 09:45:04 +0900 [warn]: #0 incoming data is broken: msg=97
--- (snip) ---
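The repeated `msg=97` is consistent with the unpacker decoding bytes from the middle of the long `"aaa…"` string as standalone values: in the MessagePack format, any single byte from 0x00 to 0x7f is a complete positive fixint, and `"a"` is byte 0x61, i.e. 97. This is an interpretation of the log, not a confirmed root cause. The sketch below is a hypothetical decoder for that one format only (it is not the msgpack gem), just to show the byte-to-value mapping.

```ruby
# Hypothetical, illustrative decoder for MessagePack positive fixint values only
# (a first byte of 0b0xxxxxxx, i.e. 0x00..0x7f, encodes the integer itself).
# Any other format byte is rejected; this is not a general MessagePack parser.
def decode_positive_fixints(bytes)
  bytes.each_byte.map do |b|
    raise ArgumentError, format("not a positive fixint: 0x%02x", b) unless b <= 0x7f
    b
  end
end

# A slice taken from the middle of the long "aaa..." string value
slice = "a" * 5
p decode_positive_fixints(slice)  # => [97, 97, 97, 97, 97]
```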

Additional context

No response

Watson1978 added the waiting-for-triage and bug labels and removed waiting-for-triage on Nov 2, 2024
@Watson1978
Contributor Author

Watson1978 commented Nov 2, 2024

The cool.io gem passes received data to the application in chunks of at most 16384 bytes.
I suspect Fluentd does not handle these partial chunks properly.

require 'bundler/inline'
gemfile do
  source 'https://rubygems.org'
  gem 'cool.io'
end

require "socket"
require "json"
require 'fileutils'

UNIX_SOCKET_PATH = "/tmp/fluentd-unix.sock"
DATA_LENGTH = 1024 * 256

# Remove a stale socket file left over from a previous run
FileUtils.rm_f(UNIX_SOCKET_PATH)

# Client: sends one large JSON event in a single call, then disconnects
class Client
  def data_generator
    d = ['test', Time.now.to_i, {"length_#{DATA_LENGTH}": "a" * DATA_LENGTH}]
    d.to_json
  end

  def run
    Thread.new do
      s = UNIXSocket.new(UNIX_SOCKET_PATH)
      sleep 0.5
      data = data_generator
      s.send(data, 0)
      puts "[client] sent size = #{data.size}"
      s.close

      puts "[client] Finished!!"
    end
  end
end

# Server: prints the size of every chunk cool.io passes to on_read
class ServerConnection < Cool.io::UNIXSocket
  def on_connect
    puts "[server] connected"
  end

  def on_close
    puts "[server] disconnected"

    # Stop the demo once the client has disconnected
    exit
  end

  def on_read(data)
    puts "[server] received size = #{data.size}"
  end
end

server = Cool.io::UNIXServer.new(UNIX_SOCKET_PATH, ServerConnection)
server.attach(Cool.io::Loop.default)

Client.new.run

Cool.io::Loop.default.run

$ ruby coolio.rb
/home/watson/.rbenv/versions/3.3.5/lib/ruby/3.3.0/json/common.rb:3: warning: ostruct was loaded from the standard library, but will no longer be part of the default gems starting from Ruby 3.5.0.
You can add ostruct to your Gemfile or gemspec to silence this warning.
[server] connected
[client] sent size = 262184
[server] received size = 16384
[client] Finished!!
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 6272
[server] disconnected
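The same splitting can be observed without cool.io. This is a minimal sketch using only Ruby's standard library and assuming a POSIX platform (for `UNIXSocket.pair`): one thread writes a JSON event of the same size to one end of a socket pair, and the other end reads with `IO#readpartial(16384)`, mirroring the 16384-byte cap seen above. Exact chunk sizes depend on the kernel's socket buffer and thread scheduling, so it only relies on the invariants: no read exceeds 16384 bytes, and concatenating the reads reproduces the payload.

```ruby
require "socket"
require "json"

READ_SIZE = 16_384
payload = ["test", 1730508247, { "length_262144" => "a" * (1024 * 256) }].to_json

reader, writer = UNIXSocket.pair  # connected SOCK_STREAM pair

# Write the whole payload from a separate thread; the write blocks until
# the reader has drained enough of the socket buffer.
sender = Thread.new do
  writer.write(payload)
  writer.close  # signals EOF to the reader
end

chunks = []
loop do
  chunks << reader.readpartial(READ_SIZE)  # returns at most READ_SIZE bytes
rescue EOFError
  break
end
sender.join
reader.close

puts "reads: #{chunks.size}, distinct sizes: #{chunks.map(&:bytesize).uniq.sort.inspect}"
puts "reassembled == payload: #{chunks.join == payload.b}"
```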

@Watson1978
Contributor Author

I suspect other plugins that use the cool.io gem have the same issue.
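Any handler built on these reads has to carry parser state across `on_read` calls. A minimal illustration of the failure mode, using Ruby's stdlib `JSON` (which is not a streaming parser, so the "stateful" path here simply buffers every chunk before parsing): treating each 16384-byte chunk as a complete document fails for every chunk, while parsing the reassembled bytes succeeds. This only demonstrates the symptom; it is not how in_unix parses data and not a proposed fix.

```ruby
require "json"

CHUNK_SIZE = 16_384
payload = ["test", 1730508247, { "length_262144" => "a" * (1024 * 256) }].to_json

# Split the payload into reads of at most CHUNK_SIZE bytes, like the ones above
chunks = (0...payload.bytesize).step(CHUNK_SIZE).map { |i| payload.byteslice(i, CHUNK_SIZE) }

# Naive handling: treat every read as a complete document
per_chunk_ok = chunks.map do |chunk|
  JSON.parse(chunk)
  true
rescue JSON::ParserError
  false
end

# Stateful handling: accumulate every chunk, then parse the complete buffer
buffer = +""
chunks.each { |chunk| buffer << chunk }
record = JSON.parse(buffer)

puts "chunks parsed on their own: #{per_chunk_ok.count(true)} of #{chunks.size}"
puts "reassembled tag: #{record[0]}, value length: #{record[2]['length_262144'].size}"
```

A real plugin would use a streaming parser that consumes each chunk as it arrives rather than buffering the whole event.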
