Skip to content

Commit

Permalink
Merge pull request #1330 from jmacdone/bugfix/compound-qk
Browse files Browse the repository at this point in the history
Bugfix compound query_key and mixed mappings
  • Loading branch information
jertel authored Dec 13, 2023
2 parents 64cde55 + e78a4b7 commit f94739f
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 18 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
- [Docs] Extend FAQ / troubleshooting section with information on Elasticsearch RBAC - [#1324](https://github.com/jertel/elastalert2/pull/1324) - @chr-b
- Upgrade to Python 3.12 - [#1327](https://github.com/jertel/elastalert2/pull/1327) - @jertel
- Correction in IRIS and GELF alerter [#1331](https://github.com/jertel/elastalert2/pull/1331) - @malinkinsa
- Fix handing of compound_query_key values - [#1330](https://github.com/jertel/elastalert2/pull/1330) - @jmacdone
- Fix handing raw_query_key and query_key values ending with .keyword- [#1330](https://github.com/jertel/elastalert2/pull/1330) - @jmacdone
- [Docs] Fix broken search function caused by sphinx upgrade a few releases ago - [#1332](https://github.com/jertel/elastalert2/pull/1332) - @jertel
- [Docs] Fix mismatch for parameter iris_customer_id - [1334](https://github.com/jertel/elastalert2/pull/1334) @malinkinsa
- [IRIS] Make parameter iris_customer_id optional with default value - [1334](https://github.com/jertel/elastalert2/pull/1334) @malinkinsa
Expand Down
52 changes: 35 additions & 17 deletions elastalert/elastalert.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import os
import random
import re
import signal
import sys
import threading
Expand Down Expand Up @@ -463,25 +464,42 @@ def get_hits_count(self, rule, starttime, endtime, index):
)
return {endtime: res['count']}

@staticmethod
def query_key_filters(rule: dict, qk_value_csv: str) -> dict:
if qk_value_csv is None:
return

# Split on comma followed by zero or more whitespace characters. It's
# expected to be commaspace separated. However 76ab593 suggests there
# are cases when it is only comma and not commaspace
qk_values = re.split(r',\s*',qk_value_csv)
end = '.keyword'

query_keys = []
try:
query_keys = rule['compound_query_key']
except KeyError:
query_key = rule.get('query_key')
if query_key is not None:
query_keys.append(query_key)

if len(qk_values) != len(query_keys):
msg = ( f"Received {len(qk_values)} value(s) for {len(query_keys)} key(s)."
f" Did '{qk_value_csv}' have a value with a comma?"
" See https://github.com/jertel/elastalert2/pull/1330#issuecomment-1849962106" )
elastalert_logger.warning( msg )

for key, value in zip(query_keys, qk_values):
if rule.get('raw_count_keys', True):
if not key.endswith(end):
key += end
yield {'term': {key: value}}

def get_hits_terms(self, rule, starttime, endtime, index, key, qk=None, size=None):
rule_filter = copy.copy(rule['filter'])
if qk:
qk_list = qk.split(",")
end = '.keyword'

if len(qk_list) == 1:
qk = qk_list[0]
filter_key = rule['query_key']
if rule.get('raw_count_keys', True) and not rule['query_key'].endswith(end):
filter_key = add_keyword_postfix(filter_key)
rule_filter.extend([{'term': {filter_key: qk}}])
else:
filter_keys = rule['compound_query_key']
for i in range(len(filter_keys)):
key_with_postfix = filter_keys[i]
if rule.get('raw_count_keys', True) and not key.endswith(end):
key_with_postfix = add_keyword_postfix(key_with_postfix)
rule_filter.extend([{'term': {key_with_postfix: qk_list[i]}}])

for filter in self.query_key_filters(rule=rule, qk_value_csv=qk):
rule_filter.append(filter)

base_query = self.get_query(
rule_filter,
Expand Down
25 changes: 24 additions & 1 deletion elastalert/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def new_get_event_ts(ts_field):
return lambda event: lookup_es_key(event[0], ts_field)


def _find_es_dict_by_key(lookup_dict, term):
def _find_es_dict_by_key(lookup_dict: dict, term: str, string_multi_field_name: str = ".keyword") -> tuple[dict, str]:
""" Performs iterative dictionary search based upon the following conditions:
1. Subkeys may either appear behind a full stop (.) or at one lookup_dict level lower in the tree.
Expand All @@ -64,8 +64,31 @@ def _find_es_dict_by_key(lookup_dict, term):
element which is the last subkey used to access the target specified by the term. None is
returned for both if the key can not be found.
"""

# For compound fieldnames added by ElastAlert.process_hits()
#
# For example, when query_key is a list of fieldnames it will insert a term
# 'key_1,other_fieldname,a_third_name'
# and if the rule is set for raw_query_keys, the query_key values may end
# with .keyword it will insert instead something like
# 'key_1_ip,other_fieldname_number,a_third_name.keyword'
# and we need to check for that synthentic compound fielname, including the
# .keyword suffix before contnuing
#
# Of course, it also handles happy path, non-ambuiguous fieldnames like
# 'ip_address' and 'src_displayname' that don't have . or [] characters
if term in lookup_dict:
return lookup_dict, term

# If not synthetically added by ElastAlert, matching documents will not have
# .keyword fieldnames, even if a .keyword fieldname was used as a term in
# the search
# e.g. {"term": {"description.keyword": "Target Description Here"}}
# will return a document with {"_source": {"description": "Target Description Here"}}
term = term.removesuffix(string_multi_field_name)
if term in lookup_dict:
return lookup_dict, term

# If the term does not match immediately, perform iterative lookup:
# 1. Split the search term into tokens
# 2. Recurrently concatenate these together to traverse deeper into the dictionary,
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
'elastalert=elastalert.elastalert:main']},
packages=find_packages(exclude=["tests"]),
package_data={'elastalert': ['schema.yaml', 'es_mappings/**/*.json']},
python_requires='>=3.9',
install_requires=[
'apscheduler>=3.10.4,<4.0',
'aws-requests-auth>=0.4.3',
Expand Down
189 changes: 189 additions & 0 deletions tests/hits_terms_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import pytest
from datetime import datetime, timedelta

from elastalert.util import dt_to_ts
from elastalert.elastalert import ElastAlerter

# I like the dictionary whitespace the way it is, thank you
# but I'm not going to tag all the lines with #noqa: E201
# flake8: noqa

@pytest.fixture
def example_agg_response():
res = {
'took': 1,
'timed_out': False,
'_shards': {'total': 3, 'successful': 3, 'skipped': 0, 'failed': 0},
'hits': {
'total': {'value': 9, 'relation': 'eq'},
'max_score': None,
'hits': []},
'aggregations': {
'counts': {
'doc_count_error_upper_bound': 0,
'sum_other_doc_count': 0,
'buckets': [{'key': '10.0.4.174', 'doc_count': 2},
{'key': '10.0.4.241', 'doc_count': 2},
{'key': '10.0.4.76', 'doc_count': 1},
{'key': '10.0.4.123', 'doc_count': 1},
{'key': '10.0.4.156', 'doc_count': 1},
{'key': '10.0.4.231', 'doc_count': 1},
{'key': '10.0.4.248', 'doc_count': 1}]}}
}
return res


def _mock_query_key_option_loader(rule):
'''
So, some copypasta from loaders.load_options,
if query_key is a string:
no compound_query_key is created
if query_key is a list:
if len() > 1:
compound_query_key is created
query_key is replaced with ",".join() of the original query_key values
if len() == 1:
the query_key list with one string is normalilzed back to just a string
if len() == 0:
somehow it was an empty list and query_keys is silently dropped from the config
'''
raw_query_key = rule.get('query_key')
if isinstance(raw_query_key, list):
if len(raw_query_key) > 1:
rule['compound_query_key'] = raw_query_key
rule['query_key'] = ','.join(raw_query_key)
elif len(raw_query_key) == 1:
rule['query_key'] = raw_query_key[0]
else:
del rule['query_key']


@pytest.mark.parametrize(
["qk_value", "query_key"],
# scenario A: 3 query keys
[ ( ['172.16.1.10', '/api/v1/endpoint-foo', 'us-east-2'],
['server_ip', 'service_name', 'region'] ),
# scenario B: 2 query keys
( ['172.16.1.10', '/api/v1/endpoint-foo'],
['server_ip', 'service_name'] ),
# scenario C: 1 query key, but it was given as a list of one fieldname in the rule options
# as of this writing, 707b2a5 shouldn't allow this to happen, but here is a test regardless
( ['172.16.1.10'],
['server_ip'] ),
# scenario D: 1 query key, given as a string
( ['172.16.1.10'],
'server_ip' ),
# scenario E: no query key
( None,
None )
],
)
@pytest.mark.parametrize("query_key_values_separator", [",", ", ", ", ", ",\t"])
def test_get_hits_terms_with_factored_out_filters(ea, example_agg_response, qk_value, query_key, query_key_values_separator):

if query_key is not None:
ea.rules[0]['query_key'] = query_key

# emulate the rule['compound_query_key'] creation logic which prob should be
# factored out of loaders.load_options() instead of copypasta'd for the test
_mock_query_key_option_loader(ea.rules[0])

try:
# ElastAlert.process_hits() is expected to insert the filedname values
# from _hits as a commaspace csv
qk_csv = query_key_values_separator.join(qk_value)
except TypeError:
qk_csv = None
index = 'foo-2023-13-13' #lousy Smarch weather
top_term_key = 'client_ip'

endtime = datetime.now()
starttime = endtime - timedelta(hours=1)
ea.thread_data.current_es.search.return_value = example_agg_response

hit_terms = ea.get_hits_terms(
rule=ea.rules[0],
starttime=starttime,
endtime=endtime,
index=index,
key=top_term_key,
qk = qk_csv,
size=None
)
assert endtime in hit_terms
assert hit_terms[endtime] == example_agg_response['aggregations']['counts']['buckets']

expected_filters = [
{'range': {'@timestamp': { 'gt': dt_to_ts(starttime), 'lte': dt_to_ts(endtime)}}}
]
try:
cqk = ea.rules[0]['compound_query_key']
for fieldname, value in zip(cqk, qk_value):
filter = {'term': {f'{fieldname}.keyword': value}}
expected_filters.append(filter)
except KeyError:
#not a compound, eh? it must be a string of a single filedname
try:
fieldname = ea.rules[0]['query_key']
filter = {'term': {f'{fieldname}.keyword': qk_value[0]}}
expected_filters.append(filter)
except KeyError:
pass # maybe the rule never had a query_key, or it was an empty list and purged

expected_query = {
'query': {'bool': {'filter': {'bool': {'must': expected_filters}}}},
# 50 harded coded in get_hits_terms as a default for size=None
'aggs': {'counts': {'terms': {'field': top_term_key, 'size': 50, 'min_doc_count': 1}}}
}
ea.thread_data.current_es.search.assert_called_with(index=index,body=expected_query, size=0, ignore_unavailable=True)


def test_query_key_filters_single_query_key():
rule = { 'query_key': 'a_single_key_as_a_string' }
qk_value_csv = 'a single value'
filters = list(ElastAlerter.query_key_filters(rule,qk_value_csv))
expected_filters = [{'term': {f'{rule['query_key']}.keyword': qk_value_csv}}]
assert filters == expected_filters

@pytest.mark.parametrize("query_key_values_separator", [",", ", ", ", ", ",\t"])
def test_query_key_filters_compound_query_key(query_key_values_separator):
rule = { 'query_key': 'compound,key',
'compound_query_key': ['compound', 'key'] }
qk_value_csv = query_key_values_separator.join( ['combined value', 'by commaspace'] )
filters = list(ElastAlerter.query_key_filters(rule,qk_value_csv))
expected_filters = [
{'term': {'compound.keyword': 'combined value'}},
{'term': {'key.keyword': 'by commaspace'}},
]
assert filters == expected_filters

def test_query_key_filters_brittle_query_key_value_logs_warning(caplog):
rule = { 'query_key': 'university,state',
'compound_query_key': ['university', 'state'] }
#uh oh, a commaspace we didn't expect
qk_value_csv = 'California State University, San Bernardino, California'
filters = list(ElastAlerter.query_key_filters(rule,qk_value_csv))
log = caplog.records[0]
assert log.levelname == "WARNING"
assert 'Received 3 value(s) for 2 key(s).' in log.message

def test_query_key_filters_none_values():
rule = { 'query_key': 'something'}
qk_value_csv = None
filters = list(ElastAlerter.query_key_filters(rule,qk_value_csv))
assert len(filters) == 0

def test_query_key_filters_unexpected_passed_values_for_a_rule_without_query_keys(caplog):
rule = { }
qk_value_csv = 'value'
filters = list(ElastAlerter.query_key_filters(rule,qk_value_csv))
assert len(filters) == 0
log = caplog.records[0]
assert log.levelname == "WARNING"
assert 'Received 1 value(s) for 0 key(s).' in log.message
6 changes: 6 additions & 0 deletions tests/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def test_looking_up_nested_keys(ea):
}

assert lookup_es_key(record, 'Fields.ts') == expected
assert lookup_es_key(record, 'Fields.ts.keyword') == expected


def test_looking_up_nested_composite_keys(ea):
Expand All @@ -131,6 +132,7 @@ def test_looking_up_nested_composite_keys(ea):
}

assert lookup_es_key(record, 'Fields.ts.value') == expected
assert lookup_es_key(record, 'Fields.ts.value.keyword') == expected


def test_looking_up_arrays(ea):
Expand All @@ -148,10 +150,14 @@ def test_looking_up_arrays(ea):
assert lookup_es_key(record, 'flags[0]') == 1
assert lookup_es_key(record, 'flags[1]') == 2
assert lookup_es_key(record, 'objects[0]foo') == 'bar'
assert lookup_es_key(record, 'objects[0]foo.keyword') == 'bar'
assert lookup_es_key(record, 'objects[1]foo[0]bar') == 'baz'
assert lookup_es_key(record, 'objects[2]foo.bar') == 'baz'
assert lookup_es_key(record, 'objects[2]foo.bar.keyword') == 'baz'
assert lookup_es_key(record, 'objects[1]foo[1]bar') is None
assert lookup_es_key(record, 'objects[1]foo[1]bar.keyword') is None
assert lookup_es_key(record, 'objects[1]foo[0]baz') is None
assert lookup_es_key(record, 'objects[1]foo[0]baz.keyword') is None
assert lookup_es_key(record, 'nested.foo[0]') == 'bar'
assert lookup_es_key(record, 'nested.foo[1]') == 'baz'

Expand Down

0 comments on commit f94739f

Please sign in to comment.