Skip to content

Commit

Permalink
fix(agent): PENG-2457 defer sending data to the API in case there's n…
Browse files Browse the repository at this point in the history
…o new data

This commit introduces a change in the *update_job_metrics* function so the API call
to insert new metrics is deferred in case there's no new metric, i.e. the list of
metrics is empty.
  • Loading branch information
matheushent committed Dec 19, 2024
1 parent de62d0d commit c391f16
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 0 deletions.
3 changes: 3 additions & 0 deletions jobbergate-agent/jobbergate_agent/jobbergate/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ async def update_job_metrics(active_job_submittion: ActiveJobSubmission) -> None
results = await asyncio.gather(*list(tasks))
data_points = chain.from_iterable(results)
aggregated_data_points = aggregate_influx_measures(data_points)
if not aggregated_data_points:
# defer the API call since there's no data to be sent
return
packed_data = msgpack.packb(aggregated_data_points)

response = await jobbergate_api_client.put(
Expand Down
57 changes: 57 additions & 0 deletions jobbergate-agent/tests/jobbergate/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,3 +858,60 @@ async def test_update_job_metrics__success_with_max_times_empty(
mocked_chain.from_iterable.assert_called_once_with([dummy_data_points] * len(measurements))
mocked_aggregate_influx_measures.assert_called_once_with(iter_dummy_data_points)
mocked_msgpack.packb.assert_called_once_with("super-dummy-aggregated-data")


@pytest.mark.asyncio
@pytest.mark.usefixtures("mock_access_token")
@pytest.mark.parametrize(
"job_submission_id, slurm_job_id, measurements",
[
(1, 22, [{"name": "measurement1"}, {"name": "measurement2"}]),
(2, 33, [{"name": "measurement1"}]),
(3, 11, [{"name": "measurement1"}, {"name": "measurement2"}, {"name": "measurement3"}]),
],
)
@mock.patch("jobbergate_agent.jobbergate.update.fetch_influx_measurements")
@mock.patch("jobbergate_agent.jobbergate.update.fetch_influx_data")
@mock.patch("jobbergate_agent.jobbergate.update.aggregate_influx_measures")
@mock.patch("jobbergate_agent.jobbergate.update.msgpack")
@mock.patch("jobbergate_agent.jobbergate.update.chain")
async def test_update_job_metrics__defer_api_call_upon_no_new_data(
mocked_chain: mock.MagicMock,
mocked_msgpack: mock.MagicMock,
mocked_aggregate_influx_measures: mock.MagicMock,
mocked_fetch_influx_data: mock.MagicMock,
mocked_fetch_influx_measurements: mock.MagicMock,
job_submission_id: int,
slurm_job_id: int,
measurements: list[dict[str, str]],
job_max_times_response: Callable[[int, int, int, int], dict[str, int | list[dict[str, int | str]]]],
):
"""
Test that the ``update_job_metrics()`` function will defer sending data to the API
when there's no new data to send.
"""
active_job_submission = ActiveJobSubmission(id=job_submission_id, slurm_job_id=slurm_job_id)
job_max_times = job_max_times_response(job_submission_id, 0, 0, 0)

mocked_fetch_influx_measurements.return_value = measurements
mocked_fetch_influx_data.return_value = []
mocked_chain.from_iterable.return_value = []
mocked_aggregate_influx_measures.return_value = []

with respx.mock:
respx.get(f"{SETTINGS.BASE_API_URL}/jobbergate/job-submissions/agent/metrics/{job_submission_id}").mock(
return_value=httpx.Response(
status_code=200,
json=job_max_times,
)
)

await update_job_metrics(active_job_submission)

mocked_fetch_influx_measurements.assert_called_once_with()
mocked_fetch_influx_data.assert_has_calls(
[mock.call(slurm_job_id, measurement["name"]) for measurement in measurements]
)
mocked_chain.from_iterable.assert_called_once_with([[] for _ in range(len(measurements))])
mocked_aggregate_influx_measures.assert_called_once_with([])
mocked_msgpack.packb.assert_not_called()

0 comments on commit c391f16

Please sign in to comment.