Skip to content

Commit

Permalink
fix: Improve ZSTD compression implementation - Remove unnecessary lin…
Browse files Browse the repository at this point in the history
…e ending and encoding handling - Fix compression logic using chunk.write_to method - Properly implement ZstdCompressor in separate file The previous implementation mistakenly included explicit line ending and encoding handling during troubleshooting. This has been removed as the proper implementation using chunk.write_to handles the data correctly without such manipulation. Signed-off-by: ddukbg <[email protected]>

Signed-off-by: ddukbg <[email protected]>
  • Loading branch information
ddukbg committed Nov 6, 2024
1 parent e31e889 commit fa2a609
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 33 deletions.
34 changes: 1 addition & 33 deletions lib/fluent/plugin/out_s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
require 'time'
require 'tempfile'
require 'securerandom'
require 'zstd-ruby'

module Fluent::Plugin
class S3Output < Output
Expand Down Expand Up @@ -631,36 +630,6 @@ def compress(chunk, tmp)
end
end

class ZstdCompressor < Compressor
require 'zstd-ruby'

DEFAULT_LEVEL = 3

def initialize(options = {})
@level = (options.is_a?(Hash) && options[:level]) || DEFAULT_LEVEL
end

def ext
'zst'.freeze
end

def content_type
'application/x-zst'.freeze
end

def compress(chunk, tmp)
begin
data = chunk.read.gsub(/\r\n/, "\n").force_encoding('UTF-8')
compressed = Zstd.compress(data, level: @level)
tmp.binmode
tmp.write(compressed)
rescue => e
log.warn "zstd compression failed: #{e.message}"
raise e
end
end
end

class TextCompressor < Compressor
def ext
'txt'.freeze
Expand Down Expand Up @@ -689,8 +658,7 @@ def content_type
{
'gzip' => GzipCompressor,
'json' => JsonCompressor,
'text' => TextCompressor,
'zstd' => ZstdCompressor
'text' => TextCompressor
}.each { |name, compressor|
COMPRESSOR_REGISTRY.register(name, compressor)
}
Expand Down
36 changes: 36 additions & 0 deletions lib/fluent/plugin/s3_compressor_zstd.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
require 'zstd-ruby'

module Fluent::Plugin
class S3Output
class ZstdCompressor < Compressor
S3Output.register_compressor('zstd', self)

config_section :compress, param_name: :compress_config, init: true, multi: false do
desc "Compression level for zstd (1-22)"
config_param :level, :integer, default: 3
end

def ext
'zst'.freeze
end

def content_type
'application/x-zst'.freeze
end

def compress(chunk, tmp)
w = StringIO.new
chunk.write_to(w)
w.rewind
compressed = Zstd.compress(w.read, level: @compress_config.level)
tmp.binmode
tmp.rewind
tmp.write(compressed)
tmp.rewind
rescue => e
log.warn "zstd compression failed: #{e.message}"
raise
end
end
end
end

0 comments on commit fa2a609

Please sign in to comment.