Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Zstd compression support to S3 plugin #439

Merged
merged 13 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ jobs:
- ubuntu-latest
name: Ruby ${{ matrix.ruby }} unit testing on ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: ruby/setup-ruby@v1
with:
ruby-version: ${{ matrix.ruby }}
- name: unit testing
env:
CI: true
run: |
gem install bundler rake
gem install rake
bundle install --jobs 4 --retry 3
bundle exec rake test
1 change: 1 addition & 0 deletions fluent-plugin-s3.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Gem::Specification.new do |gem|
gem.add_dependency "fluentd", [">= 0.14.22", "< 2"]
gem.add_dependency "aws-sdk-s3", "~> 1.60"
gem.add_dependency "aws-sdk-sqs", "~> 1.23"
gem.add_dependency 'zstd-ruby'
gem.add_development_dependency "rake", ">= 0.9.2"
gem.add_development_dependency "test-unit", ">= 3.0.8"
gem.add_development_dependency "test-unit-rr", ">= 1.0.3"
Expand Down
26 changes: 25 additions & 1 deletion lib/fluent/plugin/out_s3.rb
daipom marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require 'time'
require 'tempfile'
require 'securerandom'
require 'zstd-ruby'

module Fluent::Plugin
class S3Output < Output
Expand Down Expand Up @@ -630,6 +631,28 @@ def compress(chunk, tmp)
end
end

class ZstdCompressor < Compressor
def ext
'zst'.freeze
end

def content_type
'application/x-zst'.freeze
end

def compress(chunk, tmp)
uncompressed_data = ''
chunk.open do |io|
uncompressed_data = io.read
end
compressed_data = Zstd.compress(uncompressed_data, level: @level)
daipom marked this conversation as resolved.
Show resolved Hide resolved
ddukbg marked this conversation as resolved.
Show resolved Hide resolved
tmp.write(compressed_data)
rescue => e
log.warn "zstd compression failed: #{e.message}"
raise e
end
end

class TextCompressor < Compressor
def ext
'txt'.freeze
Expand Down Expand Up @@ -658,7 +681,8 @@ def content_type
{
'gzip' => GzipCompressor,
'json' => JsonCompressor,
'text' => TextCompressor
'text' => TextCompressor,
'zstd' => ZstdCompressor
}.each { |name, compressor|
COMPRESSOR_REGISTRY.register(name, compressor)
}
Expand Down
35 changes: 35 additions & 0 deletions test/test_out_s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ def test_configure_with_mime_type_lzo
assert(e.is_a?(Fluent::ConfigError))
end

def test_configure_with_mime_type_zstd
conf = CONFIG.clone
conf << "\nstore_as zstd\n"
d = create_driver(conf)
assert_equal 'zst', d.instance.instance_variable_get(:@compressor).ext
assert_equal 'application/x-zst', d.instance.instance_variable_get(:@compressor).content_type
end
ddukbg marked this conversation as resolved.
Show resolved Hide resolved

def test_configure_with_path_style
conf = CONFIG.clone
conf << "\nforce_path_style true\n"
Expand Down Expand Up @@ -456,6 +464,33 @@ def test_write_with_custom_s3_object_key_format_containing_hex_random_placeholde
FileUtils.rm_f(s3_local_file_path)
end

def test_write_with_zstd
setup_mocks(true)
s3_local_file_path = "/tmp/s3-test.zst"

expected_s3path = "log/events/ts=20110102-13/events_0-#{Socket.gethostname}.zst"

setup_s3_object_mocks(s3_local_file_path: s3_local_file_path, s3path: expected_s3path)

config = CONFIG_TIME_SLICE + "\nstore_as zstd\n"
d = create_time_sliced_driver(config)

time = event_time("2011-01-02 13:14:15 UTC")
d.run(default_tag: "test") do
d.feed(time, { "a" => 1 })
d.feed(time, { "a" => 2 })
end

File.open(s3_local_file_path, 'rb') do |file|
compressed_data = file.read
uncompressed_data = Zstd.decompress(compressed_data)
expected_data = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] +
%[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]
assert_equal expected_data, uncompressed_data
end
FileUtils.rm_f(s3_local_file_path)
end
ddukbg marked this conversation as resolved.
Show resolved Hide resolved

class MockResponse
attr_reader :data

Expand Down