Skip to content

Commit

Permalink
Merge pull request #456 from target/ScanXML_Refactor+AdditionalFuncti…
Browse files Browse the repository at this point in the history
…onality

Porting refactor of ScanXML
  • Loading branch information
phutelmyer authored Apr 23, 2024
2 parents c621fe8 + 39a16c0 commit a88ede1
Show file tree
Hide file tree
Showing 6 changed files with 409 additions and 99 deletions.
16 changes: 15 additions & 1 deletion configs/python/backend/backend.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
version: 2024.04.02.01

version: 2024.04.22.01
logging_cfg: '/etc/strelka/logging.yaml'
limits:
max_files: 5000
Expand Down Expand Up @@ -686,6 +687,19 @@ scanners:
- 'mso_file'
- 'soap_file'
priority: 5
options:
extract_tags:
- "target"
- "script"
- "embeddedfile"
- "cipherdata"
- "data"
- "signedinfo"
- "encrypteddata"
metadata_tags:
- "type"
- "description"
- "maintainer"
'ScanYara':
- positive:
flavors:
Expand Down
73 changes: 73 additions & 0 deletions src/python/strelka/auxiliary/iocs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import re
from urllib.parse import urlparse

import tldextract


def extract_iocs_from_string(input_string):
    """
    Collect the unique Indicators of Compromise (IOCs) found in a string.

    The result merges the values returned by ``extract_domains_from_string``
    and ``extract_ip_addresses`` for the same input.

    Args:
        input_string (str): Text to search for IOCs.

    Returns:
        list: De-duplicated IOC values, in no particular order.
    """
    unique_iocs = set()
    # Run each extractor over the same text and pool the results in one set
    # so a value reported by more than one extractor appears only once.
    for extractor in (extract_domains_from_string, extract_ip_addresses):
        unique_iocs.update(extractor(input_string))
    return list(unique_iocs)


def extract_domains_from_string(input_string):
    """
    Extract the host names of the URLs found in a string.

    Args:
        input_string (str): Text to search for URLs.

    Returns:
        list: Unique host names (subdomain, domain and suffix joined with
            dots) taken from the URLs, in no particular order.
    """
    domains = set()

    # Candidate URLs: a supported scheme followed by non-whitespace characters.
    urls = re.findall(r"(?:https?|ftp|ftps|file|smb)://[^\s/$.?#].[^\s]*", input_string)

    for url in urls:
        try:
            parsed_url = urlparse(url)
        except ValueError:
            # urlparse rejects malformed bracketed hosts (e.g. "http://[abc")
            # with ValueError; one bad candidate must not abort the whole
            # extraction, so it is skipped.
            continue

        # Keep only candidates that carry both a scheme and a network location.
        if not (parsed_url.scheme and parsed_url.netloc):
            continue

        extracted = tldextract.extract(url)
        # Join only the non-empty parts so that a missing subdomain or suffix
        # does not leave leading, trailing or doubled dots.
        domain = ".".join(
            part
            for part in (extracted.subdomain, extracted.domain, extracted.suffix)
            if part
        )
        if domain:
            domains.add(domain)

    return list(domains)


def extract_ip_addresses(input_string):
    """
    Extract IPv4 and IPv6 addresses from a string.

    IPv4 candidates are dotted quads whose four octets are each at most 255.
    IPv6 candidates are matched only in their full, uncompressed form (eight
    colon-separated hexadecimal groups).

    Args:
        input_string (str): Text to search for IP addresses.

    Returns:
        list: Unique IP addresses found in the input, in no particular order.
    """
    ipv4_pattern = r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b"
    ipv6_pattern = r"\b(?:[A-F0-9]{1,4}:){7}[A-F0-9]{1,4}\b"

    ip_addresses = set()

    # The pattern alone accepts impossible quads such as "999.300.1.1", so
    # every octet is range-checked before the candidate is kept.
    for candidate in re.findall(ipv4_pattern, input_string):
        if all(int(octet) <= 255 for octet in candidate.split(".")):
            ip_addresses.add(candidate)

    # Hexadecimal groups may be written in either case.
    ip_addresses.update(re.findall(ipv6_pattern, input_string, re.IGNORECASE))

    return list(ip_addresses)
178 changes: 120 additions & 58 deletions src/python/strelka/scanners/scan_xml.py
Original file line number Diff line number Diff line change
@@ -1,86 +1,148 @@
from typing import Any, Dict, Set

from lxml import etree

from strelka import strelka
from strelka.auxiliary.iocs import extract_iocs_from_string


class ScanXml(strelka.Scanner):
"""Collects metadata and extracts embedded files from XML files.
"""
Collects metadata and extracts embedded files from XML files.
This scanner parses XML files to collect metadata and extract embedded files based on specified tags.
It is used in forensic and malware analysis to extract and analyze structured data within XML documents.
Scanner Type: Collection
Attributes:
None
Options:
extract_tags: List of XML tags that will have their text extracted
as child files.
Defaults to empty list.
metadata_tags: List of XML tags that will have their text logged
as metadata.
Defaults to empty list.
extract_tags (list[str]): Tags whose content is extracted as child files.
metadata_tags (list[str]): Tags whose content is logged as metadata.
## Detection Use Cases
!!! info "Detection Use Cases"
- **Embedded File Extraction**
- Extracts files embedded within specific XML tags.
- **Metadata Extraction**:
- Collects metadata from specific XML tags.
## Known Limitations
!!! warning "Known Limitations"
- Complex or malformed XML structures might lead to incomplete parsing or errors.
- Excessive files may be scanned / collected if XML mimetypes are set in the `backend.yaml`
## To Do
!!! question "To Do"
- Improve error handling for malformed XML structures.
- Better extraction of tags / metadata tags
## References
!!! quote "References"
- XML File Format Specification (https://www.w3.org/XML/)
## Contributors
!!! example "Contributors"
- [Josh Liburdi](https://github.com/jshlbrd)
- [Paul Hutelmyer](https://github.com/phutelmyer)
"""

def scan(self, data, file, options, expire_at):
xml_args = {
"extract_tags": options.get("extract_tags", []),
"metadata_tags": options.get("metadata_tags", []),
def scan(
self, data: bytes, file: strelka.File, options: dict, expire_at: int
) -> None:
"""
Parses XML data to extract metadata and files.
Args:
data: XML data as bytes.
file: File object containing metadata about the scan.
options: Dictionary of scanner options.
expire_at: Time when the scan should be considered expired.
Scans the XML file, extracting data and metadata based on the specified tags,
and emits files as necessary.
"""
# Prepare options with case-insensitive tag matching
xml_options = {
"extract_tags": [tag.lower() for tag in options.get("extract_tags", [])],
"metadata_tags": [tag.lower() for tag in options.get("metadata_tags", [])],
}
self.expire_at = expire_at
self.event.setdefault("tags", [])

# Initialize scan event data
self.event.setdefault("tags", set())
self.event.setdefault("tag_data", [])
self.event.setdefault("namespaces", [])
self.event.setdefault("namespaces", set())
self.event["total"] = {"tags": 0, "extracted": 0}
self.emitted_files: Set[str] = (
set()
) # Tracks emitted files to prevent duplicates

xml = None
# Parse the XML content
try:
xml_buffer = data
if xml_buffer.startswith(b"<?XML"):
xml_buffer = b"<?xml" + xml_buffer[5:]
xml = etree.fromstring(xml_buffer)
docinfo = xml.getroottree().docinfo
if docinfo.doctype:
self.event["doc_type"] = docinfo.doctype
if docinfo.xml_version:
self.event["version"] = docinfo.xml_version
self.event["doc_type"] = docinfo.doctype if docinfo.doctype else ""
self.event["version"] = docinfo.xml_version if docinfo.xml_version else ""

except etree.XMLSyntaxError:
self.flags.append("syntax_error")
# Recursively process each node in the XML
self._recurse_node(xml, xml_options)

if xml is not None:
self._recurse_node(self, xml, xml_args)
except etree.XMLSyntaxError as e:
self.flags.append(f"syntax_error: {str(e)}")

@staticmethod
def _recurse_node(self, node, xml_args):
"""Recursively parses XML file.
# Finalize the event data for reporting
self.event["tags"] = list(self.event["tags"])
self.event["tag_data"] = list(self.event["tag_data"])
self.event["total"]["tags"] = len(self.event["tags"])
self.event["namespaces"] = list(self.event["namespaces"])
self.event["emitted_content"] = list(self.emitted_files)

The XML file is recursively parsed down every node tree.
# Extract and add Indicators of Compromise (IOCs)
self.add_iocs(extract_iocs_from_string(data.decode("utf-8")))

def _recurse_node(self, node: etree._Element, xml_options: Dict[str, Any]) -> None:
"""
Recursively processes each XML node to extract data and metadata.
Args:
node: node to be recursively parsed.
xml_args: options set by the scanner that affect XMl parsing.
node: The current XML node to process.
xml_options: Options for data extraction and metadata logging.
Iterates through XML nodes, extracting data and collecting metadata as specified
by the scanner options.
"""
if node is not None:
if hasattr(node.tag, "__getitem__"):
if node.tag.startswith("{"):
namespace, separator, tag = node.tag[1:].partition("}")
else:
namespace = None
tag = node.tag

self.event["total"]["tags"] += 1
if namespace not in self.event["namespaces"]:
self.event["namespaces"].append(namespace)
if tag not in self.event["tags"]:
self.event["tags"].append(tag)

text = node.attrib.get("name", node.text)
if text is not None:
if tag in xml_args["metadata_tags"]:
tag_data = {"tag": tag, "text": text.strip()}
if tag_data not in self.event["tag_data"]:
self.event["tag_data"].append(tag_data)
elif tag in xml_args["extract_tags"]:
# Send extracted file back to Strelka
self.emit_file(text, name=tag)

self.event["total"]["extracted"] += 1

if node is not None and hasattr(node.tag, "__getitem__"):
namespace, _, tag = node.tag.partition("}")
namespace = namespace[1:] if namespace.startswith("{") else ""
tag = tag.lower()

if tag:
self.event["tags"].add(tag)
if namespace:
self.event["namespaces"].add(namespace)

# Handle specific content extraction and emission
if tag in xml_options["extract_tags"]:
content = node.text.strip() if node.text else ""
if content:
self.emit_file(content, name=tag)
self.emitted_files.add(content)
self.event["total"]["extracted"] += 1

# Always process attributes to capture any relevant metadata or data for emission
self._process_attributes(node, xml_options, tag)

# Continue to recurse through child nodes to extract data
for child in node.getchildren():
self._recurse_node(self, child, xml_args)
self._recurse_node(child, xml_options)

return
def _process_attributes(
    self, node: etree._Element, xml_options: Dict[str, Any], tag: str
) -> None:
    """
    Log the attributes of an XML node that are configured as metadata.

    For every attribute whose lowercased name appears in
    ``xml_options["metadata_tags"]``, one entry is appended to
    ``self.event["tag_data"]``. The entry holds the attribute name as
    written in the document and the string form of the node's complete
    attribute mapping.

    Args:
        node: XML node whose attributes are inspected.
        xml_options: Scanner options; only ``metadata_tags`` is read here.
        tag: Tag of the node being processed (not used by this method).
    """
    for attr_name in node.attrib.keys():
        # The comparison is made on the lowercased name, but the name is
        # recorded exactly as it appears in the document.
        if attr_name.lower() not in xml_options["metadata_tags"]:
            continue
        record = {"tag": attr_name, "content": str(node.attrib)}
        self.event["tag_data"].append(record)
50 changes: 27 additions & 23 deletions src/python/strelka/tests/fixtures/test.xml
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
<?xml version="1.0"?>
<package format="2">
<name>flea3</name>
<version>0.1.0</version>
<description>The flea3 package</description>

<maintainer email="quchao@seas.upenn.edu">Chao Qu</maintainer>

<license>WTFPL</license>
<buildtool_depend>catkin</buildtool_depend>

<depend>roscpp</depend>
<depend>nodelet</depend>
<depend>camera_base</depend>
<!--<depend>std_msgs</depend>-->
<depend>dynamic_reconfigure</depend>
<build_depend>message_generation</build_depend>
<exec_depend>message_runtime</exec_depend>

<export>
<nodelet plugin="${prefix}/nodelet_plugins.xml"/>
</export>
</package>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE bookstore SYSTEM "bookstore.dtd">
<bookstore xmlns:bk="http://example.com/books" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://example.com/books bookstore.xsd">
<metadata>
<description>This is a sample bookstore XML file containing nonfiction science books.</description>
</metadata>
<bk:book category="science">
<bk:title lang="en">A Brief History of Time</bk:title>
<bk:author>Stephen Hawking</bk:author>
<bk:year>1988</bk:year>
<bk:price>25.00</bk:price>
<signedinfo>
<signature>XYZ123456789</signature>
<timestamp>2024-04-05T14:00:00</timestamp>
</signedinfo>
</bk:book>
<bk:book category="science">
<bk:title lang="en">Cosmos</bk:title>
<bk:author>Carl Sagan</bk:author>
<bk:year>1980</bk:year>
<bk:price>20.00</bk:price>
<signedinfo>
<signature>987ABCDEF321</signature>
<timestamp>2024-04-05T15:00:00</timestamp>
</signedinfo>
</bk:book>
</bookstore>
Loading

0 comments on commit a88ede1

Please sign in to comment.