From 796d6c948d0ade76bed4d4702a352055062b9d45 Mon Sep 17 00:00:00 2001 From: "yongwoo.kim" Date: Fri, 18 Oct 2024 17:42:50 +0900 Subject: [PATCH] feat: Add Zstd compression support for S3 plugin Signed-off-by: yongwoo.kim --- fluent-plugin-s3.gemspec | 1 + lib/fluent/plugin/out_s3.rb | 26 ++++++++++++++++- lib/fluent/plugin/s3_compressor_zstd.rb | 37 +++++++++++++++++++++++++ test/test_in_s3.rb | 1 + 4 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 lib/fluent/plugin/s3_compressor_zstd.rb diff --git a/fluent-plugin-s3.gemspec b/fluent-plugin-s3.gemspec index 8088a41..6988745 100644 --- a/fluent-plugin-s3.gemspec +++ b/fluent-plugin-s3.gemspec @@ -26,4 +26,5 @@ Gem::Specification.new do |gem| # aws-sdk-core requires one of ox, oga, libxml, nokogiri or rexml, # and rexml is no longer default gem as of Ruby 3.0. gem.add_development_dependency "rexml" + gem.add_development_dependency 'zstd-ruby' end diff --git a/lib/fluent/plugin/out_s3.rb b/lib/fluent/plugin/out_s3.rb index dc63f07..001bb4f 100644 --- a/lib/fluent/plugin/out_s3.rb +++ b/lib/fluent/plugin/out_s3.rb @@ -6,6 +6,7 @@ require 'time' require 'tempfile' require 'securerandom' +require 'zstd-ruby' module Fluent::Plugin class S3Output < Output @@ -630,6 +631,28 @@ def compress(chunk, tmp) end end + class ZstdCompressor < Compressor + def ext + 'zst'.freeze + end + + def content_type + 'application/x-zst'.freeze + end + + def compress(chunk, tmp) + uncompressed_data = '' + chunk.open do |io| + uncompressed_data = io.read + end + compressed_data = Zstd.compress(uncompressed_data, level: @level) + tmp.write(compressed_data) + rescue => e + log.warn "zstd compression failed: #{e.message}" + raise e + end + end + class TextCompressor < Compressor def ext 'txt'.freeze @@ -658,7 +681,8 @@ def content_type { 'gzip' => GzipCompressor, 'json' => JsonCompressor, - 'text' => TextCompressor + 'text' => TextCompressor, + 'zstd' => ZstdCompressor }.each { |name, compressor| COMPRESSOR_REGISTRY.register(name, compressor) } diff --git a/lib/fluent/plugin/s3_compressor_zstd.rb b/lib/fluent/plugin/s3_compressor_zstd.rb new file mode 100644 index 0000000..cb20a8c --- /dev/null +++ b/lib/fluent/plugin/s3_compressor_zstd.rb @@ -0,0 +1,37 @@ +require 'zstd-ruby' + +module Fluent::Plugin + class S3Output + class ZstdCompressor < Compressor + S3Output.register_compressor('zstd', self) + + config_param :level, :integer, default: 3, desc: "Compression level for zstd (1-22)" + + def initialize(opts = {}) + super() + @buffer_type = opts[:buffer_type] + @log = opts[:log] + end + + def ext + 'zst'.freeze + end + + def content_type + 'application/x-zstd'.freeze + end + + def compress(chunk, tmp) + uncompressed_data = '' + chunk.open do |io| + uncompressed_data = io.read + end + compressed_data = Zstd.compress(uncompressed_data, level: @level) + tmp.write(compressed_data) + rescue => e + log.warn "zstd compression failed: #{e.message}" + raise e + end + end + end +end diff --git a/test/test_in_s3.rb b/test/test_in_s3.rb index 2bd2d9d..dde048e 100644 --- a/test/test_in_s3.rb +++ b/test/test_in_s3.rb @@ -93,6 +93,7 @@ def test_unknown_store_as "text" => ["text", "txt", "text/plain"], "gzip" => ["gzip", "gz", "application/x-gzip"], "gzip_command" => ["gzip_command", "gz", "application/x-gzip"], + "zstd" => ["zstd", "zst", "application/x-zstd"], "lzo" => ["lzo", "lzo", "application/x-lzop"], "lzma2" => ["lzma2", "xz", "application/x-xz"]) def test_extractor(data)