Skip to content

Commit

Permalink
Merge pull request #226 from rudolf-focuscura/fix-input-gzip-multiple-streams
Browse files Browse the repository at this point in the history

in_s3 - gzip - Fixed extracting multiple streams
  • Loading branch information
repeatedly authored Apr 15, 2018
2 parents 2db2ca1 + 26cd084 commit 120bf44
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 2 deletions.
17 changes: 15 additions & 2 deletions lib/fluent/plugin/in_s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,23 @@ def content_type
'application/x-gzip'.freeze
end

# Decompresses a gzip payload that may consist of several concatenated
# gzip members (streams) and returns their contents joined in order.
#
# A single Zlib::GzipReader only decodes the first member, so a fresh
# reader is started on the leftover bytes until +io+ is exhausted.
# Background:
# https://bugs.ruby-lang.org/issues/9790
# https://bugs.ruby-lang.org/issues/11180
# https://github.com/exAspArk/multiple_files_gzip_reader
#
# @param io [IO, StringIO] readable stream positioned at the start of a
#   gzip member; it must support +pos=+ when bytes follow a member
# @return [String] decompressed data of every member, concatenated
# @raise [Zlib::GzipFile::Error] if the data at the current position is
#   not valid gzip
#
# +io+ is closed before this method returns, including when
# decompression raises.
def extract(io)
  parts = []
  loop do
    unused = nil
    Zlib::GzipReader.wrap(io) do |gz|
      parts << gz.read
      # Bytes the reader consumed from +io+ past the end of this member.
      unused = gz.unused
      # finish (rather than close) leaves the underlying +io+ open so the
      # next member can still be read from it.
      gz.finish
    end
    # Step back over the over-read bytes so the next reader starts at the
    # following member's header; skip the seek when nothing was over-read.
    io.pos -= unused.length if unused
    break if io.eof?
  end
  parts.join
ensure
  # Always release the stream, even if a member failed to decode.
  io.close unless io.closed?
end
end

Expand Down
101 changes: 101 additions & 0 deletions test/test_in_s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def test_unknown_store_as

# Labelled data sets handed to `data`. Each value is a three-element array;
# presumably [store_as value, file extension, content type] -- confirm against
# the test that consumes these entries (not visible in this hunk).
data("json" => ["json", "json", "application/json"],
"text" => ["text", "txt", "text/plain"],
"gzip" => ["gzip", "gz", "application/x-gzip"],
"gzip_command" => ["gzip_command", "gz", "application/x-gzip"],
"lzo" => ["lzo", "lzo", "application/x-lzop"],
"lzma2" => ["lzma2", "xz", "application/x-xz"])
Expand Down Expand Up @@ -262,4 +263,104 @@ def test_one_record_multi_line
]
assert_equal(expected_records, events.map {|_tag, _time, record| record })
end

# With "store_as gzip" and "format none", an S3 object holding a single gzip
# stream must be decompressed and emitted as one record per line.
def test_gzip_single_stream
  setup_mocks
  driver = create_driver(CONFIG + "\ncheck_apikey_on_start false\nstore_as gzip\nformat none\n")

  s3_object = stub(Object.new)
  s3_response = stub(Object.new)
  # Every call to #body builds a fresh, rewound StringIO containing one
  # gzip stream.
  s3_response.body {
    StringIO.new.tap do |buffer|
      Zlib::GzipWriter.wrap(buffer) do |writer|
        writer.write "aaa\nbbb\n"
        writer.finish
      end
      buffer.rewind
    end
  }
  s3_object.get { s3_response }
  @s3_bucket.object(anything).at_least(1) { s3_object }

  # Notification payload naming the object key, serialized into the stub
  # SQS message.
  notification = { "Records" => [{ "s3" => { "object" => { "key" => "test_key" } } }] }
  message = Struct::StubMessage.new(1, 1, Yajl.dump(notification))
  @sqs_poller.get_messages(anything, anything) do |config, stats|
    if config.before_request
      config.before_request.call(stats)
    end
    stats.request_count += 1
    # Clear the plugin's @running flag once the first batch has been
    # requested (presumably this ends its polling loop).
    driver.instance.instance_variable_set(:@running, false) if stats.request_count >= 1
    [message]
  end
  driver.run(expect_emits: 1)

  expected_records = ["aaa\n", "bbb\n"].map { |line| { "message" => line } }
  actual_records = driver.events.map { |_tag, _time, record| record }
  assert_equal(expected_records, actual_records)
end

# With "store_as gzip" and "format none", an S3 object made of two
# concatenated gzip streams must yield the lines of both streams, in order.
# (The "steams" spelling in the method name is kept as-is so the test id
# does not change.)
def test_gzip_multiple_steams
  setup_mocks
  driver = create_driver(CONFIG + "\ncheck_apikey_on_start false\nstore_as gzip\nformat none\n")

  s3_object = stub(Object.new)
  s3_response = stub(Object.new)
  # Every call to #body builds a fresh, rewound StringIO containing two
  # gzip streams written back to back.
  s3_response.body {
    StringIO.new.tap do |buffer|
      ["aaa\nbbb\n", "ccc\nddd\n"].each do |chunk|
        Zlib::GzipWriter.wrap(buffer) do |writer|
          writer.write chunk
          writer.finish
        end
      end
      buffer.rewind
    end
  }
  s3_object.get { s3_response }
  @s3_bucket.object(anything).at_least(1) { s3_object }

  # Notification payload naming the object key, serialized into the stub
  # SQS message.
  notification = { "Records" => [{ "s3" => { "object" => { "key" => "test_key" } } }] }
  message = Struct::StubMessage.new(1, 1, Yajl.dump(notification))
  @sqs_poller.get_messages(anything, anything) do |config, stats|
    if config.before_request
      config.before_request.call(stats)
    end
    stats.request_count += 1
    # Clear the plugin's @running flag once the first batch has been
    # requested (presumably this ends its polling loop).
    driver.instance.instance_variable_set(:@running, false) if stats.request_count >= 1
    [message]
  end
  driver.run(expect_emits: 1)

  expected_records = ["aaa\n", "bbb\n", "ccc\n", "ddd\n"].map { |line| { "message" => line } }
  actual_records = driver.events.map { |_tag, _time, record| record }
  assert_equal(expected_records, actual_records)
end
end

0 comments on commit 120bf44

Please sign in to comment.