Skip to content

Commit

Permalink
feat: Add Zstd compression support for S3 plugin
Browse files Browse the repository at this point in the history
Signed-off-by: yongwoo.kim <[email protected]>
  • Loading branch information
yongwoo.kim committed Oct 18, 2024
1 parent 84f42a9 commit 6af3b5d
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 1 deletion.
1 change: 1 addition & 0 deletions fluent-plugin-s3.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ Gem::Specification.new do |gem|
# aws-sdk-core requires one of ox, oga, libxml, nokogiri or rexml,
# and rexml is no longer default gem as of Ruby 3.0.
gem.add_development_dependency "rexml"
gem.add_development_dependency 'zstd-ruby'
end
26 changes: 25 additions & 1 deletion lib/fluent/plugin/out_s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require 'time'
require 'tempfile'
require 'securerandom'
require 'zstd-ruby'

module Fluent::Plugin
class S3Output < Output
Expand Down Expand Up @@ -630,6 +631,28 @@ def compress(chunk, tmp)
end
end

class ZstdCompressor < Compressor
def ext
'zst'.freeze
end

def content_type
'application/x-zst'.freeze
end

def compress(chunk, tmp)
uncompressed_data = ''
chunk.open do |io|
uncompressed_data = io.read
end
compressed_data = Zstd.compress(uncompressed_data, level: @level)
tmp.write(compressed_data)
rescue => e
log.warn "zstd compression failed: #{e.message}"
raise e
end
end

class TextCompressor < Compressor
def ext
'txt'.freeze
Expand Down Expand Up @@ -658,7 +681,8 @@ def content_type
{
'gzip' => GzipCompressor,
'json' => JsonCompressor,
'text' => TextCompressor
'text' => TextCompressor,
'zstd' => ZstdCompressor
}.each { |name, compressor|
COMPRESSOR_REGISTRY.register(name, compressor)
}
Expand Down
37 changes: 37 additions & 0 deletions lib/fluent/plugin/s3_compressor_zstd.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
require 'zstd-ruby'

module Fluent::Plugin
class S3Output
class ZstdCompressor < Compressor
S3Output.register_compressor('zstd', self)

config_param :level, :integer, default: 3, desc: "Compression level for zstd (1-22)"

def initialize(opts = {})
super()
@buffer_type = opts[:buffer_type]
@log = opts[:log]
end

def ext
'zst'.freeze
end

def content_type
'application/x-zstd'.freeze
end

def compress(chunk, tmp)
uncompressed_data = ''
chunk.open do |io|
uncompressed_data = io.read
end
compressed_data = Zstd.compress(uncompressed_data, level: @level)
tmp.write(compressed_data)
rescue => e
log.warn "zstd compression failed: #{e.message}"
raise e
end
end
end
end
1 change: 1 addition & 0 deletions test/test_in_s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def test_unknown_store_as
"text" => ["text", "txt", "text/plain"],
"gzip" => ["gzip", "gz", "application/x-gzip"],
"gzip_command" => ["gzip_command", "gz", "application/x-gzip"],
"zstd" => ["zstd", "zst", "application/x-zstd"],
"lzo" => ["lzo", "lzo", "application/x-lzop"],
"lzma2" => ["lzma2", "xz", "application/x-xz"])
def test_extractor(data)
Expand Down

0 comments on commit 6af3b5d

Please sign in to comment.