Skip to content

Commit

Permalink
Added: support for grouped timeout per group of patterns
Browse files Browse the repository at this point in the history
Fixes #153
  • Loading branch information
kares committed Nov 18, 2019
1 parent 3527b14 commit d4aac7c
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 22 deletions.
47 changes: 26 additions & 21 deletions lib/logstash/filters/grok.rb
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,14 @@
# `SYSLOGBASE` pattern which itself is defined by other patterns.
#
# Another option is to define patterns _inline_ in the filter using `pattern_definitions`.
# This is mostly for convenience and allows user to define a pattern which can be used just in that
# This is mostly for convenience and allows user to define a pattern which can be used just in that
# filter. Patterns newly defined in `pattern_definitions` will not be available outside of that particular `grok` filter.
#
class LogStash::Filters::Grok < LogStash::Filters::Base
require 'logstash/filters/grok/timeout_support'

include TimeoutSupport

config_name "grok"

# A hash of matches of field => value
Expand All @@ -168,7 +172,7 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
# necessarily need to define this yourself unless you are adding additional
# patterns. You can point to multiple pattern directories using this setting.
# Note that Grok will read all files in the directory matching the patterns_files_glob
# and assume it's a pattern file (including any tilde backup files).
# and assume it's a pattern file (including any tilde backup files).
# [source,ruby]
# patterns_dir => ["/opt/logstash/patterns", "/opt/logstash/extra_patterns"]
#
Expand Down Expand Up @@ -215,6 +219,12 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
# Set to 0 to disable timeouts
config :timeout_millis, :validate => :number, :default => 30000

# Group timeout for multiple patterns instead of having a timeout per pattern.
# When set to true timeout_millis effectively becomes a timeout over several patterns.
# Defaults to false for backwards compatibility.
# Only has an effect when timeout_millis > 0.
config :timeout_grouped, :validate => :boolean, :default => false

# Tag to apply if a grok regexp times out.
config :tag_on_timeout, :validate => :string, :default => '_groktimeout'

Expand Down Expand Up @@ -278,10 +288,7 @@ def register
@match_counter = metric.counter(:matches)
@failure_counter = metric.counter(:failures)

# divide by float to allow fractionnal seconds, the Timeout class timeout value is in seconds but the underlying
# executor resolution is in microseconds so fractionnal second parameter down to microseconds is possible.
# see https://github.com/jruby/jruby/blob/9.2.7.0/core/src/main/java/org/jruby/ext/timeout/Timeout.java#L125
@timeout_seconds = @timeout_millis / 1000.0
@timeout = @timeout_millis > 0.0 ? RubyTimeout.new(@timeout_millis) : NoopTimeout.new
end # def register

def filter(event)
Expand Down Expand Up @@ -334,26 +341,24 @@ def match(groks, field, event)
end

def match_against_groks(groks, field, input, event)
# Convert anything else to string (number, hash, etc)
input = input.to_s
matched = false
groks.each do |grok|
# Convert anything else to string (number, hash, etc)
matched = grok_till_timeout(grok, field, input)
if matched
grok.capture(matched) {|field, value| handle(field, value, event)}
break if @break_on_match

context = GrokContext.new
with_timeout_if(@timeout_grouped, context) do
groks.each do |grok|
context.update(grok, field, input)

matched = with_timeout_if(!@timeout_grouped, context) { grok.execute(input) }
if matched
grok.capture(matched) { |field, value| handle(field, value, event) }
break if @break_on_match
end
end
end

matched
end

def grok_till_timeout(grok, field, value)
begin
@timeout_seconds > 0.0 ? Timeout.timeout(@timeout_seconds, TimeoutError) { grok.execute(value) } : grok.execute(value)
rescue TimeoutError
raise GrokTimeoutException.new(grok, field, value)
end
matched
end

def handle(field, value, event)
Expand Down
62 changes: 62 additions & 0 deletions lib/logstash/filters/grok/timeout_support.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# encoding: utf-8

require 'timeout'

# NOTE: use nested module structure so that `TimeoutError` is picked from parent (`Grok`).
module LogStash
  module Filters
    class Grok
      # Mixin that bundles the timeout plumbing: the guard helpers, the context
      # object describing what was being matched, and the timeout strategies.
      #
      # The including object is expected to set `@timeout` to something that
      # responds to `exec` (see NoopTimeout and RubyTimeout below).
      # `TimeoutError` and `GrokTimeoutException` are not defined here; they are
      # expected to be resolvable from the enclosing `Grok` namespace.
      module TimeoutSupport

        # Runs the block under the timeout guard only when `condition` is truthy,
        # otherwise simply yields to it.
        #
        # @param condition [Boolean] whether the timeout guard should be applied
        # @param context [#grok, #field, #input] what is currently being matched
        # @return whatever the block returns
        def with_timeout_if(condition, context, &block)
          return yield unless condition

          with_timeout(context, &block)
        end

        # Executes the block through `@timeout` and routes a `TimeoutError`
        # to `handle_timeout`.
        #
        # @param context [#grok, #field, #input] what is currently being matched
        # @return whatever the block returns
        def with_timeout(context, &block)
          begin
            @timeout.exec(&block)
          rescue TimeoutError => timeout_error
            handle_timeout(context, timeout_error)
          end
        end

        # Turns a timeout into a GrokTimeoutException built from the context.
        # The original error is accepted but not used.
        #
        # @raise [GrokTimeoutException] always
        def handle_timeout(context, error)
          timeout_exception = GrokTimeoutException.new(context.grok, context.field, context.input)
          raise timeout_exception
        end

        # Mutable record of the grok/field/input triple currently being processed.
        GrokContext = Struct.new(:grok, :field, :input) do

          # Overwrites all three members in one call; returns `input`.
          def update(grok, field, input)
            self.grok, self.field = grok, field
            self.input = input
          end

        end

        # Timeout strategy that applies no time limit at all.
        class NoopTimeout

          # Yields straight to the given block.
          def exec
            yield
          end

        end

        # Timeout strategy backed by Ruby's `Timeout.timeout`.
        class RubyTimeout

          # @param timeout_millis [Numeric] time limit in milliseconds
          def initialize(timeout_millis)
            # divide by float to allow fractional seconds, the Timeout class timeout value is in seconds but the underlying
            # executor resolution is in microseconds so fractional second parameter down to microseconds is possible.
            # see https://github.com/jruby/jruby/blob/9.2.7.0/core/src/main/java/org/jruby/ext/timeout/Timeout.java#L125
            millis_per_second = 1000.0
            @timeout_seconds = timeout_millis / millis_per_second
          end

          # Runs the block under `Timeout.timeout`, raising `TimeoutError` on expiry.
          def exec(&block)
            Timeout.timeout(@timeout_seconds, TimeoutError, &block)
          end

        end

      end # TimeoutSupport
    end
  end
end
47 changes: 46 additions & 1 deletion spec/filters/grok_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ def pattern_path(path)
filter {
grok {
match => {
message => "(.*a){30}"
"message" => "(.*a){30}"
}
timeout_millis => 100
}
Expand All @@ -425,6 +425,51 @@ def pattern_path(path)
end
end

# Per-pattern timeout scenario: eleven patterns are tried against "message" with
# timeout_millis => 500 and timeout_grouped => false.
# The processed event is expected to carry no tags at all.
# NOTE(review): presumably each individual pattern finishes within its own 500 ms
# budget even though the cumulative time is larger — timing dependent, confirm on CI.
describe "no timeout on failure with multiple patterns (when timeout not grouped)" do
config <<-CONFIG
filter {
grok {
match => {
"message" => [
"(.*f){20}", "(.*e){20}", "(.*d){20}", "(.*c){20}", "(.*b){20}",
"(.*a){25}", "(.*a){24}", "(.*a){23}", "(.*a){22}", "(.*a){21}",
"(.*a){20}"
]
}
timeout_millis => 500
timeout_grouped => false
}
}
CONFIG

# Input: ten each of 'b', 'c', 'd', 'e', then a space, then twenty 'a' characters.
sample( 'b' * 10 + 'c' * 10 + 'd' * 10 + 'e' * 10 + ' ' + 'a' * 20 ) do
# No tags set on the event.
insist { subject.get("tags") }.nil?
end
end

# Grouped timeout scenario: the same eleven patterns and the same 500 ms limit,
# but with timeout_grouped => true.
# The processed event is expected to be tagged "_groktimeout" and not "_grokparsefailure".
# NOTE(review): presumably relies on the patterns taking more than 500 ms in total
# on the test machine — timing dependent, confirm on CI.
describe "timeout on grouped (multi-pattern) failure" do
config <<-CONFIG
filter {
grok {
match => {
"message" => [
"(.*f){20}", "(.*e){20}", "(.*d){20}", "(.*c){20}", "(.*b){20}",
"(.*a){25}", "(.*a){24}", "(.*a){23}", "(.*a){22}", "(.*a){21}",
"(.*a){20}"
]
}
timeout_millis => 500
timeout_grouped => true
}
}
CONFIG

# Input: ten each of 'b', 'c', 'd', 'e', then a space, then twenty 'a' characters.
sample( 'b' * 10 + 'c' * 10 + 'd' * 10 + 'e' * 10 + ' ' + 'a' * 20 ) do
# A timeout is reported with the timeout tag only, not as a parse failure.
expect(subject.get("tags")).to include("_groktimeout")
expect(subject.get("tags")).not_to include("_grokparsefailure")
end
end

describe "tagging on failure" do
config <<-CONFIG
filter {
Expand Down

0 comments on commit d4aac7c

Please sign in to comment.