Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added: support for timeout_scope #153

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 4.2.0
- Added: support for timeout_scope [#153](https://github.com/logstash-plugins/logstash-filter-grok/pull/153)

## 4.1.1
- Fix formatting for code sample [#148](https://github.com/logstash-plugins/logstash-filter-grok/pull/148)

Expand Down
17 changes: 17 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-tag_on_failure>> |<<array,array>>|No
| <<plugins-{type}s-{plugin}-tag_on_timeout>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-timeout_millis>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-timeout_scope>> |<<string,string>>|No
|=======================================================================

Also see <<plugins-{type}s-{plugin}-common-options>> for a list of options supported by all
Expand Down Expand Up @@ -356,6 +357,22 @@ This will never timeout early, but may take a little longer to timeout.
Actual timeout is approximate based on a 250ms quantization.
Set to 0 to disable timeouts

[id="plugins-{type}s-{plugin}-timeout_scope"]
===== `timeout_scope`

* Value type is <<string,string>>
* Default value is `"pattern"`
* Supported values are `"pattern"` and `"event"`

When multiple patterns are provided to <<plugins-{type}s-{plugin}-match>>,
the timeout has historically applied to _each_ pattern, incurring overhead
for each and every pattern that is attempted; when the grok filter is
configured with `timeout_scope => event`, the plugin instead enforces
a single timeout across all attempted matches on the event, so it can
achieve similar safeguard against runaway matchers with significantly
less overhead.

It's usually better to scope the timeout for the whole event.


[id="plugins-{type}s-{plugin}-common-options"]
Expand Down
130 changes: 109 additions & 21 deletions lib/logstash/filters/grok.rb
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@
# `SYSLOGBASE` pattern which itself is defined by other patterns.
#
# Another option is to define patterns _inline_ in the filter using `pattern_definitions`.
# This is mostly for convenience and allows user to define a pattern which can be used just in that
# This is mostly for convenience and allows user to define a pattern which can be used just in that
# filter. This newly defined patterns in `pattern_definitions` will not be available outside of that particular `grok` filter.
#
class LogStash::Filters::Grok < LogStash::Filters::Base
Expand Down Expand Up @@ -168,7 +168,7 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
# necessarily need to define this yourself unless you are adding additional
# patterns. You can point to multiple pattern directories using this setting.
# Note that Grok will read all files in the directory matching the patterns_files_glob
# and assume it's a pattern file (including any tilde backup files).
# and assume it's a pattern file (including any tilde backup files).
# [source,ruby]
# patterns_dir => ["/opt/logstash/patterns", "/opt/logstash/extra_patterns"]
#
Expand Down Expand Up @@ -215,6 +215,16 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
# Set to 0 to disable timeouts
config :timeout_millis, :validate => :number, :default => 30000

# When multiple patterns are provided to `match`,
# the timeout has historically applied to _each_ pattern, incurring overhead
# for each and every pattern that is attempted; when the grok filter is
# configured with `timeout_scope => 'event'`, the plugin instead enforces
# a single timeout across all attempted matches on the event, so it can
# achieve similar safeguard against runaway matchers with significantly
# less overhead.
# It's usually better to scope the timeout for the whole event.
config :timeout_scope, :validate => %w(pattern event), :default => "pattern"

# Tag to apply if a grok regexp times out.
config :tag_on_timeout, :validate => :string, :default => '_groktimeout'

Expand Down Expand Up @@ -278,10 +288,8 @@ def register
@match_counter = metric.counter(:matches)
@failure_counter = metric.counter(:failures)

# divide by float to allow fractionnal seconds, the Timeout class timeout value is in seconds but the underlying
# executor resolution is in microseconds so fractionnal second parameter down to microseconds is possible.
# see https://github.com/jruby/jruby/blob/9.2.7.0/core/src/main/java/org/jruby/ext/timeout/Timeout.java#L125
@timeout_seconds = @timeout_millis / 1000.0
@timeout = @timeout_millis > 0.0 ? RubyTimeout.new(@timeout_millis) : NoopTimeout::INSTANCE
@matcher = ( @timeout_scope.eql?('event') ? EventTimeoutMatcher : PatternTimeoutMatcher ).new(self)
end # def register

def filter(event)
Expand Down Expand Up @@ -334,25 +342,56 @@ def match(groks, field, event)
end

def match_against_groks(groks, field, input, event)
input = input.to_s
matched = false
groks.each do |grok|
# Convert anything else to string (number, hash, etc)
matched = grok_till_timeout(grok, field, input)
if matched
grok.capture(matched) {|field, value| handle(field, value, event)}
break if @break_on_match
# Convert anything else to string (number, hash, etc)
context = GrokContext.new(field, input.to_s)
@matcher.match(context, groks, event, @break_on_match)
end

# Internal (base) helper to handle the global timeout switch.
# @private
class Matcher

def initialize(filter)
@filter = filter
end

def match(context, groks, event, break_on_match)
matched = false

groks.each do |grok|
context.set_grok(grok)

matched = execute(context, grok)
if matched
grok.capture(matched) { |field, value| @filter.handle(field, value, event) }
break if break_on_match
end
end

matched
end

protected

def execute(context, grok)
grok.execute(context.input)
end

matched

end

def grok_till_timeout(grok, field, value)
begin
@timeout_seconds > 0.0 ? Timeout.timeout(@timeout_seconds, TimeoutError) { grok.execute(value) } : grok.execute(value)
rescue TimeoutError
raise GrokTimeoutException.new(grok, field, value)
# @private
class EventTimeoutMatcher < Matcher
kares marked this conversation as resolved.
Show resolved Hide resolved
# @override
def match(context, groks, event, break_on_match)
@filter.with_timeout(context) { super }
end
end

# @private
class PatternTimeoutMatcher < Matcher
# @override
def execute(context, grok)
@filter.with_timeout(context) { super }
end
end

Expand All @@ -378,6 +417,7 @@ def handle(field, value, event)
end
end
end
public :handle

def patterns_files_from_paths(paths, glob)
patternfiles = []
Expand Down Expand Up @@ -438,4 +478,52 @@ def trunc_value
end
end
end

def with_timeout(context, &block)
@timeout.exec(&block)
rescue TimeoutError => error
handle_timeout(context, error)
end
public :with_timeout

def handle_timeout(context, error)
raise GrokTimeoutException.new(context.grok, context.field, context.input)
end

# @private
class GrokContext
attr_reader :grok, :field, :input

def initialize(field, input)
@field = field
@input = input
end

def set_grok(grok)
@grok = grok
end
end

# @private
class NoopTimeout
INSTANCE = new

def exec
yield
end
end

# @private
class RubyTimeout
def initialize(timeout_millis)
# divide by float to allow fractional seconds, the Timeout class timeout value is in seconds but the underlying
# executor resolution is in microseconds so fractional second parameter down to microseconds is possible.
# see https://github.com/jruby/jruby/blob/9.2.7.0/core/src/main/java/org/jruby/ext/timeout/Timeout.java#L125
@timeout_seconds = timeout_millis / 1000.0
end

def exec(&block)
Timeout.timeout(@timeout_seconds, TimeoutError, &block)
end
end
end # class LogStash::Filters::Grok
2 changes: 1 addition & 1 deletion logstash-filter-grok.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-filter-grok'
s.version = '4.1.1'
s.version = '4.2.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Parses unstructured event data into fields"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
49 changes: 47 additions & 2 deletions spec/filters/grok_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def pattern_path(path)

sample "400 454.33" do
insist { subject.get("foo") } == 400
insist { subject.get("foo") }.is_a?(Fixnum)
insist { subject.get("foo") }.is_a?(Integer)
insist { subject.get("bar") } == 454.33
insist { subject.get("bar") }.is_a?(Float)
end
Expand Down Expand Up @@ -412,7 +412,7 @@ def pattern_path(path)
filter {
grok {
match => {
message => "(.*a){30}"
"message" => "(.*a){30}"
}
timeout_millis => 100
}
Expand All @@ -425,6 +425,51 @@ def pattern_path(path)
end
end

describe "no timeout on failure with multiple patterns (when timeout not grouped)" do
config <<-CONFIG
filter {
grok {
match => {
"message" => [
"(.*f){20}", "(.*e){20}", "(.*d){20}", "(.*c){20}", "(.*b){20}",
"(.*a){25}", "(.*a){24}", "(.*a){23}", "(.*a){22}", "(.*a){21}",
"(.*a){20}"
]
}
timeout_millis => 500
timeout_scope => 'pattern'
}
}
CONFIG

sample( 'b' * 10 + 'c' * 10 + 'd' * 10 + 'e' * 10 + ' ' + 'a' * 20 ) do
insist { subject.get("tags") }.nil?
end
end

describe "timeout on grouped (multi-pattern) failure" do
config <<-CONFIG
filter {
grok {
match => {
"message" => [
"(.*f){20}", "(.*e){20}", "(.*d){20}", "(.*c){20}", "(.*b){20}",
"(.*a){25}", "(.*a){24}", "(.*a){23}", "(.*a){22}", "(.*a){21}",
"(.*a){20}"
]
}
timeout_millis => 500
timeout_scope => 'event'
}
}
CONFIG

sample( 'b' * 10 + 'c' * 10 + 'd' * 10 + 'e' * 10 + ' ' + 'a' * 20 ) do
expect(subject.get("tags")).to include("_groktimeout")
expect(subject.get("tags")).not_to include("_grokparsefailure")
end
end

describe "tagging on failure" do
config <<-CONFIG
filter {
Expand Down