Skip to content

Commit

Permalink
fix(agent): PENG-2457 modify the fetch_influx_data and `update_job_…
Browse files Browse the repository at this point in the history
…metrics` functions (#677)

This commit modifies the mentioned functions to consider the case where the response of the
*GET jobbergate/job-submissions/agent/metrics/{job_submission_id}* returns an empty list
in the *max_times* key.
  • Loading branch information
matheushent authored Dec 19, 2024
1 parent 0b9823e commit 97ba3f2
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 48 deletions.
65 changes: 47 additions & 18 deletions jobbergate-agent/jobbergate_agent/jobbergate/update.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import json
from itertools import chain
from textwrap import dedent
from typing import List

import msgpack
Expand All @@ -17,7 +18,7 @@
InfluxDBPointDict,
)
from jobbergate_agent.settings import SETTINGS
from jobbergate_agent.utils.exception import JobbergateApiError, SbatchError
from jobbergate_agent.utils.exception import JobbergateApiError, SbatchError, JobbergateAgentError
from jobbergate_agent.utils.logging import log_error
from jobbergate_agent.jobbergate.constants import INFLUXDB_MEASUREMENT
from jobbergate_agent.utils.compute import aggregate_influx_measures
Expand Down Expand Up @@ -83,20 +84,39 @@ async def update_job_data(


async def fetch_influx_data(
time: int, host: str, step: int, task: int, job: int, measurement: INFLUXDB_MEASUREMENT
job: int,
measurement: INFLUXDB_MEASUREMENT,
*,
time: int | None = None,
host: str | None = None,
step: int | None = None,
task: int | None = None,
) -> list[InfluxDBPointDict]:
"""
Fetch data from InfluxDB for a given host, step and task.
"""
query = f"""
SELECT * FROM {measurement} WHERE time > $time AND host = $host AND step = $step AND task = $task AND job = $job
"""
with JobbergateApiError.handle_errors("Failed to fetch data from InfluxDB", do_except=log_error):
with JobbergateAgentError.handle_errors("Failed to fetch measures from InfluxDB", do_except=log_error):
all_none = all(arg is None for arg in [time, host, step, task])
all_set = all(arg is not None for arg in [time, host, step, task])

if not (all_none or all_set):
raise ValueError("Invalid argument combination: all optional arguments must be either set or None.")

if all_set:
query = dedent(f"""
SELECT * FROM {measurement} WHERE time > $time AND host = $host AND step = $step AND task = $task AND job = $job
""")
params = {"time": time, "host": host, "step": str(step), "task": str(task), "job": str(job)}
else:
query = f"SELECT * FROM {measurement} WHERE job = $job"
params = {"job": str(job)}

assert influxdb_client is not None # mypy assertion
params = dict(time=time, host=host, step=str(step), task=str(task), job=str(job))

logger.debug(f"Querying InfluxDB with: {query=}, {params=}")
result = influxdb_client.query(query, bind_params=params, epoch="us")
logger.debug("Successfully fetched data from InfluxDB")

return [
InfluxDBPointDict(
time=point["time"],
Expand Down Expand Up @@ -140,18 +160,27 @@ async def update_job_metrics(active_job_submittion: ActiveJobSubmission) -> None

influx_measurements = fetch_influx_measurements()

tasks = (
fetch_influx_data(
job_max_time.max_time,
job_max_time.node_host,
job_max_time.step,
job_max_time.task,
active_job_submittion.slurm_job_id,
measurement["name"],
if not job_max_times.max_times:
tasks = (
fetch_influx_data(
active_job_submittion.slurm_job_id,
measurement["name"],
)
for measurement in influx_measurements
)
else:
tasks = (
fetch_influx_data(
active_job_submittion.slurm_job_id,
measurement["name"],
time=job_max_time.max_time,
host=job_max_time.node_host,
step=job_max_time.step,
task=job_max_time.task,
)
for job_max_time in job_max_times.max_times
for measurement in influx_measurements
)
for job_max_time in job_max_times.max_times
for measurement in influx_measurements
)
results = await asyncio.gather(*list(tasks))
data_points = chain.from_iterable(results)
aggregated_data_points = aggregate_influx_measures(data_points)
Expand Down
14 changes: 7 additions & 7 deletions jobbergate-agent/jobbergate_agent/utils/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,31 @@
from buzz.tools import DoExceptParams, noop


class ClusterAgentError(Buzz):
class JobbergateAgentError(Buzz):
"""Raise exception when execution command returns an error"""


class ProcessExecutionError(ClusterAgentError):
class ProcessExecutionError(JobbergateAgentError):
"""Raise exception when execution command returns an error"""


class AuthTokenError(ClusterAgentError):
class AuthTokenError(JobbergateAgentError):
"""Raise exception when there are connection issues with the backend"""


class SbatchError(ClusterAgentError):
class SbatchError(JobbergateAgentError):
"""Raise exception when sbatch raises any error"""


class JobbergateApiError(ClusterAgentError):
class JobbergateApiError(JobbergateAgentError):
"""Raise exception when communication with Jobbergate API fails"""


class JobSubmissionError(ClusterAgentError):
class JobSubmissionError(JobbergateAgentError):
"""Raise exception when a job cannot be submitted raises any error"""


class SlurmParameterParserError(ClusterAgentError):
class SlurmParameterParserError(JobbergateAgentError):
"""Raise exception when Slurm mapper or SBATCH parser face any error"""


Expand Down
Loading

0 comments on commit 97ba3f2

Please sign in to comment.