Skip to content

Commit

Permalink
pagination support
Browse files Browse the repository at this point in the history
  • Loading branch information
rednithin committed Nov 29, 2024
1 parent 6302599 commit 936bfdb
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,27 +155,39 @@ async def call_single_threat_endpoint(
ctx: Context, threat_id: str, semaphore: asyncio.Semaphore
) -> List[str]:
async with semaphore:
endpoint = compute_url(ctx.BASE_URL, f"/v1/threats/{threat_id}", params={})
headers = get_headers(ctx)
filtered_messages = []

response = await fetch_with_retries(url=endpoint, headers=headers)
nextPageNumber = 1
params = {"pageSize": ctx.SINGLE_THREAT_PAGE_SIZE}
while nextPageNumber:
params["pageNumber"] = nextPageNumber
print("Single Threat Params:", params)
endpoint = compute_url(ctx.BASE_URL, f"/v1/threats/{threat_id}", params=params)
headers = get_headers(ctx)

filtered_messages = []
for message in response["messages"]:
message_id = message["abxMessageId"]
remediation_time_str = message["remediationTimestamp"]

remediation_time = try_str_to_datetime(remediation_time_str)
if (
remediation_time >= ctx.CLIENT_FILTER_TIME_RANGE.start
and remediation_time < ctx.CLIENT_FILTER_TIME_RANGE.end
):
filtered_messages.append(json.dumps(message, sort_keys=True))
logging.info(f"Successfully processed v2 threat message: {message_id}")
else:
logging.warning(f"Skipped processing v2 threat message: {message_id}")

return filtered_messages
response = await fetch_with_retries(url=endpoint, headers=headers)

for message in response["messages"]:
message_id = message["abxMessageId"]
remediation_time_str = message["remediationTimestamp"]

remediation_time = try_str_to_datetime(remediation_time_str)
if (
remediation_time >= ctx.CLIENT_FILTER_TIME_RANGE.start
and remediation_time < ctx.CLIENT_FILTER_TIME_RANGE.end
):
filtered_messages.append(json.dumps(message, sort_keys=True))
logging.info(f"Successfully processed v2 threat message: {message_id}")
else:
logging.warning(f"Skipped processing v2 threat message: {message_id}")

nextPageNumber = response.get("nextPageNumber")
assert nextPageNumber is None or nextPageNumber > 0

if nextPageNumber is None or nextPageNumber > ctx.MAX_PAGE_NUMBER:
break

return list(set(filtered_messages))


async def call_single_case_endpoint(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class Context(BaseModel):
LIMIT: timedelta
NUM_CONCURRENCY: int
MAX_PAGE_NUMBER: int
SINGLE_THREAT_PAGE_SIZE: int
BASE_URL: str
API_TOKEN: str
TIME_RANGE: TimeRange
Expand Down Expand Up @@ -140,15 +141,16 @@ def get_context(stored_date_time: str) -> Context:
BASE_URL = os.environ.get("API_HOST", "https://api.abnormalplatform.com/v1")
API_TOKEN = os.environ["ABNORMAL_SECURITY_REST_API_TOKEN"]
OUTAGE_TIME = timedelta(
minutes=int(os.environ.get("ABNORMAL_OUTAGE_TIME_MIN", "15"))
minutes=int(os.environ.get("ABNORMAL_OUTAGE_TIME_MIN", "45"))
)
LAG_ON_BACKEND = timedelta(
seconds=int(os.environ.get("ABNORMAL_LAG_ON_BACKEND_SEC", "30"))
)
FREQUENCY = timedelta(minutes=int(os.environ.get("ABNORMAL_FREQUENCY_MIN", "5")))
LIMIT = timedelta(minutes=int(os.environ.get("ABNORMAL_LIMIT_MIN", "6")))
NUM_CONCURRENCY = int(os.environ.get("ABNORMAL_NUM_CONCURRENCY", "5"))
NUM_CONCURRENCY = int(os.environ.get("ABNORMAL_NUM_CONCURRENCY", "3"))
MAX_PAGE_NUMBER = int(os.environ.get("ABNORMAL_MAX_PAGE_NUMBER", "3"))
SINGLE_THREAT_PAGE_SIZE = int(os.environ.get("ABNORMAL_SINGLE_THREAT_PAGE_SIZE", "40"))

STORED_TIME = try_str_to_datetime(stored_date_time)
CURRENT_TIME = try_str_to_datetime(datetime.now().strftime(TIME_FORMAT))
Expand All @@ -171,7 +173,8 @@ def get_context(stored_date_time: str) -> Context:
CURRENT_TIME=CURRENT_TIME,
LIMIT=LIMIT,
TRACE_ID=uuid4(),
PYTHON_VERSION=sys.version
PYTHON_VERSION=sys.version,
SINGLE_THREAT_PAGE_SIZE=SINGLE_THREAT_PAGE_SIZE
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,15 @@ def setUp(self):
STORED_TIME=datetime(2024, 10, 1, 12, 55),
CURRENT_TIME=datetime(2024, 10, 1, 13, 0),
TRACE_ID=self.trace_id,
PYTHON_VERSION="3.11"
PYTHON_VERSION="3.11",
SINGLE_THREAT_PAGE_SIZE=40
)

def test_valid_headers(self):
# Test case for valid headers
headers = get_headers(self.ctx)
expected_headers = {
"X-Sentinel-Context": "eyJMQUdfT05fQkFDS0VORCI6IDMwLjAsICJPVVRBR0VfVElNRSI6IDkwMC4wLCAiRlJFUVVFTkNZIjogMzAwLjAsICJMSU1JVCI6IDM2MC4wLCAiTlVNX0NPTkNVUlJFTkNZIjogNSwgIk1BWF9QQUdFX05VTUJFUiI6IDEwMCwgIkJBU0VfVVJMIjogImh0dHA6Ly9leGFtcGxlLmNvbSIsICJUSU1FX1JBTkdFIjogeyJzdGFydCI6ICIyMDI0LTEwLTAxVDEyOjU1OjAwIiwgImVuZCI6ICIyMDI0LTEwLTAxVDEzOjAwOjAwIn0sICJDTElFTlRfRklMVEVSX1RJTUVfUkFOR0UiOiB7InN0YXJ0IjogIjIwMjQtMTAtMDFUMTI6NTQ6MzAiLCAiZW5kIjogIjIwMjQtMTAtMDFUMTI6NTk6MzAifSwgIlNUT1JFRF9USU1FIjogIjIwMjQtMTAtMDFUMTI6NTU6MDAiLCAiQ1VSUkVOVF9USU1FIjogIjIwMjQtMTAtMDFUMTM6MDA6MDAiLCAiVFJBQ0VfSUQiOiAiYmRiMmExMjctZWQzZC00NjRhLWIyMDUtMzgyMGNjZjZkM2YyIiwgIlBZVEhPTl9WRVJTSU9OIjogIjMuMTEifQ==",
"X-Sentinel-Context": "eyJMQUdfT05fQkFDS0VORCI6IDMwLjAsICJPVVRBR0VfVElNRSI6IDkwMC4wLCAiRlJFUVVFTkNZIjogMzAwLjAsICJMSU1JVCI6IDM2MC4wLCAiTlVNX0NPTkNVUlJFTkNZIjogNSwgIk1BWF9QQUdFX05VTUJFUiI6IDEwMCwgIlNJTkdMRV9USFJFQVRfUEFHRV9TSVpFIjogNDAsICJCQVNFX1VSTCI6ICJodHRwOi8vZXhhbXBsZS5jb20iLCAiVElNRV9SQU5HRSI6IHsic3RhcnQiOiAiMjAyNC0xMC0wMVQxMjo1NTowMCIsICJlbmQiOiAiMjAyNC0xMC0wMVQxMzowMDowMCJ9LCAiQ0xJRU5UX0ZJTFRFUl9USU1FX1JBTkdFIjogeyJzdGFydCI6ICIyMDI0LTEwLTAxVDEyOjU0OjMwIiwgImVuZCI6ICIyMDI0LTEwLTAxVDEyOjU5OjMwIn0sICJTVE9SRURfVElNRSI6ICIyMDI0LTEwLTAxVDEyOjU1OjAwIiwgIkNVUlJFTlRfVElNRSI6ICIyMDI0LTEwLTAxVDEzOjAwOjAwIiwgIlRSQUNFX0lEIjogImJkYjJhMTI3LWVkM2QtNDY0YS1iMjA1LTM4MjBjY2Y2ZDNmMiIsICJQWVRIT05fVkVSU0lPTiI6ICIzLjExIn0=",
"X-Abnormal-Trace-Id": str(self.trace_id),
"Authorization": f"Bearer {self.api_token}",
"Soar-Integration-Origin": "AZURE SENTINEL",
Expand Down Expand Up @@ -261,7 +262,7 @@ def test_compute_url_with_port_in_base_url(self):
"SentinelFunctionsOrchestrator.soar_connector_async_v2.fetch_with_retries",
new_callable=AsyncMock,
)
async def test_get_threats(mock_fetch):
async def test_get_threats(mock_fetch: AsyncMock):
mock_intervals = [
MagicMock(start="2024-10-01T13:00:00Z", end=None),
]
Expand All @@ -285,6 +286,7 @@ async def test_get_threats(mock_fetch):
ctx.NUM_CONCURRENCY = 2
ctx.CLIENT_FILTER_TIME_RANGE.start = try_str_to_datetime("2024-10-01T12:00:00Z")
ctx.CLIENT_FILTER_TIME_RANGE.end = try_str_to_datetime("2024-10-01T13:00:00Z")
ctx.SINGLE_THREAT_PAGE_SIZE = 40

output_queue = asyncio.Queue()

Expand All @@ -308,6 +310,8 @@ async def test_get_threats(mock_fetch):
await get_threats(ctx, output_queue)

# Ensure fetch_with_retries was called with expected values
mock_fetch.assert_any_call(url='http://example.com/v1/threats', headers={'Authorization': 'Bearer token'})
mock_fetch.assert_any_call(url='http://example.com/v1/threats/threat1?pageSize=40&pageNumber=1', headers={'Authorization': 'Bearer token'})
assert mock_fetch.call_count == 2

# Ensure the messages were put into the output queue
Expand Down Expand Up @@ -336,6 +340,7 @@ async def test_get_cases(mock_fetch):
ctx.NUM_CONCURRENCY = 2
ctx.CLIENT_FILTER_TIME_RANGE.start = try_str_to_datetime("2024-10-01T12:00:00Z")
ctx.CLIENT_FILTER_TIME_RANGE.end = try_str_to_datetime("2024-10-01T13:00:00Z")
ctx.SINGLE_THREAT_PAGE_SIZE = 40

output_queue = asyncio.Queue()

Expand Down Expand Up @@ -371,6 +376,8 @@ async def test_get_cases(mock_fetch):
await get_cases(ctx, output_queue)

# Ensure fetch_with_retries was called with expected values
mock_fetch.assert_any_call(url='http://example.com/v1/cases', headers={'Authorization': 'Bearer token'})
mock_fetch.assert_any_call(url='http://example.com/v1/cases/case1', headers={'Authorization': 'Bearer token'})
assert mock_fetch.call_count == 2

# Ensure the cases were put into the output queue
Expand All @@ -385,6 +392,186 @@ async def test_get_cases(mock_fetch):
assert output_message == expected_record


@pytest.mark.asyncio
@patch(
"SentinelFunctionsOrchestrator.soar_connector_async_v2.fetch_with_retries",
new_callable=AsyncMock,
)
async def test_get_threats_paginated(mock_fetch: AsyncMock):
mock_intervals = [
MagicMock(start="2024-10-01T13:00:00Z", end=None),
]

mock_threat_campaign_response = {
"total": 1,
"threats": [{"threatId": "abca34c0-04fc-222d-30f6-9e62e51dfc95"}],
"nextPageNumber": None,
}

mock_single_threat_response_1 = {
"threatId": "abca34c0-04fc-222d-30f6-9e62e51dfc95",
"messages": [
{
"abxMessageId": 8340091768378090492,
"remediationTimestamp": "2024-10-01T12:30:40Z"
},
{
"abxMessageId": -7487512360242110741,
"remediationTimestamp": "2024-10-01T12:30:40Z"
},
{
"abxMessageId": -1453682119958233571,
"remediationTimestamp": "2024-10-01T12:30:40Z"
},
{
"abxMessageId": -2738917250488486006,
"remediationTimestamp": "2024-10-01T12:30:40Z"
},
{
"abxMessageId": 482233753373918965,
"remediationTimestamp": "2024-10-01T12:30:40Z"
},
{
"abxMessageId": 9119659315270197918,
"remediationTimestamp": "2024-10-01T12:30:40Z"
},
{
"abxMessageId": 3546172484236699227,
"remediationTimestamp": "2024-10-01T12:30:40Z"
}
],
"pageNumber": 1,
"total": 16,
"nextPageNumber": 2
}
mock_single_threat_response_2 = {
"threatId": "abca34c0-04fc-222d-30f6-9e62e51dfc95",
"messages": [
{
"abxMessageId": -68647174525282065,
"remediationTimestamp": "2024-10-01T12:30:40Z",
},
{
"abxMessageId": 1025490956646620319,
"remediationTimestamp": "2024-10-01T12:30:40Z",
},
{
"abxMessageId": 8353208793487178298,
"remediationTimestamp": "2024-10-01T12:30:40Z",
},
{
"abxMessageId": -7300418853454868601,
"remediationTimestamp": "2024-10-01T12:30:40Z",
},
{
"abxMessageId": -559214588526485457,
"remediationTimestamp": "2024-10-01T12:30:40Z",
},
{
"abxMessageId": 4447975809254795357,
"remediationTimestamp": "2024-10-01T12:30:40Z",
},
{
"abxMessageId": -374721447240777722,
"remediationTimestamp": "2024-10-01T12:30:40Z",
}
],
"pageNumber": 2,
"total": 16,
"nextPageNumber": 3
}
mock_single_threat_response_3 = {
"threatId": "abca34c0-04fc-222d-30f6-9e62e51dfc95",
"messages": [
{
"abxMessageId": 3333927803157276490,
"remediationTimestamp": "2024-10-01T12:30:40Z",
},
{
"abxMessageId": -1777029380775902847,
"remediationTimestamp": "2024-10-01T12:30:40Z",
}
],
"pageNumber": 3,
"total": 16
}
# Mock the context and output queue
ctx = MagicMock()
ctx.BASE_URL = "http://example.com"
ctx.MAX_PAGE_NUMBER = 10
ctx.NUM_CONCURRENCY = 2
ctx.CLIENT_FILTER_TIME_RANGE.start = try_str_to_datetime("2024-10-01T12:00:00Z")
ctx.CLIENT_FILTER_TIME_RANGE.end = try_str_to_datetime("2024-10-01T13:00:00Z")
ctx.SINGLE_THREAT_PAGE_SIZE = 7

output_queue = asyncio.Queue()

# Mock the functions and methods used in get_threats
mock_fetch.side_effect = [
mock_threat_campaign_response,
mock_single_threat_response_1,
mock_single_threat_response_2,
mock_single_threat_response_3,
]

with patch(
"SentinelFunctionsOrchestrator.soar_connector_async_v2.compute_intervals",
return_value=mock_intervals,
):
with patch(
"SentinelFunctionsOrchestrator.soar_connector_async_v2.get_query_params"
) as mock_get_query_params:
with patch(
"SentinelFunctionsOrchestrator.soar_connector_async_v2.get_headers",
return_value={"Authorization": "Bearer token"},
):
await get_threats(ctx, output_queue)

# Ensure fetch_with_retries was called with expected values
mock_fetch.assert_any_call(url='http://example.com/v1/threats', headers={'Authorization': 'Bearer token'})
mock_fetch.assert_any_call(url='http://example.com/v1/threats/abca34c0-04fc-222d-30f6-9e62e51dfc95?pageSize=7&pageNumber=1', headers={'Authorization': 'Bearer token'})
mock_fetch.assert_any_call(url='http://example.com/v1/threats/abca34c0-04fc-222d-30f6-9e62e51dfc95?pageSize=7&pageNumber=2', headers={'Authorization': 'Bearer token'})
mock_fetch.assert_any_call(url='http://example.com/v1/threats/abca34c0-04fc-222d-30f6-9e62e51dfc95?pageSize=7&pageNumber=3', headers={'Authorization': 'Bearer token'})
assert mock_fetch.call_count == 4

# Ensure the messages were put into the output queue
assert output_queue.qsize() == 16

# Validate the content of the output queue

expected_records = [
("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 8340091768378090492, "remediationTimestamp": "2024-10-01T12:30:40Z"}),
("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -7487512360242110741, "remediationTimestamp": "2024-10-01T12:30:40Z"}),
("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -1453682119958233571, "remediationTimestamp": "2024-10-01T12:30:40Z"}),
("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -2738917250488486006, "remediationTimestamp": "2024-10-01T12:30:40Z"}),
("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 482233753373918965, "remediationTimestamp": "2024-10-01T12:30:40Z"}),
("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 9119659315270197918, "remediationTimestamp": "2024-10-01T12:30:40Z"}),
("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 3546172484236699227, "remediationTimestamp": "2024-10-01T12:30:40Z"}),
("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -68647174525282065, "remediationTimestamp": "2024-10-01T12:30:40Z"}),
("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 1025490956646620319, "remediationTimestamp": "2024-10-01T12:30:40Z"}),
("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 8353208793487178298, "remediationTimestamp": "2024-10-01T12:30:40Z"}),
("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -7300418853454868601, "remediationTimestamp": "2024-10-01T12:30:40Z"}),
("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -559214588526485457, "remediationTimestamp": "2024-10-01T12:30:40Z"}),
("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 4447975809254795357, "remediationTimestamp": "2024-10-01T12:30:40Z"}),
("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -374721447240777722, "remediationTimestamp": "2024-10-01T12:30:40Z"}),
("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": 3333927803157276490, "remediationTimestamp": "2024-10-01T12:30:40Z"}),
("ABNORMAL_THREAT_MESSAGES", {"abxMessageId": -1777029380775902847, "remediationTimestamp": "2024-10-01T12:30:40Z"}),
]

actual_records = []

while not output_queue.empty():
output_message = await output_queue.get()
actual_records.append(output_message)

assert sorted(map(lambda x: str(x), expected_records)) == sorted(map(lambda x: str(x), actual_records))
assert len(expected_records) == len(actual_records)
assert output_queue.empty()





if __name__ == "__main__":
unittest.main()
pytest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ def setUp(self):
STORED_TIME=datetime(2024, 10, 1, 12, 55),
CURRENT_TIME=datetime(2024, 10, 1, 13, 0),
TRACE_ID=uuid4(),
PYTHON_VERSION="3.11"
PYTHON_VERSION="3.11",
SINGLE_THREAT_PAGE_SIZE=40
)

def test_valid_intervals(self):
Expand Down

0 comments on commit 936bfdb

Please sign in to comment.