From ed12eed802542b4018d0ed816bfb57c9e226b5d2 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sun, 1 Oct 2023 08:43:13 +0000 Subject: [PATCH 01/12] Bump actions/checkout from 3 to 4 Bumps [actions/checkout](https://github.com/actions/checkout) from 3 to 4. - [Release notes](https://github.com/actions/checkout/releases) - [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md) - [Commits](https://github.com/actions/checkout/compare/v3...v4) --- updated-dependencies: - dependency-name: actions/checkout dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Signed-off-by: yongwoo.kim --- .github/workflows/linux.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index bd3a443..5b97ae7 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -15,7 +15,7 @@ jobs: - ubuntu-latest name: Ruby ${{ matrix.ruby }} unit testing on ${{ matrix.os }} steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: ruby/setup-ruby@v1 with: ruby-version: ${{ matrix.ruby }} From 796d6c948d0ade76bed4d4702a352055062b9d45 Mon Sep 17 00:00:00 2001 From: "yongwoo.kim" Date: Fri, 18 Oct 2024 17:42:50 +0900 Subject: [PATCH 02/12] feat: Add Zstd compression support for S3 plugin Signed-off-by: yongwoo.kim --- fluent-plugin-s3.gemspec | 1 + lib/fluent/plugin/out_s3.rb | 26 ++++++++++++++++- lib/fluent/plugin/s3_compressor_zstd.rb | 37 +++++++++++++++++++++++++ test/test_in_s3.rb | 1 + 4 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 lib/fluent/plugin/s3_compressor_zstd.rb diff --git a/fluent-plugin-s3.gemspec b/fluent-plugin-s3.gemspec index 8088a41..6988745 100644 --- a/fluent-plugin-s3.gemspec +++ b/fluent-plugin-s3.gemspec @@ -26,4 +26,5 @@ Gem::Specification.new do |gem| # aws-sdk-core requires one of ox, oga, libxml, nokogiri or rexml, # and rexml is no longer default gem as of Ruby 3.0. gem.add_development_dependency "rexml" + gem.add_development_dependency 'zstd-ruby' end diff --git a/lib/fluent/plugin/out_s3.rb b/lib/fluent/plugin/out_s3.rb index dc63f07..001bb4f 100644 --- a/lib/fluent/plugin/out_s3.rb +++ b/lib/fluent/plugin/out_s3.rb @@ -6,6 +6,7 @@ require 'time' require 'tempfile' require 'securerandom' +require 'zstd-ruby' module Fluent::Plugin class S3Output < Output @@ -630,6 +631,28 @@ def compress(chunk, tmp) end end + class ZstdCompressor < Compressor + def ext + 'zst'.freeze + end + + def content_type + 'application/x-zst'.freeze + end + + def compress(chunk, tmp) + uncompressed_data = '' + chunk.open do |io| + uncompressed_data = io.read + end + compressed_data = Zstd.compress(uncompressed_data, level: @level) + tmp.write(compressed_data) + rescue => e + log.warn "zstd compression failed: #{e.message}" + raise e + end + end + class TextCompressor < Compressor def ext 'txt'.freeze @@ -658,7 +681,8 @@ def content_type { 'gzip' => GzipCompressor, 'json' => JsonCompressor, - 'text' => TextCompressor + 'text' => TextCompressor, + 'zstd' => ZstdCompressor }.each { |name, compressor| COMPRESSOR_REGISTRY.register(name, compressor) } diff --git a/lib/fluent/plugin/s3_compressor_zstd.rb b/lib/fluent/plugin/s3_compressor_zstd.rb new file mode 100644 index 0000000..cb20a8c --- /dev/null +++ b/lib/fluent/plugin/s3_compressor_zstd.rb @@ -0,0 +1,37 @@ +require 'zstd-ruby' + +module Fluent::Plugin + class S3Output + class ZstdCompressor < Compressor + S3Output.register_compressor('zstd', self) + + config_param :level, :integer, default: 3, desc: "Compression level for zstd (1-22)" + + def initialize(opts = {}) + super() + @buffer_type = opts[:buffer_type] + @log = opts[:log] + end + + def ext + 'zst'.freeze + end + + def content_type + 'application/x-zstd'.freeze + end + + def compress(chunk, tmp) + uncompressed_data = '' + chunk.open do |io| + uncompressed_data = io.read + end + compressed_data = Zstd.compress(uncompressed_data, level: @level) + tmp.write(compressed_data) + rescue => e + log.warn "zstd compression failed: #{e.message}" + raise e + end + end + end +end diff --git a/test/test_in_s3.rb b/test/test_in_s3.rb index 2bd2d9d..dde048e 100644 --- a/test/test_in_s3.rb +++ b/test/test_in_s3.rb @@ -93,6 +93,7 @@ def test_unknown_store_as "text" => ["text", "txt", "text/plain"], "gzip" => ["gzip", "gz", "application/x-gzip"], "gzip_command" => ["gzip_command", "gz", "application/x-gzip"], + "zstd" => ["zstd", "zst", "application/x-zstd"], "lzo" => ["lzo", "lzo", "application/x-lzop"], "lzma2" => ["lzma2", "xz", "application/x-xz"]) def test_extractor(data) From 57ba441a78b25d384211f6222ac137cc89be12e7 Mon Sep 17 00:00:00 2001 From: ddukbg Date: Sat, 19 Oct 2024 10:43:35 +0900 Subject: [PATCH 03/12] Change zstd-ruby to runtime dependencies Signed-off-by: ddukbg --- fluent-plugin-s3.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluent-plugin-s3.gemspec b/fluent-plugin-s3.gemspec index 6988745..10ec13b 100644 --- a/fluent-plugin-s3.gemspec +++ b/fluent-plugin-s3.gemspec @@ -19,6 +19,7 @@ Gem::Specification.new do |gem| gem.add_dependency "fluentd", [">= 0.14.22", "< 2"] gem.add_dependency "aws-sdk-s3", "~> 1.60" gem.add_dependency "aws-sdk-sqs", "~> 1.23" + gem.add_dependency 'zstd-ruby' gem.add_development_dependency "rake", ">= 0.9.2" gem.add_development_dependency "test-unit", ">= 3.0.8" gem.add_development_dependency "test-unit-rr", ">= 1.0.3" @@ -26,5 +27,4 @@ Gem::Specification.new do |gem| # aws-sdk-core requires one of ox, oga, libxml, nokogiri or rexml, # and rexml is no longer default gem as of Ruby 3.0. gem.add_development_dependency "rexml" - gem.add_development_dependency 'zstd-ruby' end From 6f84c65fe4936a0e514e5a63b1723b1d49398b02 Mon Sep 17 00:00:00 2001 From: ddukbg <32587132+ddukbg@users.noreply.github.com> Date: Wed, 23 Oct 2024 16:49:58 +0900 Subject: [PATCH 04/12] Remove redundant bundler installation from GitHub Actions workflow Before: - name: Install dependencies run: gem install bundler rake After: - name: Install dependencies run: gem install rake Signed-off-by: ddukbg --- .github/workflows/linux.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 5b97ae7..6110f4b 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -23,6 +23,6 @@ jobs: env: CI: true run: | - gem install bundler rake + gem install rake bundle install --jobs 4 --retry 3 bundle exec rake test From 7f6e51c23863ba6bac53b8f11e76b16a73ff8623 Mon Sep 17 00:00:00 2001 From: ddukbg Date: Fri, 25 Oct 2024 18:42:24 +0900 Subject: [PATCH 05/12] Remove duplicate ZstdCompressor class as per maintainer's comments Signed-off-by: ddukbg --- lib/fluent/plugin/s3_compressor_zstd.rb | 37 ------------------------- 1 file changed, 37 deletions(-) delete mode 100644 lib/fluent/plugin/s3_compressor_zstd.rb diff --git a/lib/fluent/plugin/s3_compressor_zstd.rb b/lib/fluent/plugin/s3_compressor_zstd.rb deleted file mode 100644 index cb20a8c..0000000 --- a/lib/fluent/plugin/s3_compressor_zstd.rb +++ /dev/null @@ -1,37 +0,0 @@ -require 'zstd-ruby' - -module Fluent::Plugin - class S3Output - class ZstdCompressor < Compressor - S3Output.register_compressor('zstd', self) - - config_param :level, :integer, default: 3, desc: "Compression level for zstd (1-22)" - - def initialize(opts = {}) - super() - @buffer_type = opts[:buffer_type] - @log = opts[:log] - end - - def ext - 'zst'.freeze - end - - def content_type - 'application/x-zstd'.freeze - end - - def compress(chunk, tmp) - uncompressed_data = '' - chunk.open do |io| - uncompressed_data = io.read - end - compressed_data = Zstd.compress(uncompressed_data, level: @level) - tmp.write(compressed_data) - rescue => e - log.warn "zstd compression failed: #{e.message}" - raise e - end - end - end -end From 6aa84f43cfec7d706bb81ca8aea4aec4cfc20885 Mon Sep 17 00:00:00 2001 From: ddukbg Date: Fri, 25 Oct 2024 18:45:22 +0900 Subject: [PATCH 06/12] Remove ZstdCompressor tests from test_in_s3.rb as per maintainer's comments Moved ZstdCompressor tests from test_in_s3.rb to test_out_s3.rb as they relate to the out_s3 plugin. Signed-off-by: ddukbg --- test/test_in_s3.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/test/test_in_s3.rb b/test/test_in_s3.rb index dde048e..2bd2d9d 100644 --- a/test/test_in_s3.rb +++ b/test/test_in_s3.rb @@ -93,7 +93,6 @@ def test_unknown_store_as "text" => ["text", "txt", "text/plain"], "gzip" => ["gzip", "gz", "application/x-gzip"], "gzip_command" => ["gzip_command", "gz", "application/x-gzip"], - "zstd" => ["zstd", "zst", "application/x-zstd"], "lzo" => ["lzo", "lzo", "application/x-lzop"], "lzma2" => ["lzma2", "xz", "application/x-xz"]) def test_extractor(data) From dd3bafb3bc7f0c0e4cb9d7ac26d948efdef38414 Mon Sep 17 00:00:00 2001 From: ddukbg Date: Fri, 25 Oct 2024 18:45:56 +0900 Subject: [PATCH 07/12] Add ZstdCompressor test cases to test_out_s3.rb as per maintainer's comments Added tests for ZstdCompressor to test_out_s3.rb following the maintainer's suggestions. Signed-off-by: ddukbg --- test/test_out_s3.rb | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/test/test_out_s3.rb b/test/test_out_s3.rb index cbf7860..1f62b10 100644 --- a/test/test_out_s3.rb +++ b/test/test_out_s3.rb @@ -109,6 +109,14 @@ def test_configure_with_mime_type_lzo assert(e.is_a?(Fluent::ConfigError)) end + def test_configure_with_mime_type_zstd + conf = CONFIG.clone + conf << "\nstore_as zstd\n" + d = create_driver(conf) + assert_equal 'zst', d.instance.instance_variable_get(:@compressor).ext + assert_equal 'application/x-zst', d.instance.instance_variable_get(:@compressor).content_type + end + def test_configure_with_path_style conf = CONFIG.clone conf << "\nforce_path_style true\n" @@ -456,6 +464,33 @@ def test_write_with_custom_s3_object_key_format_containing_hex_random_placeholde FileUtils.rm_f(s3_local_file_path) end + def test_write_with_zstd + setup_mocks(true) + s3_local_file_path = "/tmp/s3-test.zst" + + expected_s3path = "log/events/ts=20110102-13/events_0-#{Socket.gethostname}.zst" + + setup_s3_object_mocks(s3_local_file_path: s3_local_file_path, s3path: expected_s3path) + + config = CONFIG_TIME_SLICE + "\nstore_as zstd\n" + d = create_time_sliced_driver(config) + + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: "test") do + d.feed(time, { "a" => 1 }) + d.feed(time, { "a" => 2 }) + end + + File.open(s3_local_file_path, 'rb') do |file| + compressed_data = file.read + uncompressed_data = Zstd.decompress(compressed_data) + expected_data = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n] + assert_equal expected_data, uncompressed_data + end + FileUtils.rm_f(s3_local_file_path) + end + class MockResponse attr_reader :data From 7e28a329c84b49ff44c40266b16ed24f3b354aa4 Mon Sep 17 00:00:00 2001 From: ddukbg <32587132+ddukbg@users.noreply.github.com> Date: Sat, 2 Nov 2024 17:26:00 +0900 Subject: [PATCH 08/12] refactor: Remove unnecessary whitespace Remove redundant spaces to improve code readability and consistency Co-authored-by: Daijiro Fukuda Signed-off-by: ddukbg refactor: Simplify data compression logic refactor: Simplify data compression logic Remove duplicate file reading and streamline compression process Co-authored-by: Daijiro Fukuda Signed-off-by: ddukbg --- lib/fluent/plugin/out_s3.rb | 6 +----- test/test_out_s3.rb | 10 +++++----- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/lib/fluent/plugin/out_s3.rb b/lib/fluent/plugin/out_s3.rb index 001bb4f..9149a0b 100644 --- a/lib/fluent/plugin/out_s3.rb +++ b/lib/fluent/plugin/out_s3.rb @@ -641,11 +641,7 @@ def content_type end def compress(chunk, tmp) - uncompressed_data = '' - chunk.open do |io| - uncompressed_data = io.read - end - compressed_data = Zstd.compress(uncompressed_data, level: @level) + compressed_data = Zstd.compress(chunk.read, level: @level) tmp.write(compressed_data) rescue => e log.warn "zstd compression failed: #{e.message}" diff --git a/test/test_out_s3.rb b/test/test_out_s3.rb index 1f62b10..59f753f 100644 --- a/test/test_out_s3.rb +++ b/test/test_out_s3.rb @@ -467,20 +467,20 @@ def test_write_with_custom_s3_object_key_format_containing_hex_random_placeholde def test_write_with_zstd setup_mocks(true) s3_local_file_path = "/tmp/s3-test.zst" - + expected_s3path = "log/events/ts=20110102-13/events_0-#{Socket.gethostname}.zst" - + setup_s3_object_mocks(s3_local_file_path: s3_local_file_path, s3path: expected_s3path) - + config = CONFIG_TIME_SLICE + "\nstore_as zstd\n" d = create_time_sliced_driver(config) - + time = event_time("2011-01-02 13:14:15 UTC") d.run(default_tag: "test") do d.feed(time, { "a" => 1 }) d.feed(time, { "a" => 2 }) end - + File.open(s3_local_file_path, 'rb') do |file| compressed_data = file.read uncompressed_data = Zstd.decompress(compressed_data) From 904a043b8d0228bcf6010c466da201833e801da5 Mon Sep 17 00:00:00 2001 From: ddukbg Date: Sat, 2 Nov 2024 20:33:52 +0900 Subject: [PATCH 09/12] fix: Add proper compression level handling to ZstdCompressor Signed-off-by: ddukbg --- lib/fluent/plugin/out_s3.rb | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/out_s3.rb b/lib/fluent/plugin/out_s3.rb index 9149a0b..f0e0701 100644 --- a/lib/fluent/plugin/out_s3.rb +++ b/lib/fluent/plugin/out_s3.rb @@ -632,20 +632,32 @@ def compress(chunk, tmp) end class ZstdCompressor < Compressor + require 'zstd-ruby' + + DEFAULT_LEVEL = 3 + + def initialize(options = {}) + @level = (options.is_a?(Hash) && options[:level]) || DEFAULT_LEVEL + end + def ext 'zst'.freeze end - + def content_type 'application/x-zst'.freeze end - + def compress(chunk, tmp) - compressed_data = Zstd.compress(chunk.read, level: @level) - tmp.write(compressed_data) - rescue => e - log.warn "zstd compression failed: #{e.message}" - raise e + begin + data = chunk.read.gsub(/\r\n/, "\n").force_encoding('UTF-8') + compressed = Zstd.compress(data, level: @level) + tmp.binmode + tmp.write(compressed) + rescue => e + log.warn "zstd compression failed: #{e.message}" + raise e + end end end From e31e8894d592af0011a6889c8d633ec7d20332b8 Mon Sep 17 00:00:00 2001 From: ddukbg <32587132+ddukbg@users.noreply.github.com> Date: Wed, 6 Nov 2024 09:57:24 +0900 Subject: [PATCH 10/12] test: Add tests for Zstd compression level configuration Add test cases to verify: - Default compression level (3) - Custom compression level setting Co-authored-by: Daijiro Fukuda Signed-off-by: ddukbg --- test/test_out_s3.rb | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/test/test_out_s3.rb b/test/test_out_s3.rb index 59f753f..2d7941d 100644 --- a/test/test_out_s3.rb +++ b/test/test_out_s3.rb @@ -109,12 +109,16 @@ def test_configure_with_mime_type_lzo assert(e.is_a?(Fluent::ConfigError)) end - def test_configure_with_mime_type_zstd + data('level default' => nil, + 'level 1' => 1) + def test_configure_with_mime_type_zstd(level) conf = CONFIG.clone conf << "\nstore_as zstd\n" + conf << "\n\nlevel #{level}\n\n" if level d = create_driver(conf) assert_equal 'zst', d.instance.instance_variable_get(:@compressor).ext assert_equal 'application/x-zst', d.instance.instance_variable_get(:@compressor).content_type + assert_equal (level || 3), d.instance.instance_variable_get(:@compressor).instance_variable_get(:@compress_config).level end def test_configure_with_path_style From fa2a609090f285e97cdc05ad7ad83712f1d333f6 Mon Sep 17 00:00:00 2001 From: ddukbg Date: Wed, 6 Nov 2024 11:54:40 +0900 Subject: [PATCH 11/12] fix: Improve ZSTD compression implementation - Remove unnecessary line ending and encoding handling - Fix compression logic using chunk.write_to method - Properly implement ZstdCompressor in separate file The previous implementation mistakenly included explicit line ending and encoding handling during troubleshooting. This has been removed as the proper implementation using chunk.write_to handles the data correctly without such manipulation. Signed-off-by: ddukbg Signed-off-by: ddukbg --- lib/fluent/plugin/out_s3.rb | 34 +---------------------- lib/fluent/plugin/s3_compressor_zstd.rb | 36 +++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 33 deletions(-) create mode 100644 lib/fluent/plugin/s3_compressor_zstd.rb diff --git a/lib/fluent/plugin/out_s3.rb b/lib/fluent/plugin/out_s3.rb index f0e0701..dc63f07 100644 --- a/lib/fluent/plugin/out_s3.rb +++ b/lib/fluent/plugin/out_s3.rb @@ -6,7 +6,6 @@ require 'time' require 'tempfile' require 'securerandom' -require 'zstd-ruby' module Fluent::Plugin class S3Output < Output @@ -631,36 +630,6 @@ def compress(chunk, tmp) end end - class ZstdCompressor < Compressor - require 'zstd-ruby' - - DEFAULT_LEVEL = 3 - - def initialize(options = {}) - @level = (options.is_a?(Hash) && options[:level]) || DEFAULT_LEVEL - end - - def ext - 'zst'.freeze - end - - def content_type - 'application/x-zst'.freeze - end - - def compress(chunk, tmp) - begin - data = chunk.read.gsub(/\r\n/, "\n").force_encoding('UTF-8') - compressed = Zstd.compress(data, level: @level) - tmp.binmode - tmp.write(compressed) - rescue => e - log.warn "zstd compression failed: #{e.message}" - raise e - end - end - end - class TextCompressor < Compressor def ext 'txt'.freeze @@ -689,8 +658,7 @@ def content_type { 'gzip' => GzipCompressor, 'json' => JsonCompressor, - 'text' => TextCompressor, - 'zstd' => ZstdCompressor + 'text' => TextCompressor }.each { |name, compressor| COMPRESSOR_REGISTRY.register(name, compressor) } diff --git a/lib/fluent/plugin/s3_compressor_zstd.rb b/lib/fluent/plugin/s3_compressor_zstd.rb new file mode 100644 index 0000000..70166ac --- /dev/null +++ b/lib/fluent/plugin/s3_compressor_zstd.rb @@ -0,0 +1,36 @@ +require 'zstd-ruby' + +module Fluent::Plugin + class S3Output + class ZstdCompressor < Compressor + S3Output.register_compressor('zstd', self) + + config_section :compress, param_name: :compress_config, init: true, multi: false do + desc "Compression level for zstd (1-22)" + config_param :level, :integer, default: 3 + end + + def ext + 'zst'.freeze + end + + def content_type + 'application/x-zst'.freeze + end + + def compress(chunk, tmp) + w = StringIO.new + chunk.write_to(w) + w.rewind + compressed = Zstd.compress(w.read, level: @compress_config.level) + tmp.binmode + tmp.rewind + tmp.write(compressed) + tmp.rewind + rescue => e + log.warn "zstd compression failed: #{e.message}" + raise + end + end + end +end \ No newline at end of file From 08448a1551950af7c338fd06b20e8a63f3ffa88c Mon Sep 17 00:00:00 2001 From: ddukbg <32587132+ddukbg@users.noreply.github.com> Date: Wed, 6 Nov 2024 14:57:23 +0900 Subject: [PATCH 12/12] refactor: Simplify ZSTD compression implementation Use direct chunk.read method instead of intermediate StringIO. This simplification maintains the same functionality while making the code more straightforward and easier to understand. Co-authored-by: Daijiro Fukuda Signed-off-by: ddukbg --- lib/fluent/plugin/s3_compressor_zstd.rb | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/lib/fluent/plugin/s3_compressor_zstd.rb b/lib/fluent/plugin/s3_compressor_zstd.rb index 70166ac..cc8540d 100644 --- a/lib/fluent/plugin/s3_compressor_zstd.rb +++ b/lib/fluent/plugin/s3_compressor_zstd.rb @@ -19,14 +19,8 @@ def content_type end def compress(chunk, tmp) - w = StringIO.new - chunk.write_to(w) - w.rewind - compressed = Zstd.compress(w.read, level: @compress_config.level) - tmp.binmode - tmp.rewind + compressed = Zstd.compress(chunk.read, level: @compress_config.level) tmp.write(compressed) - tmp.rewind rescue => e log.warn "zstd compression failed: #{e.message}" raise