Skip to content

Commit

Permalink
Merge pull request #402 from splunk/DVPL-10069
Browse files Browse the repository at this point in the history
Added support to process empty records
  • Loading branch information
ashah-splunk authored Oct 27, 2021
2 parents 8a988f8 + fe1784f commit ad5f21e
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 7 deletions.
30 changes: 30 additions & 0 deletions splunklib/searchcommands/generating_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# under the License.

from __future__ import absolute_import, division, print_function, unicode_literals
import sys

from .decorators import ConfigurationSetting
from .search_command import SearchCommand
Expand Down Expand Up @@ -220,6 +221,35 @@ def _execute_chunk_v2(self, process, chunk):
return
self._finished = True

def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout, allow_empty_input=True):
""" Process data.
:param argv: Command line arguments.
:type argv: list or tuple
:param ifile: Input data file.
:type ifile: file
:param ofile: Output data file.
:type ofile: file
:param allow_empty_input: For generating commands, it must be true. Doing otherwise will cause an error.
:type allow_empty_input: bool
:return: :const:`None`
:rtype: NoneType
"""

# Generating commands are expected to run on an empty set of inputs as the first command being run in a search,
# also this class implements its own separate _execute_chunk_v2 method which does not respect allow_empty_input
# so ensure that allow_empty_input is always True

if not allow_empty_input:
raise ValueError("allow_empty_input cannot be False for Generating Commands")
else:
return super(GeneratingCommand, self).process(argv=argv, ifile=ifile, ofile=ofile, allow_empty_input=True)

# endregion

# region Types
Expand Down
23 changes: 16 additions & 7 deletions splunklib/searchcommands/search_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def __init__(self):
self._default_logging_level = self._logger.level
self._record_writer = None
self._records = None
self._allow_empty_input = True

def __str__(self):
text = ' '.join(chain((type(self).name, str(self.options)), [] if self.fieldnames is None else self.fieldnames))
Expand Down Expand Up @@ -413,7 +414,7 @@ def prepare(self):
"""
pass

def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout):
def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout, allow_empty_input=True):
""" Process data.
:param argv: Command line arguments.
Expand All @@ -425,10 +426,16 @@ def process(self, argv=sys.argv, ifile=sys.stdin, ofile=sys.stdout):
:param ofile: Output data file.
:type ofile: file
:param allow_empty_input: Allow empty input records for the command, if False an Error will be returned if empty chunk body is encountered when read
:type allow_empty_input: bool
:return: :const:`None`
:rtype: NoneType
"""

self._allow_empty_input = allow_empty_input

if len(argv) > 1:
self._process_protocol_v1(argv, ifile, ofile)
else:
Expand Down Expand Up @@ -965,13 +972,14 @@ def _execute_v2(self, ifile, process):
def _execute_chunk_v2(self, process, chunk):
metadata, body = chunk

if len(body) <= 0:
return
if len(body) <= 0 and not self._allow_empty_input:
raise ValueError(
"No records found to process. Set allow_empty_input=True in dispatch function to move forward "
"with empty records.")

records = self._read_csv_records(StringIO(body))
self._record_writer.write_records(process(records))


def _report_unexpected_error(self):

error_type, error, tb = sys.exc_info()
Expand Down Expand Up @@ -1063,8 +1071,7 @@ def iteritems(self):
SearchMetric = namedtuple('SearchMetric', ('elapsed_seconds', 'invocation_count', 'input_count', 'output_count'))



def dispatch(command_class, argv=sys.argv, input_file=sys.stdin, output_file=sys.stdout, module_name=None):
def dispatch(command_class, argv=sys.argv, input_file=sys.stdin, output_file=sys.stdout, module_name=None, allow_empty_input=True):
""" Instantiates and executes a search command class
This function implements a `conditional script stanza <https://docs.python.org/2/library/__main__.html>`_ based on the value of
Expand All @@ -1087,6 +1094,8 @@ def dispatch(command_class, argv=sys.argv, input_file=sys.stdin, output_file=sys
:type output_file: :code:`file`
:param module_name: Name of the module calling :code:`dispatch` or :const:`None`.
:type module_name: :code:`basestring`
:param allow_empty_input: Allow empty input records for the command, if False an Error will be returned if empty chunk body is encountered when read
:type allow_empty_input: bool
:returns: :const:`None`
**Example**
Expand Down Expand Up @@ -1124,4 +1133,4 @@ def stream(records):
assert issubclass(command_class, SearchCommand)

if module_name is None or module_name == '__main__':
command_class().process(argv, input_file, output_file)
command_class().process(argv, input_file, output_file, allow_empty_input)
17 changes: 17 additions & 0 deletions tests/searchcommands/test_generator_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,21 @@ def generate(self):
assert finished_seen


def test_allow_empty_input_for_generating_command():
"""
Passing allow_empty_input for generating command will cause an error
"""
@Configuration()
class GeneratorTest(GeneratingCommand):
def generate(self):
for num in range(1, 3):
yield {"_index": num}
generator = GeneratorTest()
in_stream = io.BytesIO()
out_stream = io.BytesIO()

try:
generator.process([], in_stream, out_stream, allow_empty_input=False)
except ValueError as error:
assert str(error) == "allow_empty_input cannot be False for Generating Commands"

56 changes: 56 additions & 0 deletions tests/searchcommands/test_search_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,62 @@ def test_process_scpv2(self):
r'\{(' + inspector + r',' + finished + r'|' + finished + r',' + inspector + r')\}')

self.assertEqual(command.protocol_version, 2)

# 5. Different scenarios with allow_empty_input flag, default is True
# Test preparation
dispatch_dir = os.path.join(basedir, 'recordings', 'scpv2', 'Splunk-6.3', 'countmatches.dispatch_dir')
logging_configuration = os.path.join(basedir, 'apps', 'app_with_logging_configuration', 'logging.conf')
logging_level = 'ERROR'
record = False
show_configuration = True

getinfo_metadata = metadata.format(
dispatch_dir=encode_string(dispatch_dir),
logging_configuration=encode_string(logging_configuration)[1:-1],
logging_level=logging_level,
record=('true' if record is True else 'false'),
show_configuration=('true' if show_configuration is True else 'false'))

execute_metadata = '{"action":"execute","finished":true}'
command = TestCommand()
result = BytesIO()
argv = ['some-external-search-command.py']

# Scenario a) Empty body & allow_empty_input=False ==> Assert Error

execute_body = '' # Empty body
input_file = build_command_input(getinfo_metadata, execute_metadata, execute_body)
try:
command.process(argv, input_file, ofile=result, allow_empty_input=False) # allow_empty_input=False
except SystemExit as error:
self.assertNotEqual(0, error.code)
self.assertTrue(result.getvalue().decode("UTF-8").__contains__("No records found to process. Set "
"allow_empty_input=True in dispatch "
"function to move forward with empty "
"records."))
else:
self.fail('Expected SystemExit, not a return from TestCommand.process: {}\n'.format(
result.getvalue().decode('utf-8')))

# Scenario b) Empty body & allow_empty_input=True ==> Assert Success

execute_body = '' # Empty body
input_file = build_command_input(getinfo_metadata, execute_metadata, execute_body)
result = BytesIO()

try:
command.process(argv, input_file, ofile=result) # By default allow_empty_input=True
except SystemExit as error:
self.fail('Unexpected exception: {}: {}'.format(type(error).__name__, error))

expected = (
'chunked 1.0,68,0\n'
'{"inspector":{"messages":[["INFO","test command configuration: "]]}}\n'
'chunked 1.0,17,0\n'
'{"finished":true}'
)

self.assertEquals(result.getvalue().decode("UTF-8"), expected)
return

_package_directory = os.path.dirname(os.path.abspath(__file__))
Expand Down

0 comments on commit ad5f21e

Please sign in to comment.