From c391f164632efbb1b5ee89ae66b2a6fb37437abc Mon Sep 17 00:00:00 2001 From: Matheus Tosta Date: Thu, 19 Dec 2024 17:10:27 -0400 Subject: [PATCH] fix(agent): PENG-2457 defer sending data to the API in case there's no new data This commit introduces a change in the *update_job_metrics* function so the API call to insert new metrics is deferred in case there's no new metric, i.e. the list of metrics is empty. --- .../jobbergate_agent/jobbergate/update.py | 3 + .../tests/jobbergate/test_update.py | 57 +++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/jobbergate-agent/jobbergate_agent/jobbergate/update.py b/jobbergate-agent/jobbergate_agent/jobbergate/update.py index 1adf08c0..e7458392 100644 --- a/jobbergate-agent/jobbergate_agent/jobbergate/update.py +++ b/jobbergate-agent/jobbergate_agent/jobbergate/update.py @@ -184,6 +184,9 @@ async def update_job_metrics(active_job_submittion: ActiveJobSubmission) -> None results = await asyncio.gather(*list(tasks)) data_points = chain.from_iterable(results) aggregated_data_points = aggregate_influx_measures(data_points) + if not aggregated_data_points: + # defer the API call since there's no data to be sent + return packed_data = msgpack.packb(aggregated_data_points) response = await jobbergate_api_client.put( diff --git a/jobbergate-agent/tests/jobbergate/test_update.py b/jobbergate-agent/tests/jobbergate/test_update.py index 5f805b2d..06066d5e 100644 --- a/jobbergate-agent/tests/jobbergate/test_update.py +++ b/jobbergate-agent/tests/jobbergate/test_update.py @@ -858,3 +858,60 @@ async def test_update_job_metrics__success_with_max_times_empty( mocked_chain.from_iterable.assert_called_once_with([dummy_data_points] * len(measurements)) mocked_aggregate_influx_measures.assert_called_once_with(iter_dummy_data_points) mocked_msgpack.packb.assert_called_once_with("super-dummy-aggregated-data") + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("mock_access_token") +@pytest.mark.parametrize( + "job_submission_id, slurm_job_id, measurements", + [ + (1, 22, [{"name": "measurement1"}, {"name": "measurement2"}]), + (2, 33, [{"name": "measurement1"}]), + (3, 11, [{"name": "measurement1"}, {"name": "measurement2"}, {"name": "measurement3"}]), + ], +) +@mock.patch("jobbergate_agent.jobbergate.update.fetch_influx_measurements") +@mock.patch("jobbergate_agent.jobbergate.update.fetch_influx_data") +@mock.patch("jobbergate_agent.jobbergate.update.aggregate_influx_measures") +@mock.patch("jobbergate_agent.jobbergate.update.msgpack") +@mock.patch("jobbergate_agent.jobbergate.update.chain") +async def test_update_job_metrics__defer_api_call_upon_no_new_data( + mocked_chain: mock.MagicMock, + mocked_msgpack: mock.MagicMock, + mocked_aggregate_influx_measures: mock.MagicMock, + mocked_fetch_influx_data: mock.MagicMock, + mocked_fetch_influx_measurements: mock.MagicMock, + job_submission_id: int, + slurm_job_id: int, + measurements: list[dict[str, str]], + job_max_times_response: Callable[[int, int, int, int], dict[str, int | list[dict[str, int | str]]]], +): + """ + Test that the ``update_job_metrics()`` function will defer sending data to the API + when there's no new data to send. + """ + active_job_submission = ActiveJobSubmission(id=job_submission_id, slurm_job_id=slurm_job_id) + job_max_times = job_max_times_response(job_submission_id, 0, 0, 0) + + mocked_fetch_influx_measurements.return_value = measurements + mocked_fetch_influx_data.return_value = [] + mocked_chain.from_iterable.return_value = [] + mocked_aggregate_influx_measures.return_value = [] + + with respx.mock: + respx.get(f"{SETTINGS.BASE_API_URL}/jobbergate/job-submissions/agent/metrics/{job_submission_id}").mock( + return_value=httpx.Response( + status_code=200, + json=job_max_times, + ) + ) + + await update_job_metrics(active_job_submission) + + mocked_fetch_influx_measurements.assert_called_once_with() + mocked_fetch_influx_data.assert_has_calls( + [mock.call(slurm_job_id, measurement["name"]) for measurement in measurements] + ) + mocked_chain.from_iterable.assert_called_once_with([[] for _ in range(len(measurements))]) + mocked_aggregate_influx_measures.assert_called_once_with([]) + mocked_msgpack.packb.assert_not_called()