Skip to content

Commit

Permalink
Json refractor
Browse files Browse the repository at this point in the history
  • Loading branch information
frack113 committed Aug 16, 2024
1 parent bed1f2b commit b9c1d10
Show file tree
Hide file tree
Showing 6 changed files with 3,358 additions and 2,083 deletions.
71 changes: 18 additions & 53 deletions sigma/validators/sigmahq/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,61 +15,28 @@ def load_remote_json(url: str, filename: str) -> dict:
return json_dict


def core_logsource(source: SigmaLogSource) -> SigmaLogSource:
return SigmaLogSource(
product=source.product, category=source.category, service=source.service
)


def load_taxonomy_json(json_name: str) -> dict:
field_info = {}
common_info = {}
addon_info = {}

json_dict = load_remote_json("github", json_name)
for key in json_dict["common"]:
info = json_dict["common"][key]
logsource = SigmaLogSource(
product=info["product"], category=info["category"], service=info["service"]
)
common_info[logsource] = info["data"]

for key in json_dict["addon"]:
info = json_dict["addon"][key]
logsource = SigmaLogSource(
product=info["product"], category=info["category"], service=info["service"]
)
addon_info[logsource] = info["data"]

for key in json_dict["field"]:
info = json_dict["field"][key]
logsource = SigmaLogSource(
product=info["product"], category=info["category"], service=info["service"]
)
field_info[logsource] = info["data"]

if len(info["data"]) > 0:
if (
logsource.product
and SigmaLogSource(product=logsource.product) in common_info
):
field_info[logsource] += common_info[
SigmaLogSource(product=logsource.product)
]
if logsource in addon_info:
field_info[logsource] += addon_info[logsource]
if "Hashes" in info["data"] or "Hash" in info["data"]:
field_info[logsource] += ["md5", "sha1", "sha256", "Imphash"]

for value in json_dict["taxonomy"].values():
logsource = core_logsource(SigmaLogSource.from_dict(value["logsource"]))
field_info[logsource] = value["field"]
return field_info


def load_filepattern_json(json_name):
prefix_info = {}
json_dict = load_remote_json("github", json_name)
data = {}
for key in json_dict["logsource"]:
data[
SigmaLogSource(
product=json_dict["logsource"][key]["product"],
category=json_dict["logsource"][key]["category"],
service=json_dict["logsource"][key]["service"],
)
] = json_dict["logsource"][key]["prefix"]
return data, json_dict["product"]
for value in json_dict["pattern"].values():
logsource = core_logsource(SigmaLogSource.from_dict(value["logsource"]))
prefix_info[logsource] = value["prefix"]
return prefix_info


def load_windows_json(json_name):
Expand All @@ -89,7 +56,6 @@ class ConfigHQ:
sigma_taxonomy_unicast: Dict[SigmaLogSource, List[str]] = {}

sigmahq_logsource_filepattern: Dict[SigmaLogSource, str] = {}
sigmahq_product_prefix: Dict[str, str] = {}

windows_no_eventid: List[str] = []
windows_provider_name: Dict[SigmaLogSource, List[str]] = {}
Expand All @@ -103,15 +69,14 @@ class ConfigHQ:
)

def __init__(self) -> None:
self.sigma_taxonomy = load_taxonomy_json("sigma_taxonomy.json")
self.sigma_taxonomy = load_taxonomy_json("sigma.json")
self.sigma_taxonomy_unicast = {
k: [v.lower() for v in l] for k, l in self.sigma_taxonomy.items()
}

(
self.sigmahq_logsource_filepattern,
self.sigmahq_product_prefix,
) = load_filepattern_json("sigmahq_filepattern.json")
self.sigmahq_logsource_filepattern = load_filepattern_json(
"sigmahq_filename.json"
)
self.windows_no_eventid, self.windows_provider_name = load_windows_json(
"sigmahq_windows_validator.json"
)
35 changes: 20 additions & 15 deletions sigma/validators/sigmahq/filename.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ class SigmahqFilenamePrefixValidator(SigmaRuleValidator):
def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
if rule.source is not None:
filename = rule.source.path.name
logsource = rule.logsource
logsource = SigmaLogSource(
rule.logsource.category, rule.logsource.product, rule.logsource.service
)

if logsource in config.sigmahq_logsource_filepattern:
if not filename.startswith(
Expand All @@ -65,23 +67,26 @@ def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
SigmahqFilenamePrefixIssue(
rule,
filename,
logsource,
rule.logsource,
config.sigmahq_logsource_filepattern[logsource],
)
]
else:
if (
logsource.product in config.sigmahq_product_prefix
and not filename.startswith(
config.sigmahq_product_prefix[logsource.product]
)
):
return [
SigmahqFilenamePrefixIssue(
rule,
filename,
logsource,
config.sigmahq_product_prefix[logsource.product],
# check only product but must exist
if rule.logsource.product:
logsource = SigmaLogSource(None, rule.logsource.product, None)
if (
logsource in config.sigmahq_logsource_filepattern
and not filename.startswith(
config.sigmahq_logsource_filepattern[logsource]
)
]
):
return [
SigmahqFilenamePrefixIssue(
rule,
filename,
rule.logsource,
config.sigmahq_logsource_filepattern[logsource],
)
]
return []
Loading

0 comments on commit b9c1d10

Please sign in to comment.