Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Zstd compression support to S3 plugin #439

Merged
merged 13 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ jobs:
- ubuntu-latest
name: Ruby ${{ matrix.ruby }} unit testing on ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: ruby/setup-ruby@v1
with:
ruby-version: ${{ matrix.ruby }}
- name: unit testing
env:
CI: true
run: |
gem install bundler rake
gem install rake
bundle install --jobs 4 --retry 3
bundle exec rake test
1 change: 1 addition & 0 deletions fluent-plugin-s3.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Gem::Specification.new do |gem|
gem.add_dependency "fluentd", [">= 0.14.22", "< 2"]
gem.add_dependency "aws-sdk-s3", "~> 1.60"
gem.add_dependency "aws-sdk-sqs", "~> 1.23"
gem.add_dependency 'zstd-ruby'
gem.add_development_dependency "rake", ">= 0.9.2"
gem.add_development_dependency "test-unit", ">= 3.0.8"
gem.add_development_dependency "test-unit-rr", ">= 1.0.3"
Expand Down
26 changes: 25 additions & 1 deletion lib/fluent/plugin/out_s3.rb
daipom marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require 'time'
require 'tempfile'
require 'securerandom'
require 'zstd-ruby'

module Fluent::Plugin
class S3Output < Output
Expand Down Expand Up @@ -630,6 +631,28 @@ def compress(chunk, tmp)
end
end

class ZstdCompressor < Compressor
def ext
'zst'.freeze
end

def content_type
'application/x-zst'.freeze
end

def compress(chunk, tmp)
uncompressed_data = ''
chunk.open do |io|
uncompressed_data = io.read
end
compressed_data = Zstd.compress(uncompressed_data, level: @level)
daipom marked this conversation as resolved.
Show resolved Hide resolved
ddukbg marked this conversation as resolved.
Show resolved Hide resolved
tmp.write(compressed_data)
rescue => e
log.warn "zstd compression failed: #{e.message}"
raise e
end
end

class TextCompressor < Compressor
def ext
'txt'.freeze
Expand Down Expand Up @@ -658,7 +681,8 @@ def content_type
{
'gzip' => GzipCompressor,
'json' => JsonCompressor,
'text' => TextCompressor
'text' => TextCompressor,
'zstd' => ZstdCompressor
}.each { |name, compressor|
COMPRESSOR_REGISTRY.register(name, compressor)
}
Expand Down
37 changes: 37 additions & 0 deletions lib/fluent/plugin/s3_compressor_zstd.rb
ddukbg marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
require 'zstd-ruby'

module Fluent::Plugin
class S3Output
class ZstdCompressor < Compressor
S3Output.register_compressor('zstd', self)

config_param :level, :integer, default: 3, desc: "Compression level for zstd (1-22)"

def initialize(opts = {})
super()
@buffer_type = opts[:buffer_type]
@log = opts[:log]
end

def ext
'zst'.freeze
end

def content_type
'application/x-zstd'.freeze
end

def compress(chunk, tmp)
uncompressed_data = ''
chunk.open do |io|
uncompressed_data = io.read
end
compressed_data = Zstd.compress(uncompressed_data, level: @level)
tmp.write(compressed_data)
rescue => e
log.warn "zstd compression failed: #{e.message}"
raise e
end
end
end
end
1 change: 1 addition & 0 deletions test/test_in_s3.rb
ddukbg marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def test_unknown_store_as
"text" => ["text", "txt", "text/plain"],
"gzip" => ["gzip", "gz", "application/x-gzip"],
"gzip_command" => ["gzip_command", "gz", "application/x-gzip"],
"zstd" => ["zstd", "zst", "application/x-zstd"],
"lzo" => ["lzo", "lzo", "application/x-lzop"],
"lzma2" => ["lzma2", "xz", "application/x-xz"])
def test_extractor(data)
Expand Down