Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Zstd compression support to S3 plugin #439

Merged
merged 13 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ jobs:
- ubuntu-latest
name: Ruby ${{ matrix.ruby }} unit testing on ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: ruby/setup-ruby@v1
with:
ruby-version: ${{ matrix.ruby }}
- name: unit testing
env:
CI: true
run: |
gem install bundler rake
gem install rake
bundle install --jobs 4 --retry 3
bundle exec rake test
1 change: 1 addition & 0 deletions fluent-plugin-s3.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Gem::Specification.new do |gem|
gem.add_dependency "fluentd", [">= 0.14.22", "< 2"]
gem.add_dependency "aws-sdk-s3", "~> 1.60"
gem.add_dependency "aws-sdk-sqs", "~> 1.23"
gem.add_dependency 'zstd-ruby'
gem.add_development_dependency "rake", ">= 0.9.2"
gem.add_development_dependency "test-unit", ">= 3.0.8"
gem.add_development_dependency "test-unit-rr", ">= 1.0.3"
Expand Down
36 changes: 36 additions & 0 deletions lib/fluent/plugin/s3_compressor_zstd.rb
ddukbg marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
require 'zstd-ruby'

module Fluent::Plugin
class S3Output
class ZstdCompressor < Compressor
S3Output.register_compressor('zstd', self)

config_section :compress, param_name: :compress_config, init: true, multi: false do
desc "Compression level for zstd (1-22)"
config_param :level, :integer, default: 3
end

def ext
'zst'.freeze
end

def content_type
'application/x-zst'.freeze
end

def compress(chunk, tmp)
w = StringIO.new
chunk.write_to(w)
w.rewind
compressed = Zstd.compress(w.read, level: @compress_config.level)
tmp.binmode
tmp.rewind
tmp.write(compressed)
tmp.rewind
ddukbg marked this conversation as resolved.
Show resolved Hide resolved
rescue => e
log.warn "zstd compression failed: #{e.message}"
raise
end
end
end
end
39 changes: 39 additions & 0 deletions test/test_out_s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ def test_configure_with_mime_type_lzo
assert(e.is_a?(Fluent::ConfigError))
end

data('level default' => nil,
'level 1' => 1)
def test_configure_with_mime_type_zstd(level)
conf = CONFIG.clone
conf << "\nstore_as zstd\n"
conf << "\n<compress>\nlevel #{level}\n</compress>\n" if level
d = create_driver(conf)
assert_equal 'zst', d.instance.instance_variable_get(:@compressor).ext
assert_equal 'application/x-zst', d.instance.instance_variable_get(:@compressor).content_type
assert_equal (level || 3), d.instance.instance_variable_get(:@compressor).instance_variable_get(:@compress_config).level
end

def test_configure_with_path_style
conf = CONFIG.clone
conf << "\nforce_path_style true\n"
Expand Down Expand Up @@ -456,6 +468,33 @@ def test_write_with_custom_s3_object_key_format_containing_hex_random_placeholde
FileUtils.rm_f(s3_local_file_path)
end

def test_write_with_zstd
setup_mocks(true)
s3_local_file_path = "/tmp/s3-test.zst"

expected_s3path = "log/events/ts=20110102-13/events_0-#{Socket.gethostname}.zst"

setup_s3_object_mocks(s3_local_file_path: s3_local_file_path, s3path: expected_s3path)

config = CONFIG_TIME_SLICE + "\nstore_as zstd\n"
d = create_time_sliced_driver(config)

time = event_time("2011-01-02 13:14:15 UTC")
d.run(default_tag: "test") do
d.feed(time, { "a" => 1 })
d.feed(time, { "a" => 2 })
end

File.open(s3_local_file_path, 'rb') do |file|
compressed_data = file.read
uncompressed_data = Zstd.decompress(compressed_data)
expected_data = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] +
%[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]
assert_equal expected_data, uncompressed_data
end
FileUtils.rm_f(s3_local_file_path)
end

class MockResponse
attr_reader :data

Expand Down