Improve lookup regex - Step 1 - ESCU 5.0 #274

Merged: 16 commits, Jan 16, 2025
64 changes: 38 additions & 26 deletions contentctl/actions/build.py
@@ -10,11 +10,11 @@
from contentctl.output.conf_writer import ConfWriter
from contentctl.output.api_json_output import ApiJsonOutput
from contentctl.output.data_source_writer import DataSourceWriter
from contentctl.objects.lookup import Lookup
from contentctl.objects.lookup import CSVLookup, Lookup_Type
import pathlib
import json
import datetime
from typing import Union
import uuid

from contentctl.objects.config import build

@@ -34,27 +34,41 @@ def execute(self, input_dto: BuildInputDto) -> DirectorOutputDto:
updated_conf_files:set[pathlib.Path] = set()
conf_output = ConfOutput(input_dto.config)


# Construct a path to a YML that does not actually exist.
# We mock this "fake" path since the YML is never written to disk;
# this ensures that the check for the existence of the CSV is correct.
data_sources_fake_yml_path = input_dto.config.getPackageDirectoryPath() / "lookups" / "data_sources.yml"

# Construct a special lookup whose CSV is created at runtime and
# written directly into the output folder. It is created with model_construct,
# not model_validate, because the CSV does not exist yet.
# written directly into the lookups folder. We will delete this after a build,
# assuming that it is successful.
data_sources_lookup_csv_path = input_dto.config.getPackageDirectoryPath() / "lookups" / "data_sources.csv"
DataSourceWriter.writeDataSourceCsv(input_dto.director_output_dto.data_sources, data_sources_lookup_csv_path)
input_dto.director_output_dto.addContentToDictMappings(Lookup.model_construct(description= "A lookup file that will contain the data source objects for detections.",
filename=data_sources_lookup_csv_path,
name="data_sources"))



DataSourceWriter.writeDataSourceCsv(input_dto.director_output_dto.data_sources, data_sources_lookup_csv_path)
input_dto.director_output_dto.addContentToDictMappings(CSVLookup.model_construct(name="data_sources",
id=uuid.UUID("b45c1403-6e09-47b0-824f-cf6e44f15ac8"),
version=1,
author=input_dto.config.app.author_name,
date = datetime.date.today(),
description= "A lookup file that will contain the data source objects for detections.",
lookup_type=Lookup_Type.csv,
file_path=data_sources_fake_yml_path))
updated_conf_files.update(conf_output.writeHeaders())
updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.detections, SecurityContentType.detections))
updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.stories, SecurityContentType.stories))
updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.baselines, SecurityContentType.baselines))
updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.investigations, SecurityContentType.investigations))
updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.lookups, SecurityContentType.lookups))
updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.macros, SecurityContentType.macros))
updated_conf_files.update(conf_output.writeObjects(input_dto.director_output_dto.dashboards, SecurityContentType.dashboards))
updated_conf_files.update(conf_output.writeLookups(input_dto.director_output_dto.lookups))
updated_conf_files.update(conf_output.writeDetections(input_dto.director_output_dto.detections))
updated_conf_files.update(conf_output.writeStories(input_dto.director_output_dto.stories))
updated_conf_files.update(conf_output.writeBaselines(input_dto.director_output_dto.baselines))
updated_conf_files.update(conf_output.writeInvestigations(input_dto.director_output_dto.investigations))
updated_conf_files.update(conf_output.writeMacros(input_dto.director_output_dto.macros))
updated_conf_files.update(conf_output.writeDashboards(input_dto.director_output_dto.dashboards))
updated_conf_files.update(conf_output.writeMiscellaneousAppFiles())




# Ensure that the conf files we just generated/updated are syntactically valid
for conf_file in updated_conf_files:
ConfWriter.validateConfFile(conf_file)
@@ -67,17 +81,15 @@ def execute(self, input_dto: BuildInputDto) -> DirectorOutputDto:
if input_dto.config.build_api:
shutil.rmtree(input_dto.config.getAPIPath(), ignore_errors=True)
input_dto.config.getAPIPath().mkdir(parents=True)
api_json_output = ApiJsonOutput()
for output_objects, output_type in [(input_dto.director_output_dto.detections, SecurityContentType.detections),
(input_dto.director_output_dto.stories, SecurityContentType.stories),
(input_dto.director_output_dto.baselines, SecurityContentType.baselines),
(input_dto.director_output_dto.investigations, SecurityContentType.investigations),
(input_dto.director_output_dto.lookups, SecurityContentType.lookups),
(input_dto.director_output_dto.macros, SecurityContentType.macros),
(input_dto.director_output_dto.deployments, SecurityContentType.deployments)]:
api_json_output.writeObjects(output_objects, input_dto.config.getAPIPath(), input_dto.config.app.label, output_type )


api_json_output = ApiJsonOutput(input_dto.config.getAPIPath(), input_dto.config.app.label)
api_json_output.writeDetections(input_dto.director_output_dto.detections)
api_json_output.writeStories(input_dto.director_output_dto.stories)
api_json_output.writeBaselines(input_dto.director_output_dto.baselines)
api_json_output.writeInvestigations(input_dto.director_output_dto.investigations)
api_json_output.writeLookups(input_dto.director_output_dto.lookups)
api_json_output.writeMacros(input_dto.director_output_dto.macros)
api_json_output.writeDeployments(input_dto.director_output_dto.deployments)


# Create the version file for the SSE API
version_file = input_dto.config.getAPIPath()/"version.json"
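The data_sources lookup above is created with `model_construct` rather than `model_validate` because its backing CSV is generated at build time and the YML it points at never exists. A minimal Pydantic sketch of the distinction (hypothetical model and validator, not contentctl's actual class):

```python
# Sketch only: model_validate runs field validators (which would reject a
# missing file), while model_construct bypasses validation entirely.
from pathlib import Path
from pydantic import BaseModel, field_validator

class FileBackedThing(BaseModel):  # hypothetical stand-in for CSVLookup
    name: str
    file_path: Path

    @field_validator("file_path")
    @classmethod
    def file_must_exist(cls, v: Path) -> Path:
        if not v.exists():
            raise ValueError(f"{v} does not exist")
        return v

fake_yml = Path("lookups/data_sources.yml")  # never written to disk

# FileBackedThing.model_validate({"name": "data_sources", "file_path": fake_yml})
# would raise a ValidationError; model_construct succeeds:
lookup = FileBackedThing.model_construct(name="data_sources", file_path=fake_yml)
```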
3 changes: 2 additions & 1 deletion contentctl/actions/validate.py
@@ -6,6 +6,7 @@
from contentctl.enrichments.attack_enrichment import AttackEnrichment
from contentctl.enrichments.cve_enrichment import CveEnrichment
from contentctl.objects.atomic import AtomicEnrichment
from contentctl.objects.lookup import FileBackedLookup
from contentctl.helper.utils import Utils
from contentctl.objects.data_source import DataSource
from contentctl.helper.splunk_app import SplunkApp
@@ -64,7 +65,7 @@ def ensure_no_orphaned_files_in_lookups(self, repo_path:pathlib.Path, director_o
lookupsDirectory = repo_path/"lookups"

# Get all of the files referenced by Lookups
usedLookupFiles:list[pathlib.Path] = [lookup.filename for lookup in director_output_dto.lookups if lookup.filename is not None] + [lookup.file_path for lookup in director_output_dto.lookups if lookup.file_path is not None]
usedLookupFiles:list[pathlib.Path] = [lookup.filename for lookup in director_output_dto.lookups if isinstance(lookup, FileBackedLookup)] + [lookup.file_path for lookup in director_output_dto.lookups if lookup.file_path is not None]

# Get all of the mlmodel and csv files in the lookups directory
csvAndMlmodelFiles = Utils.get_security_content_files_from_directory(lookupsDirectory, allowedFileExtensions=[".yml",".csv",".mlmodel"], fileExtensionsToReturn=[".csv",".mlmodel"])
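With the `isinstance(lookup, FileBackedLookup)` filter above, only lookups that actually carry a file contribute to `usedLookupFiles`. The comparison against the files on disk falls outside this hunk; a hedged sketch of the set difference it presumably performs:

```python
# Hypothetical continuation of ensure_no_orphaned_files_in_lookups: any csv or
# mlmodel file on disk that no Lookup references is treated as an orphan.
orphans = set(csvAndMlmodelFiles) - set(usedLookupFiles)
if orphans:
    raise ValueError(
        "Found orphaned files in the lookups directory:\n"
        + "\n".join(f"  - {path}" for path in sorted(orphans))
    )
```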
10 changes: 5 additions & 5 deletions contentctl/input/director.py
@@ -14,7 +14,7 @@
from contentctl.objects.playbook import Playbook
from contentctl.objects.deployment import Deployment
from contentctl.objects.macro import Macro
from contentctl.objects.lookup import Lookup
from contentctl.objects.lookup import LookupAdapter, Lookup
from contentctl.objects.atomic import AtomicEnrichment
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.data_source import DataSource
@@ -58,13 +58,12 @@ def addContentToDictMappings(self, content: SecurityContentObject):
f" - {content.file_path}\n"
f" - {self.name_to_content_map[content_name].file_path}"
)

if content.id in self.uuid_to_content_map:
raise ValueError(
f"Duplicate id '{content.id}' with paths:\n"
f" - {content.file_path}\n"
f" - {self.uuid_to_content_map[content.id].file_path}"
)
f" - {self.uuid_to_content_map[content.id].file_path}")

if isinstance(content, Lookup):
self.lookups.append(content)
@@ -157,7 +156,8 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
modelDict = YmlReader.load_file(file)

if contentType == SecurityContentType.lookups:
lookup = Lookup.model_validate(modelDict, context={"output_dto":self.output_dto, "config":self.input_dto})
lookup = LookupAdapter.validate_python(modelDict, context={"output_dto":self.output_dto, "config":self.input_dto})
#lookup = Lookup.model_validate(modelDict, context={"output_dto":self.output_dto, "config":self.input_dto})
self.output_dto.addContentToDictMappings(lookup)

elif contentType == SecurityContentType.macros:
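`LookupAdapter` replaces the single `Lookup.model_validate` call because lookups are now a class hierarchy (`CSVLookup`, `KVStoreLookup`, and `FileBackedLookup` appear elsewhere in this PR). A plausible shape for the adapter, assuming a Pydantic discriminated union keyed on `lookup_type`; the real union in `contentctl.objects.lookup` may have more members:

```python
from typing import Annotated, Union
from pydantic import Field, TypeAdapter

from contentctl.objects.lookup import CSVLookup, KVStoreLookup

# A TypeAdapter lets validate_python dispatch to the right subclass based on
# the discriminator field, while still accepting a validation context.
LookupAdapter = TypeAdapter(
    Annotated[Union[CSVLookup, KVStoreLookup], Field(discriminator="lookup_type")]
)

# Usage mirrors the call in createSecurityContent above:
# lookup = LookupAdapter.validate_python(modelDict, context={"output_dto": ..., "config": ...})
```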
@@ -16,7 +16,7 @@
)

from contentctl.objects.macro import Macro
from contentctl.objects.lookup import Lookup
from contentctl.objects.lookup import Lookup, FileBackedLookup, KVStoreLookup
if TYPE_CHECKING:
from contentctl.input.director import DirectorOutputDto
from contentctl.objects.baseline import Baseline
@@ -285,10 +285,8 @@ def annotations(self) -> dict[str, Union[List[str], int, str]]:

annotations_dict: dict[str, str | list[str] | int] = {}
annotations_dict["analytic_story"] = [story.name for story in self.tags.analytic_story]
annotations_dict["confidence"] = self.tags.confidence
if len(self.tags.cve or []) > 0:
annotations_dict["cve"] = self.tags.cve
annotations_dict["impact"] = self.tags.impact
annotations_dict["type"] = self.type
annotations_dict["type_list"] = [self.type]
# annotations_dict["version"] = self.version
@@ -480,6 +478,11 @@ def serialize_model(self):
"source": self.source,
"nes_fields": self.nes_fields,
}
if self.rba is not None:
model["risk_severity"] = self.rba.severity
model['tags']['risk_score'] = self.rba.risk_score
else:
model['tags']['risk_score'] = 0

# Only a subset of macro fields are required:
all_macros: list[dict[str, str | list[str]]] = []
@@ -497,27 +500,26 @@

all_lookups: list[dict[str, str | int | None]] = []
for lookup in self.lookups:
if lookup.collection is not None:
if isinstance(lookup, KVStoreLookup):
all_lookups.append(
{
"name": lookup.name,
"description": lookup.description,
"collection": lookup.collection,
"case_sensitive_match": None,
"fields_list": lookup.fields_list
"fields_list": lookup.fields_to_fields_list_conf_format
}
)
elif lookup.filename is not None:
elif isinstance(lookup, FileBackedLookup):
all_lookups.append(
{
"name": lookup.name,
"description": lookup.description,
"filename": lookup.filename.name,
"default_match": "true" if lookup.default_match else "false",
"case_sensitive_match": "true" if lookup.case_sensitive_match else "false",
"match_type": lookup.match_type,
"min_matches": lookup.min_matches,
"fields_list": lookup.fields_list
"match_type": lookup.match_type_to_conf_format,
"min_matches": lookup.min_matches
}
)
model['lookups'] = all_lookups # type: ignore
@@ -790,7 +792,7 @@ def ensureProperRBAConfig(self):
"""


if self.deployment.alert_action.rba.enabled is False or self.deployment.alert_action.rba is None:
if self.deployment.alert_action.rba is None or self.deployment.alert_action.rba.enabled is False:
# confirm we don't have an RBA config
if self.rba is None:
return self
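The reordered condition in `ensureProperRBAConfig` fixes a latent bug: `or` evaluates left to right, so the `None` check must come before the attribute access. A two-line illustration:

```python
rba = None
# rba.enabled is False or rba is None   -> AttributeError: 'NoneType' object has no attribute 'enabled'
# rba is None or rba.enabled is False   -> True; short-circuits before touching rba.enabled
```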
29 changes: 0 additions & 29 deletions contentctl/objects/detection_tags.py
@@ -27,7 +27,6 @@
Cis18Value,
AssetType,
SecurityDomain,
RiskSeverity,
KillChainPhase,
NistCategory,
SecurityContentProductName
@@ -42,30 +41,7 @@ class DetectionTags(BaseModel):
analytic_story: list[Story] = Field(...)
asset_type: AssetType = Field(...)
group: list[str] = []
confidence: NonNegativeInt = Field(...,le=100)
impact: NonNegativeInt = Field(...,le=100)
@computed_field
@property
def risk_score(self) -> int:
return round((self.confidence * self.impact)/100)

@computed_field
@property
def severity(self)->RiskSeverity:
if 0 <= self.risk_score <= 20:
return RiskSeverity.INFORMATIONAL
elif 20 < self.risk_score <= 40:
return RiskSeverity.LOW
elif 40 < self.risk_score <= 60:
return RiskSeverity.MEDIUM
elif 60 < self.risk_score <= 80:
return RiskSeverity.HIGH
elif 80 < self.risk_score <= 100:
return RiskSeverity.CRITICAL
else:
raise Exception(f"Error getting severity - risk_score must be between 0-100, but was actually {self.risk_score}")


mitre_attack_id: List[MITRE_ATTACK_ID_TYPE] = []
nist: list[NistCategory] = []

@@ -80,9 +56,6 @@ def severity(self)->RiskSeverity:

# enrichment
mitre_attack_enrichments: List[MitreAttackEnrichment] = Field([], validate_default=True)
confidence_id: Optional[PositiveInt] = Field(None, ge=1, le=3)
impact_id: Optional[PositiveInt] = Field(None, ge=1, le=5)
evidence_str: Optional[str] = None

@computed_field
@property
@@ -153,9 +126,7 @@ def serialize_model(self):
"cis20": self.cis20,
"kill_chain_phases": self.kill_chain_phases,
"nist": self.nist,
"risk_score": self.risk_score,
"security_domain": self.security_domain,
"risk_severity": self.severity,
"mitre_attack_id": self.mitre_attack_id,
"mitre_attack_enrichments": self.mitre_attack_enrichments
}
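With `confidence`, `impact`, and the computed `risk_score`/`severity` removed from `DetectionTags`, the `serialize_model` changes earlier in this PR read `risk_severity` and `risk_score` from `detection.rba` instead. A hedged sketch of the banding as it presumably survives on the RBA side, reusing the exact thresholds deleted above (the enum string values and function name are assumptions, not shown in this diff):

```python
from enum import Enum

class RiskSeverity(str, Enum):  # string values assumed for illustration
    INFORMATIONAL = "informational"
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

def severity_from_risk_score(risk_score: int) -> RiskSeverity:
    """Map a 0-100 risk score to a severity band (thresholds from the removed code)."""
    if 0 <= risk_score <= 20:
        return RiskSeverity.INFORMATIONAL
    elif risk_score <= 40:
        return RiskSeverity.LOW
    elif risk_score <= 60:
        return RiskSeverity.MEDIUM
    elif risk_score <= 80:
        return RiskSeverity.HIGH
    elif risk_score <= 100:
        return RiskSeverity.CRITICAL
    raise ValueError(f"risk_score must be between 0-100, but was actually {risk_score}")
```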