From b4a7e1680edac9ba4b4fa192f69ac10202d20d82 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Tue, 17 Sep 2024 09:05:25 -0700 Subject: [PATCH 01/22] Add types to DynamoDB restore flow. Add Persistable abstract class and to_json methods for classes in DynamoDB restore flow. Write an additional attribute to DynamoDB to capture non-pickled state_data. --- Makefile | 5 +- docs/source/jobs.rst | 4 +- itest.sh | 1 + .../runstate/dynamodb_state_store_test.py | 86 +++++++---- tests/serialize/runstate/statemanager_test.py | 54 ++----- tron/actioncommand.py | 20 ++- tron/core/action.py | 41 +++++- tron/core/actionrun.py | 134 ++++++++++++++---- tron/core/job.py | 60 +++++--- tron/core/job_collection.py | 14 +- tron/core/job_scheduler.py | 4 +- tron/core/jobgraph.py | 8 +- tron/core/jobrun.py | 41 ++++-- tron/mcp.py | 13 +- tron/serialize/runstate/__init__.py | 1 - .../runstate/dynamodb_state_store.py | 86 +++++++++-- tron/serialize/runstate/statemanager.py | 61 ++------ tron/serialize/runstate/yamlstore.py | 1 - tron/utils/collections.py | 1 + tron/utils/crontab.py | 63 ++++---- tron/utils/persistable.py | 11 ++ 21 files changed, 472 insertions(+), 237 deletions(-) create mode 100644 tron/utils/persistable.py diff --git a/Makefile b/Makefile index 1ae4728eb..ddfe573c9 100644 --- a/Makefile +++ b/Makefile @@ -28,10 +28,7 @@ endif -usage: @echo "make test - Run tests" - @echo "make deb_bionic - Generate bionic deb package" - @echo "make itest_bionic - Run tests and integration checks" - @echo "make _itest_bionic - Run only integration checks" - @echo "make deb_jammy - Generate bionic deb package" + @echo "make deb_jammy - Generate jammy deb package" @echo "make itest_jammy - Run tests and integration checks" @echo "make _itest_jammy - Run only integration checks" @echo "make release - Prepare debian info for new release" diff --git a/docs/source/jobs.rst b/docs/source/jobs.rst index ae8199f0b..1d4f7b00c 100644 --- a/docs/source/jobs.rst +++ b/docs/source/jobs.rst @@ -209,7 +209,7 @@ Optional Fields after this duration. **trigger_downstreams** (bool or dict) - Upon successfull completion of an action, will emit a trigger for every + Upon successful completion of an action, will emit a trigger for every item in the dictionary. When set to ``true``, a default dict of ``{shortdate: "{shortdate}"}`` is assumed. Emitted triggers will be in form: ``....``. See @@ -220,7 +220,7 @@ Optional Fields have been emitted by upstream actions. Unlike with ``requires`` attribute, dependent actions don't have to belong to the same job. ``triggered_by`` template may contain any pattern allowed in ``command`` attribute. 
- See :ref:`shortdate` for an explantion of shortdate + See :ref:`shortdate` for an explanation of shortdate Example: diff --git a/itest.sh b/itest.sh index a16b10bda..7d315c5dd 100755 --- a/itest.sh +++ b/itest.sh @@ -61,6 +61,7 @@ fi kill -SIGTERM $TRON_PID wait $TRON_PID || true +# TODO: Remove this staetmetadata crap as it'll likely fail /opt/venvs/tron/bin/python - < str: + return json.dumps( + { + "status_path": state_data["status_path"], + "exec_path": state_data["exec_path"], + } + ) + def create_action_runner_factory_from_config(config): """A factory-factory method which returns a callable that can be used to diff --git a/tron/core/action.py b/tron/core/action.py index becbafcca..ba7b67ec8 100644 --- a/tron/core/action.py +++ b/tron/core/action.py @@ -1,5 +1,7 @@ import datetime +import json import logging +from collections import namedtuple from dataclasses import dataclass from dataclasses import field from dataclasses import fields @@ -14,12 +16,13 @@ from tron.config.schema import ConfigProjectedSAVolume from tron.config.schema import ConfigSecretVolume from tron.config.schema import ConfigTopologySpreadConstraints +from tron.utils.persistable import Persistable log = logging.getLogger(__name__) @dataclass -class ActionCommandConfig: +class ActionCommandConfig(Persistable): """A configurable data object for one try of an Action.""" command: str @@ -30,7 +33,6 @@ class ActionCommandConfig: cap_drop: List[str] = field(default_factory=list) constraints: set = field(default_factory=set) docker_image: Optional[str] = None - # XXX: we can get rid of docker_parameters once we're off of Mesos docker_parameters: set = field(default_factory=set) env: dict = field(default_factory=dict) secret_env: dict = field(default_factory=dict) @@ -53,6 +55,41 @@ def state_data(self): def copy(self): return ActionCommandConfig(**self.state_data) + @staticmethod + def to_json(state_data: dict) -> str: + """Serialize the ActionCommandConfig instance to a JSON string.""" + + def serialize_namedtuple(obj): + if isinstance(obj, namedtuple): + return obj._asdict() + return obj + + return json.dumps( + { + "command": state_data["command"], + "cpus": state_data["cpus"], + "mem": state_data["mem"], + "disk": state_data["disk"], + "cap_add": list(state_data["cap_add"]), + "cap_drop": list(state_data["cap_drop"]), + "constraints": list(state_data["constraints"]), + "docker_image": state_data["docker_image"], + "docker_parameters": list(state_data["docker_parameters"]), + "env": state_data["env"], + "secret_env": state_data["secret_env"], + "secret_volumes": [serialize_namedtuple(volume) for volume in state_data["secret_volumes"]], + "projected_sa_volumes": [serialize_namedtuple(volume) for volume in state_data["projected_sa_volumes"]], + "field_selector_env": state_data["field_selector_env"], + "extra_volumes": list(state_data["extra_volumes"]), + "node_selectors": state_data["node_selectors"], + "node_affinities": [serialize_namedtuple(affinity) for affinity in state_data["node_affinities"]], + "labels": state_data["labels"], + "annotations": state_data["annotations"], + "service_account_name": state_data["service_account_name"], + "ports": state_data["ports"], + } + ) + @dataclass class Action: diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index b6a07df69..dc0616672 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -2,6 +2,7 @@ tron.core.actionrun """ import datetime +import json import logging import os from dataclasses import dataclass @@ -13,6 +14,7 @@ from typing import 
Union from twisted.internet import reactor +from twisted.internet.base import DelayedCall from tron import command_context from tron import node @@ -21,9 +23,11 @@ from tron.actioncommand import SubprocessActionRunnerFactory from tron.bin.action_runner import build_environment from tron.bin.action_runner import build_labels +from tron.command_context import CommandContext from tron.config.config_utils import StringFormatter from tron.config.schema import ExecutorTypes from tron.core import action +from tron.core.action import ActionCommandConfig from tron.eventbus import EventBus from tron.kubernetes import KubernetesClusterRepository from tron.kubernetes import KubernetesTask @@ -35,6 +39,7 @@ from tron.utils import timeutils from tron.utils.observer import Observable from tron.utils.observer import Observer +from tron.utils.persistable import Persistable from tron.utils.state import Machine @@ -135,7 +140,7 @@ def action_run_from_state(cls, job_run, state_data, cleanup=False): @dataclass -class ActionRunAttempt: +class ActionRunAttempt(Persistable): """Stores state about one try of an action run.""" command_config: action.ActionCommandConfig @@ -165,6 +170,29 @@ def state_data(self): state_data[field.name] = getattr(self, field.name) return state_data + # "command_config": {} + # "end_time": "2024-01-26T12:25:42.813846", + # "exit_status": 0, + # "kubernetes_task_id": "compute-infra-test-service.test--load--foo19.7850.foo.ia3xrm", + # "mesos_task_id": null, + # "rendered_command": "date; sleep 300; date", + # "start_time": "2024-01-26T12:20:04.141850" + + @staticmethod + def to_json(state_data: dict) -> str: + """Serialize the ActionRunAttempt instance to a JSON string.""" + return json.dumps( + { + "command_config": ActionCommandConfig.to_json(state_data["command_config"]), + "start_time": state_data["start_time"].isoformat() if state_data["start_time"] else None, + "end_time": state_data["end_time"].isoformat() if state_data["end_time"] else None, + "rendered_command": state_data["rendered_command"], + "exit_status": state_data["exit_status"], + "mesos_task_id": state_data["mesos_task_id"], + "kubernetes_task_id": state_data["kubernetes_task_id"], + } + ) + @classmethod def from_state(cls, state_data): # it's possible that we've rolled back to an older Tron version that doesn't support data that we've persisted @@ -182,7 +210,7 @@ def from_state(cls, state_data): return cls(**valid_actionrun_attempt_entries_from_state) -class ActionRun(Observable): +class ActionRun(Observable, Persistable): """Base class for tracking the state of a single run of an Action. ActionRun's state machine is observed by a parent JobRun. @@ -279,28 +307,28 @@ class ActionRun(Observable): # TODO: create a class for ActionRunId, JobRunId, Etc def __init__( self, - job_run_id, - name, - node, - command_config, - parent_context=None, - output_path=None, - cleanup=False, - start_time=None, - end_time=None, - run_state=SCHEDULED, - exit_status=None, - attempts=None, - action_runner=None, - retries_remaining=None, - retries_delay=None, - machine=None, - executor=None, - trigger_downstreams=None, - triggered_by=None, - on_upstream_rerun=None, - trigger_timeout_timestamp=None, - original_command=None, + job_run_id: str, # TODO: maybe string??? When would these NOT be string? + name: str, # TODO: maybe string??? When would these NOT be string? 
+ node: node.Node, + command_config: action.ActionCommandConfig, + parent_context: Optional[CommandContext] = None, + output_path: Optional[filehandler.OutputPath] = None, + cleanup: bool = False, + start_time: Optional[datetime.datetime] = None, + end_time: Optional[datetime.datetime] = None, + run_state: str = SCHEDULED, + exit_status: Optional[int] = None, + attempts: Optional[list] = None, # TODO: list of...ActionCommandConfig? + action_runner: Optional[Union[NoActionRunnerFactory, SubprocessActionRunnerFactory]] = None, + retries_remaining: Optional[int] = None, + retries_delay: Optional[datetime.timedelta] = None, + machine=None, # TODO str? + executor: Optional[str] = None, + trigger_downstreams=None, # TODO: confirm with test job + triggered_by: Optional[set] = None, # TODO: confirm with test job + on_upstream_rerun: Optional[str] = None, # TODO: confirm with test job + trigger_timeout_timestamp: Optional[float] = None, + original_command: Optional[str] = None, ): super().__init__() self.job_run_id = maybe_decode(job_run_id) @@ -333,7 +361,7 @@ def __init__( self.trigger_timeout_call = None self.action_command = None - self.in_delay = None + self.in_delay = None # type: Optional[DelayedCall] @property def state(self): @@ -671,7 +699,7 @@ def cancel_delay(self): return True @property - def state_data(self): + def state_data(self): # TODO: all these state_data funcs return dict """This data is used to serialize the state of this action run.""" if isinstance(self.action_runner, NoActionRunnerFactory): @@ -702,6 +730,60 @@ def state_data(self): "trigger_timeout_timestamp": self.trigger_timeout_timestamp, } + @staticmethod + def to_json(state_data: dict) -> str: + """Serialize the ActionRun instance to a JSON string.""" + # { + # "action_name": "foo", + # "action_runner": { + # "exec_path": "/opt/venvs/tron/bin", + # "status_path": "/tmp/tron" + # }, + # "attempts": [], + # "end_time": "2024-01-26T12:25:42.813846", + # "executor": "kubernetes", + # "exit_status": 0, + # "job_run_id": "compute-infra-test-service.test_load_foo19.7850", + # "node_name": "paasta", + # "on_upstream_rerun": null, + # "original_command": "date; sleep 300; date", + # "retries_delay": null, + # "retries_remaining": null, + # "start_time": "2024-01-26T12:20:04.141850", + # "state": "succeeded", + # "trigger_downstreams": null, + # "trigger_timeout_timestamp": 1706386800.0, + # "triggered_by": null + # } + + action_runner = state_data.get("action_runner") + if action_runner is None: + action_runner_json = NoActionRunnerFactory.to_json() + else: + action_runner_json = SubprocessActionRunnerFactory.to_json(action_runner) + + return json.dumps( + { + "job_run_id": state_data["job_run_id"], + "action_name": state_data["action_name"], + "state": state_data["state"], + "original_command": state_data["original_command"], + "start_time": state_data["start_time"].isoformat() if state_data["start_time"] else None, + "end_time": state_data["end_time"].isoformat() if state_data["end_time"] else None, + "node_name": state_data["node_name"], + "exit_status": state_data["exit_status"], + "attempts": [ActionRunAttempt.to_json(attempt) for attempt in state_data["attempts"]], + "retries_remaining": state_data["retries_remaining"], + "retries_delay": state_data["retries_delay"], + "action_runner": action_runner_json, + "executor": state_data["executor"], + "trigger_downstreams": state_data["trigger_downstreams"], + "triggered_by": state_data["triggered_by"], + "on_upstream_rerun": state_data["on_upstream_rerun"], + 
"trigger_timeout_timestamp": state_data["trigger_timeout_timestamp"], + } + ) + def render_template(self, template): """Render our configured command using the command context.""" return StringFormatter(self.context).format(template) diff --git a/tron/core/job.py b/tron/core/job.py index 95de2a047..87753f0a9 100644 --- a/tron/core/job.py +++ b/tron/core/job.py @@ -1,13 +1,25 @@ +import json import logging +from datetime import timedelta +from typing import Any +from typing import Dict +from typing import Optional +from typing import TypeVar from tron import command_context from tron import node +from tron.actioncommand import SubprocessActionRunnerFactory from tron.core import jobrun +from tron.core.actiongraph import ActionGraph from tron.core.actionrun import ActionRun +from tron.core.jobrun import JobRunCollection +from tron.node import NodePool +from tron.scheduler import GeneralScheduler from tron.serialize import filehandler from tron.utils import maybe_decode from tron.utils.observer import Observable from tron.utils.observer import Observer +from tron.utils.persistable import Persistable class Error(Exception): @@ -24,8 +36,10 @@ class InvalidStartStateError(Error): log = logging.getLogger(__name__) +T = TypeVar("T", bound="Job") -class Job(Observable, Observer): + +class Job(Observable, Observer, Persistable): """A configurable data object. Job uses JobRunCollection to manage its runs, and ActionGraph to manage its @@ -61,26 +75,25 @@ class Job(Observable, Observer): "run_limit", ] - # TODO: use config object def __init__( self, - name, - scheduler, - queueing=True, - all_nodes=False, - monitoring=None, - node_pool=None, - enabled=True, - action_graph=None, - run_collection=None, - parent_context=None, - output_path=None, - allow_overlap=None, - action_runner=None, - max_runtime=None, - time_zone=None, - expected_runtime=None, - run_limit=None, + name: str, # TODO TYPE for self.name: QUESTION: might be bytes instead of string??? 
+ scheduler: GeneralScheduler, + queueing: bool = True, + all_nodes: bool = False, + monitoring: Optional[Dict[str, Any]] = None, + node_pool: Optional[NodePool] = None, + enabled: bool = True, + action_graph: Optional[ActionGraph] = None, + run_collection: Optional[JobRunCollection] = None, + parent_context: Optional[command_context.CommandContext] = None, + output_path: Optional[filehandler.OutputPath] = None, + allow_overlap: Optional[bool] = None, + action_runner: Optional[SubprocessActionRunnerFactory] = None, + max_runtime: Optional[Any] = None, # TODO TYPE for self.max_runtime: + time_zone: Optional[Any] = None, # TODO TYPE for self.time_zone: + expected_runtime: Optional[timedelta] = None, + run_limit: Optional[int] = None, ): super().__init__() self.name = maybe_decode(name) @@ -107,6 +120,14 @@ def __init__( self.run_limit = run_limit log.info(f"{self} created") + log.info(f"TYPE for max_runtime: {type(max_runtime)}") + log.info(f"TYPE for time_zone: {type(time_zone)}") + + @staticmethod + def to_json(state_data: dict) -> str: + """Serialize the Job instance to a JSON string.""" + return json.dumps(state_data) + @classmethod def from_config( cls, @@ -204,6 +225,7 @@ def state_data(self): "enabled": self.enabled, } + # KKASP: called from job_scheduler.py during second restore workflow def get_job_runs_from_state(self, state_data): """Apply a previous state to this Job.""" self.enabled = state_data["enabled"] diff --git a/tron/core/job_collection.py b/tron/core/job_collection.py index 16b3b8404..1921b235e 100644 --- a/tron/core/job_collection.py +++ b/tron/core/job_collection.py @@ -7,6 +7,7 @@ log = logging.getLogger(__name__) +# QUESTION: is JobCollection a combo of Job and JobRun? class JobCollection: """A collection of jobs.""" @@ -40,7 +41,9 @@ def reconfigure_filter(config): else: return config.namespace == namespace_to_reconfigure - # NOTE: as this is a generator expression, we will only go through job configs and build a scheduler for them once something iterates over us (i.e, once `self.state_watcher.watch_all()` is called) + # NOTE: as this is a generator expression, we will only go through job configs + # and build a scheduler for them once something iterates over us (i.e, once + # `self.state_watcher.watch_all()` is called) seq = (factory.build(config) for config in job_configs.values() if reconfigure_filter(config)) return map_to_job_and_schedule(filter(self.add, seq)) @@ -67,6 +70,7 @@ def update(self, new_job_scheduler): job_scheduler.schedule_reconfigured() return True + # TODO: Types?, job_state_data is a weird dict[str, dict[Any?, Any?]], I think config_action_runner is actioncommand.SubprocessActionRunnerFactory OR actioncommand.NoActionRunnerFactory def restore_state(self, job_state_data, config_action_runner): """ Loops through the jobs and their runs in order to load their @@ -74,7 +78,13 @@ def restore_state(self, job_state_data, config_action_runner): runs for each job """ for name, state in job_state_data.items(): - self.jobs[name].restore_state(state, config_action_runner) + log.info(f"kkasp logging bonanza - job_collection name: {name}") + # name: compute-infra-test-service.test_load_foo1 + log.info(f"kkasp logging bonanza - job_collection state: {state}") + # state: {'run_nums': [4, 3, 2, 1, 0], 'enabled': True, runs: [{}, {}]} + self.jobs[name].restore_state( + state, config_action_runner + ) # KKASP: call restore_state on JobScheduler, pass in jobrun state_data log.info(f"Loaded state for {len(job_state_data)} jobs") def get_by_name(self, name): diff 
--git a/tron/core/job_scheduler.py b/tron/core/job_scheduler.py index f3d26b344..e796bd94c 100644 --- a/tron/core/job_scheduler.py +++ b/tron/core/job_scheduler.py @@ -20,10 +20,11 @@ class JobScheduler(Observer): x seconds into the future. """ - def __init__(self, job): + def __init__(self, job: Job): self.job = job self.watch(job) + # KKASP: Called from job_collection during second restore workflow def restore_state(self, job_state_data, config_action_runner): """Load the job state and schedule any JobRuns.""" job_runs = self.job.get_job_runs_from_state(job_state_data) @@ -250,6 +251,7 @@ def __init__(self, context, output_stream_dir, time_zone, action_runner, job_gra self.action_runner = action_runner self.job_graph = job_graph + # TODO: takes dict[] returns JobScheduler def build(self, job_config): log.debug(f"Building new job scheduler {job_config.name}") output_path = filehandler.OutputPath(self.output_stream_dir) diff --git a/tron/core/jobgraph.py b/tron/core/jobgraph.py index 4c28fd89d..c690b5311 100644 --- a/tron/core/jobgraph.py +++ b/tron/core/jobgraph.py @@ -1,10 +1,16 @@ +import logging from collections import defaultdict from collections import namedtuple +from typing import Optional +from tron.config.config_parse import ConfigContainer from tron.core.action import Action from tron.core.actiongraph import ActionGraph from tron.utils import maybe_decode +log = logging.getLogger(__name__) + + AdjListEntry = namedtuple("AdjListEntry", ["action_name", "is_trigger"]) @@ -13,7 +19,7 @@ class JobGraph: cross-job dependencies (aka triggers) """ - def __init__(self, config_container, should_validate_missing_dependency=False): + def __init__(self, config_container: ConfigContainer, should_validate_missing_dependency: Optional[bool] = False): """Build an adjacency list and a reverse adjacency list for the graph, and store all the actions as well as which actions belong to which job """ diff --git a/tron/core/jobrun.py b/tron/core/jobrun.py index b34d23ca9..54da6f7b3 100644 --- a/tron/core/jobrun.py +++ b/tron/core/jobrun.py @@ -1,6 +1,7 @@ """ Classes to manage job runs. """ +import datetime import json import logging import time @@ -20,6 +21,7 @@ from tron.utils import timeutils from tron.utils.observer import Observable from tron.utils.observer import Observer +from tron.utils.persistable import Persistable log = logging.getLogger(__name__) state_logger = logging.getLogger(f"{__name__}.state_changes") @@ -29,11 +31,11 @@ class Error(Exception): pass -def get_job_run_id(job_name, run_num): +def get_job_run_id(job_name: str, run_num: int) -> str: return f"{job_name}.{run_num}" -class JobRun(Observable, Observer): +class JobRun(Observable, Observer, Persistable): """A JobRun is an execution of a Job. It has a list of ActionRuns and is responsible for starting ActionRuns in the correct order and managing their dependencies. @@ -48,15 +50,15 @@ class JobRun(Observable, Observer): # TODO: use config object def __init__( self, - job_name, - run_num, - run_time, - node, - output_path=None, - base_context=None, - action_runs=None, + job_name: str, + run_num: int, + run_time: datetime.datetime, + node: node.Node, + output_path: Optional[filehandler.OutputPath] = None, + base_context: Optional[command_context.CommandContext] = None, + action_runs=None, # TODO: list of action runs? action_graph: Optional[ActionGraph] = None, - manual=None, + manual: Optional[bool] = None, # TODO: what are you for? 
): super().__init__() self.job_name = maybe_decode(job_name) @@ -75,6 +77,23 @@ def __init__( self.context = command_context.build_context(self, base_context) + @staticmethod + def to_json(state_data: dict) -> str: + """Serialize the JobRun instance to a JSON string.""" + return json.dumps( + { + "job_name": state_data["job_name"], + "run_num": state_data["run_num"], + "run_time": state_data["run_time"].isoformat() if state_data["run_time"] else None, + "node_name": state_data["node_name"], + "runs": [ActionRun.to_json(run) for run in state_data["runs"]], # QUESTION: ActionRun vs. Action + "cleanup_run": ActionRun.to_json(state_data["cleanup_run"]) + if state_data["cleanup_run"] + else None, # QUESTION: ActionRun vs. Action + "manual": state_data["manual"], + } + ) + @property def id(self): return get_job_run_id(self.job_name, self.run_num) @@ -105,7 +124,7 @@ def for_job(cls, job, run_num, run_time, node, manual): return run @classmethod - def from_state( + def from_state( # TODO: types cls, state_data, action_graph, diff --git a/tron/mcp.py b/tron/mcp.py index e6b316c10..9cc5152d6 100644 --- a/tron/mcp.py +++ b/tron/mcp.py @@ -85,6 +85,7 @@ def initial_setup(self): # The job schedule factories will be created in the function below self._load_config() # Jobs will also get scheduled (internally) once the state for action runs are restored in restore_state + # KKASP: Restore really kicks off here with timer("self.restore_state"): self.restore_state( actioncommand.create_action_runner_factory_from_config( @@ -182,17 +183,23 @@ def restore_state(self, action_runner): log.info("Restoring from DynamoDB") with timer("restore"): # restores the state of the jobs and their runs from DynamoDB - states = self.state_watcher.restore(self.jobs.get_names()) + # KKASP: The first restore grabs all the data from DynamoDB + states = self.state_watcher.restore( # TODO: what is "states" + self.jobs.get_names() + ) # KKASP: _load_config loads the names into self.jobs + # KKASP: states is a weird hodgepodge of job_state and job_run_state log.info( f"Tron will start restoring state for the jobs and will start scheduling them! Time elapsed since Tron started {time.time() - self.boot_time}" ) # loads the runs' state and schedule the next run for each job with timer("self.jobs.restore_state"): - self.jobs.restore_state(states.get("job_state", {}), action_runner) + self.jobs.restore_state( + states.get("job_state", {}), action_runner + ) # QUESTION: why is job_state here holding both job and job run state? log.info( f"Tron completed restoring state for the jobs. 
Time elapsed since Tron started {time.time() - self.boot_time}" ) - self.state_watcher.save_metadata() + # self.state_watcher.save_metadata() def __str__(self): return "MCP" diff --git a/tron/serialize/runstate/__init__.py b/tron/serialize/runstate/__init__.py index 7b16b0033..4f31a1807 100644 --- a/tron/serialize/runstate/__init__.py +++ b/tron/serialize/runstate/__init__.py @@ -1,5 +1,4 @@ # State types JOB_STATE = "job_state" JOB_RUN_STATE = "job_run_state" -MCP_STATE = "mcp_state" MESOS_STATE = "mesos_state" diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py index 4706596a2..733c4bd5f 100644 --- a/tron/serialize/runstate/dynamodb_state_store.py +++ b/tron/serialize/runstate/dynamodb_state_store.py @@ -8,16 +8,35 @@ import time from collections import defaultdict from collections import OrderedDict +from typing import Any from typing import DefaultDict +from typing import Dict from typing import List from typing import Sequence from typing import TypeVar import boto3 # type: ignore +from tron.core.job import Job +from tron.core.jobrun import JobRun from tron.metrics import timer +from tron.serialize import runstate -OBJECT_SIZE = 400000 +# Restore +# TODO: Get the correct number of partitions +# TODO: Merge items +# TODO: Restore items + + +# Max DynamoDB object size is 400KB. Since we save two +# copies of the object (pickled and JSON), we need to +# consider this max size applies to the entire item, so +# we use a max size of 200KB for each version. +# +# In testing I could get away with 201_000 for both +# partitions so this should be enough overhead to +# contain the object name and other data. +OBJECT_SIZE = 200_000 # TODO: config this to avoid rolling out new version when we swap back to 400_000? MAX_SAVE_QUEUE = 500 MAX_ATTEMPTS = 10 log = logging.getLogger(__name__) @@ -46,12 +65,12 @@ def build_key(self, type, iden) -> str: def restore(self, keys) -> dict: """ - Fetch all under the same parition key(keys). + Fetch all under the same parition key(s). ret: """ first_items = self._get_first_partitions(keys) remaining_items = self._get_remaining_partitions(first_items) - vals = self._merge_items(first_items, remaining_items) + vals = self._merge_items(first_items, remaining_items) # KKASP: pickle loading happens in here return vals def chunk_keys(self, keys: Sequence[T]) -> List[Sequence[T]]: @@ -106,6 +125,7 @@ def _get_first_partitions(self, keys: list): new_keys = [{"key": {"S": key}, "index": {"N": "0"}} for key in keys] return self._get_items(new_keys) + # TODO: Check max partitions as JSON is larger def _get_remaining_partitions(self, items: list): """Get items in the remaining partitions: N = 1 and beyond""" keys_for_remaining_items = [] @@ -134,6 +154,7 @@ def _merge_items(self, first_items, remaining_items) -> dict: return deserialized_items def save(self, key_value_pairs) -> None: + log.debug(f"Adding to save queue: {key_value_pairs}") for key, val in key_value_pairs: while True: qlen = len(self.save_queue) @@ -142,7 +163,15 @@ def save(self, key_value_pairs) -> None: time.sleep(5) continue with self.save_lock: - self.save_queue[key] = val + if val is None: # QUESTION: is this a real option? 
+ self.save_queue[key] = (val, None) + else: + state_type = self.get_type_from_key(key) + serialized_val = self._serialize_item(state_type, val) + self.save_queue[key] = ( + val, + serialized_val, + ) break def _consume_save_queue(self): @@ -152,19 +181,20 @@ def _consume_save_queue(self): for _ in range(qlen): try: with self.save_lock: - key, val = self.save_queue.popitem(last=False) + key, (original_val, serialized_val) = self.save_queue.popitem(last=False) + log.debug(f"Processing save for {key} with a value of {original_val}") # Remove all previous data with the same partition key # TODO: only remove excess partitions if new data has fewer self._delete_item(key) - if val is not None: - self[key] = pickle.dumps(val) + if original_val is not None: + self.__setitem__(key, pickle.dumps(original_val), serialized_val) # reset errors count if we can successfully save saved += 1 except Exception as e: - error = "tron_dynamodb_save_failure: failed to save key" f'"{key}" to dynamodb:\n{repr(e)}' + error = "tron_dynamodb_save_failure: failed to save key " f'"{key}" to dynamodb:\n{repr(e)}' log.error(error) with self.save_lock: - self.save_queue[key] = val + self.save_queue[key] = (original_val, serialized_val) duration = time.time() - start log.info(f"saved {saved} items in {duration}s") @@ -173,6 +203,17 @@ def _consume_save_queue(self): else: self.save_errors = 0 + def get_type_from_key(self, key: str) -> str: + return key.split()[0] + + def _serialize_item(self, key: str, state: Dict[str, Any]) -> str: + if key == runstate.JOB_STATE: + return Job.to_json(state) + elif key == runstate.JOB_RUN_STATE: + return JobRun.to_json(state) + else: + raise ValueError(f"Unknown type: key {key}") + def _save_loop(self): while True: if self.stopping: @@ -189,7 +230,8 @@ def _save_loop(self): log.error("too many dynamodb errors in a row, crashing") os.exit(1) - def __setitem__(self, key: str, val: bytes) -> None: + # json_val is the non-pickled version of val + def __setitem__(self, key: str, val: bytes, json_val: str) -> None: """ Partition the item and write up to 10 partitions atomically. 
Retry up to 3 times on failure @@ -200,8 +242,18 @@ def __setitem__(self, key: str, val: bytes) -> None: """ start = time.time() num_partitions = math.ceil(len(val) / OBJECT_SIZE) + num_json_val_partitions = math.ceil(len(json_val) / OBJECT_SIZE) + + log.debug( + f"Saving key: {key} with {num_partitions} pickle partitions and {num_json_val_partitions} json partitions" + ) + items = [] - for index in range(num_partitions): + + # Use the maximum number of partitions (JSON can be larger + # than pickled value so this makes sure we save the entire item) + max_partitions = max(num_partitions, num_json_val_partitions) + for index in range(max_partitions): item = { "Put": { "Item": { @@ -217,6 +269,12 @@ def __setitem__(self, key: str, val: bytes) -> None: "num_partitions": { "N": str(num_partitions), }, + "raw_val": { + "S": json_val[index * OBJECT_SIZE : min(index * OBJECT_SIZE + OBJECT_SIZE, len(json_val))] + }, + "num_raw_val_partitions": { + "N": str(num_json_val_partitions), + }, }, "TableName": self.name, }, @@ -224,7 +282,8 @@ def __setitem__(self, key: str, val: bytes) -> None: count = 0 items.append(item) # Only up to 10 items are allowed per transactions - while len(items) == 10 or index == num_partitions - 1: + # TODO: transact_write_items can take up to 100 items now + while len(items) == 10 or index == max_partitions - 1: try: self.client.transact_write_items(TransactItems=items) items = [] @@ -236,6 +295,7 @@ def __setitem__(self, key: str, val: bytes) -> None: name="tron.dynamodb.setitem", delta=time.time() - start, ) + log.error(f"Failed to save partition for key: {key}, error: {repr(e)}") raise e else: log.warning(f"Got error while saving {key}, trying again: {repr(e)}") @@ -244,6 +304,7 @@ def __setitem__(self, key: str, val: bytes) -> None: delta=time.time() - start, ) + # TODO: Is this ok with we use the max number of partitions? def _delete_item(self, key: str) -> None: start = time.time() try: @@ -261,6 +322,7 @@ def _delete_item(self, key: str) -> None: delta=time.time() - start, ) + # TODO: Get max partitions between pickle and json def _get_num_of_partitions(self, key: str) -> int: """ Return how many parts is the item partitioned into diff --git a/tron/serialize/runstate/statemanager.py b/tron/serialize/runstate/statemanager.py index 5114261a2..0ecf69db8 100644 --- a/tron/serialize/runstate/statemanager.py +++ b/tron/serialize/runstate/statemanager.py @@ -6,6 +6,7 @@ import time from contextlib import contextmanager from typing import Dict +from typing import List from tron.config import schema from tron.core import job @@ -53,42 +54,6 @@ def from_config(cls, persistence_config): return PersistentStateManager(store, buffer) -class StateMetadata: - """A data object for saving state metadata. Conforms to the same - RunState interface as Jobs and Services. - """ - - name = "StateMetadata" - - # State schema version, only first component counts, - # for backwards compatibility - version = (0, 7, 0, 0) - - def __init__(self): - self.state_data = { - "version": self.version, - "create_time": time.time(), - } - - @classmethod - def validate_metadata(cls, metadata): - """Raises an exception if the metadata version is newer then - StateMetadata.version - """ - if not metadata: - return - - if metadata["version"][0] > cls.version[0]: - msg = "State version %s, expected <= %s" - raise VersionMismatchError( - msg - % ( - metadata["version"], - cls.version, - ), - ) - - class StateSaveBuffer: """Buffer calls to save, and perform the saves when buffer reaches buffer size. 
This buffer will only store one state_data for each key. @@ -138,23 +103,18 @@ def __init__(self, persistence_impl, buffer): self.enabled = True self._buffer = buffer self._impl = persistence_impl - self.metadata_key = self._impl.build_key( - runstate.MCP_STATE, - StateMetadata.name, - ) + # TODO: Make sure nobody is calling this with skip_validation passed in, then remove it def restore(self, job_names, skip_validation=False): """Return the most recent serialized state.""" log.debug("Restoring state.") - if not skip_validation: - self._restore_metadata() # First, restore the jobs themselves jobs = self._restore_dicts(runstate.JOB_STATE, job_names) # jobs should be a dictionary that contains job name and number of runs # {'MASTER.k8s': {'run_nums':[0], 'enabled': True}, 'MASTER.cits_test_frequent_1': {'run_nums': [1,0], 'enabled': True}} - # second, restore the runs for each of the jobs restored above + # Second, restore the runs for each of the jobs restored above with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # start the threads and mark each future with it's job name # this is useful so that we can index the job name later to add the runs to the jobs dictionary @@ -182,7 +142,9 @@ def _restore_runs_for_job(self, job_name, job_state): runs = copy.copy(job_runs_restored_states) for run_id, state in runs.items(): if state == {}: - log.error(f"Failed to restore {run_id}, no state found for it!") + log.error( + f"Failed to restore {run_id}, no state found for it!" + ) # KKASP: job name exists, but no state? job_runs_restored_states.pop(run_id) runs = list(job_runs_restored_states.values()) @@ -190,19 +152,15 @@ def _restore_runs_for_job(self, job_name, job_state): runs.sort(key=lambda x: x["run_num"], reverse=True) return runs - def _restore_metadata(self): - metadata = self._impl.restore([self.metadata_key]) - StateMetadata.validate_metadata(metadata.get(self.metadata_key)) - def _keys_for_items(self, item_type, names): """Returns a dict of item to the key for that item.""" keys = (self._impl.build_key(item_type, name) for name in names) return dict(zip(keys, names)) - def _restore_dicts(self, item_type, items) -> Dict[str, dict]: + def _restore_dicts(self, item_type: str, items: List[str]) -> Dict[str, dict]: """Return a dict mapping of the items name to its state data.""" key_to_item_map = self._keys_for_items(item_type, items) - key_to_state_map = self._impl.restore(key_to_item_map.keys()) + key_to_state_map = self._impl.restore(key_to_item_map.keys()) # KKASP: dynamodb_state_store.py return {key_to_item_map[key]: state_data for key, state_data in key_to_state_map.items()} def delete(self, type_enum, name): @@ -323,9 +281,6 @@ def delete_job_run(self, job_run): def save_frameworks(self, clusters): self._save_object(runstate.MESOS_STATE, clusters) - def save_metadata(self): - self._save_object(runstate.MCP_STATE, StateMetadata()) - def _save_object(self, state_type, obj): self.state_manager.save(state_type, obj.name, obj.state_data) diff --git a/tron/serialize/runstate/yamlstore.py b/tron/serialize/runstate/yamlstore.py index af770b908..4c8d760c8 100644 --- a/tron/serialize/runstate/yamlstore.py +++ b/tron/serialize/runstate/yamlstore.py @@ -15,7 +15,6 @@ TYPE_MAPPING = { runstate.JOB_STATE: "jobs", - runstate.MCP_STATE: runstate.MCP_STATE, } diff --git a/tron/utils/collections.py b/tron/utils/collections.py index 1df5f3f82..8b367a429 100644 --- a/tron/utils/collections.py +++ b/tron/utils/collections.py @@ -4,6 +4,7 @@ log = logging.getLogger(__name__) +# TODO: 
wtf is this, even? class MappingCollection(dict): """Dictionary like object for managing collections of items. Item is expected to support the following interface, and should be hashable. diff --git a/tron/utils/crontab.py b/tron/utils/crontab.py index 953bf9748..9a9c9c9cc 100644 --- a/tron/utils/crontab.py +++ b/tron/utils/crontab.py @@ -2,6 +2,11 @@ import calendar import itertools import re +from typing import List +from typing import Optional +from typing import Set +from typing import Tuple +from typing import Union PREDEFINED_SCHEDULE = { "@yearly": "0 0 1 1 *", @@ -14,7 +19,7 @@ } -def convert_predefined(line): +def convert_predefined(line: str) -> str: if not line.startswith("@"): return line @@ -26,8 +31,8 @@ def convert_predefined(line): class FieldParser: """Parse and validate a field in a crontab entry.""" - name = None - bounds = None + name: str = "" + bounds: Tuple[int, int] = (0, 0) range_pattern = re.compile( r""" (?P\d+|\*) # Initial value @@ -37,34 +42,38 @@ class FieldParser: re.VERBOSE, ) - def normalize(self, source): + def normalize(self, source: str) -> str: return source.strip() - def get_groups(self, source): + def get_groups(self, source: str) -> List[str]: return source.split(",") - def parse(self, source): + def parse(self, source: str) -> Optional[Union[List[int], List[str]]]: if source == "*": return None - groups = [self.get_values(group) for group in self.get_groups(source)] - groups = set(itertools.chain.from_iterable(groups)) - has_last = False - if "LAST" in groups: - has_last = True + groups: Set[Union[int, str]] = set( + itertools.chain.from_iterable(self.get_values(group) for group in self.get_groups(source)) + ) + has_last = "LAST" in groups + if has_last: groups.remove("LAST") - groups = sorted(groups) + sorted_groups = sorted(groups, key=lambda x: (isinstance(x, str), x)) if has_last: - groups.append("LAST") - return groups + sorted_groups.append("LAST") + if not sorted_groups: + return None + if all(isinstance(x, int) for x in sorted_groups): + return sorted_groups # type: ignore + return sorted_groups # type: ignore - def get_match_groups(self, source): + def get_match_groups(self, source: str) -> dict: match = self.range_pattern.match(source) if not match: raise ValueError("Unknown expression: %s" % source) return match.groupdict() - def get_values(self, source): + def get_values(self, source: str) -> List[Union[int, str]]: source = self.normalize(source) match_groups = self.get_match_groups(source) step = 1 @@ -74,7 +83,7 @@ def get_values(self, source): step = self.validate_bounds(match_groups["step"]) return self.get_range(min_value, max_value, step) - def get_value_range(self, match_groups): + def get_value_range(self, match_groups: dict) -> Tuple[int, int]: if match_groups["min"] == "*": return self.bounds @@ -86,7 +95,7 @@ def get_value_range(self, match_groups): return min_value, min_value + 1 - def get_range(self, min_value, max_value, step): + def get_range(self, min_value: int, max_value: int, step: int) -> List[Union[int, str]]: if min_value < max_value: return list(range(min_value, max_value, step)) @@ -94,12 +103,12 @@ def get_range(self, min_value, max_value, step): diff = (max_bound - min_value) + (max_value - min_bound) return [(min_value + i) % max_bound for i in list(range(0, diff, step))] - def validate_bounds(self, value): + def validate_bounds(self, value: str) -> int: min_value, max_value = self.bounds - value = int(value) - if not min_value <= value < max_value: - raise ValueError(f"{self.name} value out of range: 
{value}") - return value + int_value = int(value) + if not min_value <= int_value < max_value: + raise ValueError(f"{self.name} value out of range: {int_value}") + return int_value class MinuteFieldParser(FieldParser): @@ -116,7 +125,7 @@ class MonthdayFieldParser(FieldParser): name = "monthdays" bounds = (1, 32) - def get_values(self, source): + def get_values(self, source: str) -> List[Union[int, str]]: # Handle special case for last day of month source = self.normalize(source) if source == "L": @@ -130,7 +139,7 @@ class MonthFieldParser(FieldParser): bounds = (1, 13) month_names = calendar.month_abbr[1:] - def normalize(self, month): + def normalize(self, month: str) -> str: month = super().normalize(month) month = month.lower() for month_num, month_name in enumerate(self.month_names, start=1): @@ -143,7 +152,7 @@ class WeekdayFieldParser(FieldParser): bounds = (0, 7) day_names = ["sun", "mon", "tue", "wed", "thu", "fri", "sat"] - def normalize(self, day_of_week): + def normalize(self, day_of_week: str) -> str: day_of_week = super().normalize(day_of_week) day_of_week = day_of_week.lower() for dow_num, dow_name in enumerate(self.day_names): @@ -159,7 +168,7 @@ def normalize(self, day_of_week): # TODO: support L (for dow), W, # -def parse_crontab(line): +def parse_crontab(line: str) -> dict: line = convert_predefined(line) minutes, hours, dom, months, dow = line.split(None, 4) diff --git a/tron/utils/persistable.py b/tron/utils/persistable.py new file mode 100644 index 000000000..04c7437cc --- /dev/null +++ b/tron/utils/persistable.py @@ -0,0 +1,11 @@ +from abc import ABC +from abc import abstractmethod +from typing import Any +from typing import Dict + + +class Persistable(ABC): + @staticmethod + @abstractmethod + def to_json(state_data: Dict[Any, Any]) -> str: + pass From c0817727da9cdfaf36be1f999dfd137d56da3d32 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Tue, 17 Sep 2024 09:22:19 -0700 Subject: [PATCH 02/22] Remove old comments. Delete mcp state metadata. 
--- tests/serialize/runstate/statemanager_test.py | 18 ---------- tron/core/actionrun.py | 35 ++----------------- tron/core/job.py | 5 +-- tron/core/job_collection.py | 1 - tron/core/jobgraph.py | 4 --- tron/core/jobrun.py | 6 ++-- tron/mcp.py | 1 - 7 files changed, 5 insertions(+), 65 deletions(-) diff --git a/tests/serialize/runstate/statemanager_test.py b/tests/serialize/runstate/statemanager_test.py index a12a9168a..7010b3bff 100644 --- a/tests/serialize/runstate/statemanager_test.py +++ b/tests/serialize/runstate/statemanager_test.py @@ -40,24 +40,6 @@ def test_from_config_shelve(self): shutil.rmtree(tmpdir) -# class TestStateMetadata(TestCase): -# def test_validate_metadata(self): -# metadata = {"version": (0, 5, 2)} -# StateMetadata.validate_metadata(metadata) - -# def test_validate_metadata_no_state_data(self): -# metadata = None -# StateMetadata.validate_metadata(metadata) - -# def test_validate_metadata_mismatch(self): -# metadata = {"version": (200, 1, 1)} -# assert_raises( -# VersionMismatchError, -# StateMetadata.validate_metadata, -# metadata, -# ) - - class TestStateSaveBuffer(TestCase): @setup def setup_buffer(self): diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index dc0616672..05e3ab6b2 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -170,14 +170,6 @@ def state_data(self): state_data[field.name] = getattr(self, field.name) return state_data - # "command_config": {} - # "end_time": "2024-01-26T12:25:42.813846", - # "exit_status": 0, - # "kubernetes_task_id": "compute-infra-test-service.test--load--foo19.7850.foo.ia3xrm", - # "mesos_task_id": null, - # "rendered_command": "date; sleep 300; date", - # "start_time": "2024-01-26T12:20:04.141850" - @staticmethod def to_json(state_data: dict) -> str: """Serialize the ActionRunAttempt instance to a JSON string.""" @@ -307,8 +299,8 @@ class ActionRun(Observable, Persistable): # TODO: create a class for ActionRunId, JobRunId, Etc def __init__( self, - job_run_id: str, # TODO: maybe string??? When would these NOT be string? - name: str, # TODO: maybe string??? When would these NOT be string? + job_run_id: str, # TODO: maybe string??? When would these NOT be string? Py2 -- tag for removal in unahack or something + name: str, # TODO: maybe string??? When would these NOT be string? 
Py2 -- tag for removal in unahack or something node: node.Node, command_config: action.ActionCommandConfig, parent_context: Optional[CommandContext] = None, @@ -733,29 +725,6 @@ def state_data(self): # TODO: all these state_data funcs return dict @staticmethod def to_json(state_data: dict) -> str: """Serialize the ActionRun instance to a JSON string.""" - # { - # "action_name": "foo", - # "action_runner": { - # "exec_path": "/opt/venvs/tron/bin", - # "status_path": "/tmp/tron" - # }, - # "attempts": [], - # "end_time": "2024-01-26T12:25:42.813846", - # "executor": "kubernetes", - # "exit_status": 0, - # "job_run_id": "compute-infra-test-service.test_load_foo19.7850", - # "node_name": "paasta", - # "on_upstream_rerun": null, - # "original_command": "date; sleep 300; date", - # "retries_delay": null, - # "retries_remaining": null, - # "start_time": "2024-01-26T12:20:04.141850", - # "state": "succeeded", - # "trigger_downstreams": null, - # "trigger_timeout_timestamp": 1706386800.0, - # "triggered_by": null - # } - action_runner = state_data.get("action_runner") if action_runner is None: action_runner_json = NoActionRunnerFactory.to_json() diff --git a/tron/core/job.py b/tron/core/job.py index 87753f0a9..341298823 100644 --- a/tron/core/job.py +++ b/tron/core/job.py @@ -77,7 +77,7 @@ class Job(Observable, Observer, Persistable): def __init__( self, - name: str, # TODO TYPE for self.name: QUESTION: might be bytes instead of string??? + name: str, # TODO TYPE for self.name: QUESTION: might be bytes instead of string??? Py2 -- tag for removal in unahack or something scheduler: GeneralScheduler, queueing: bool = True, all_nodes: bool = False, @@ -120,9 +120,6 @@ def __init__( self.run_limit = run_limit log.info(f"{self} created") - log.info(f"TYPE for max_runtime: {type(max_runtime)}") - log.info(f"TYPE for time_zone: {type(time_zone)}") - @staticmethod def to_json(state_data: dict) -> str: """Serialize the Job instance to a JSON string.""" diff --git a/tron/core/job_collection.py b/tron/core/job_collection.py index 1921b235e..1d9e1e7d2 100644 --- a/tron/core/job_collection.py +++ b/tron/core/job_collection.py @@ -7,7 +7,6 @@ log = logging.getLogger(__name__) -# QUESTION: is JobCollection a combo of Job and JobRun? class JobCollection: """A collection of jobs.""" diff --git a/tron/core/jobgraph.py b/tron/core/jobgraph.py index c690b5311..ede26f3b5 100644 --- a/tron/core/jobgraph.py +++ b/tron/core/jobgraph.py @@ -1,4 +1,3 @@ -import logging from collections import defaultdict from collections import namedtuple from typing import Optional @@ -8,9 +7,6 @@ from tron.core.actiongraph import ActionGraph from tron.utils import maybe_decode -log = logging.getLogger(__name__) - - AdjListEntry = namedtuple("AdjListEntry", ["action_name", "is_trigger"]) diff --git a/tron/core/jobrun.py b/tron/core/jobrun.py index 54da6f7b3..190735722 100644 --- a/tron/core/jobrun.py +++ b/tron/core/jobrun.py @@ -86,10 +86,8 @@ def to_json(state_data: dict) -> str: "run_num": state_data["run_num"], "run_time": state_data["run_time"].isoformat() if state_data["run_time"] else None, "node_name": state_data["node_name"], - "runs": [ActionRun.to_json(run) for run in state_data["runs"]], # QUESTION: ActionRun vs. Action - "cleanup_run": ActionRun.to_json(state_data["cleanup_run"]) - if state_data["cleanup_run"] - else None, # QUESTION: ActionRun vs. 
Action + "runs": [ActionRun.to_json(run) for run in state_data["runs"]], + "cleanup_run": ActionRun.to_json(state_data["cleanup_run"]) if state_data["cleanup_run"] else None, "manual": state_data["manual"], } ) diff --git a/tron/mcp.py b/tron/mcp.py index 9cc5152d6..6ca1a4bda 100644 --- a/tron/mcp.py +++ b/tron/mcp.py @@ -199,7 +199,6 @@ def restore_state(self, action_runner): log.info( f"Tron completed restoring state for the jobs. Time elapsed since Tron started {time.time() - self.boot_time}" ) - # self.state_watcher.save_metadata() def __str__(self): return "MCP" From 88de4184a6ac0b02a68f1c96d2a15e3ebd659942 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Tue, 17 Sep 2024 09:56:48 -0700 Subject: [PATCH 03/22] Return these --- Makefile | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Makefile b/Makefile index ddfe573c9..af7b1844f 100644 --- a/Makefile +++ b/Makefile @@ -28,6 +28,9 @@ endif -usage: @echo "make test - Run tests" + @echo "make deb_bionic - Generate bionic deb package" + @echo "make itest_bionic - Run tests and integration checks" + @echo "make _itest_bionic - Run only integration checks" @echo "make deb_jammy - Generate jammy deb package" @echo "make itest_jammy - Run tests and integration checks" @echo "make _itest_jammy - Run only integration checks" From 909e9b2a291a8a30a7d7d0763dcba46ada55f617 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Tue, 17 Sep 2024 12:38:59 -0700 Subject: [PATCH 04/22] Try itest validation without mcp_state --- itest.sh | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/itest.sh b/itest.sh index 7d315c5dd..a4520405b 100755 --- a/itest.sh +++ b/itest.sh @@ -61,12 +61,11 @@ fi kill -SIGTERM $TRON_PID wait $TRON_PID || true -# TODO: Remove this staetmetadata crap as it'll likely fail /opt/venvs/tron/bin/python - < start time {}".format(ts, int(os.environ['TRON_START_TIME']))) From 1a5238bec50e6cede788c7999ce9df7b2f242569 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Tue, 17 Sep 2024 13:53:17 -0700 Subject: [PATCH 05/22] Fix isinstance check for Action to_json --- requirements-dev-minimal.txt | 1 + requirements-dev.txt | 1 + tron/core/action.py | 3 +-- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/requirements-dev-minimal.txt b/requirements-dev-minimal.txt index eddbadb2b..6df62cd55 100644 --- a/requirements-dev-minimal.txt +++ b/requirements-dev-minimal.txt @@ -8,6 +8,7 @@ pylint pytest pytest-asyncio requirements-tools +types-pytz types-PyYAML types-requests<2.31.0.7 # newer types-requests requires urllib3>=2 types-simplejson diff --git a/requirements-dev.txt b/requirements-dev.txt index 2860357cd..ee1dd3374 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -29,6 +29,7 @@ requirements-tools==1.2.1 toml==0.10.2 tomli==2.0.1 tomlkit==0.11.6 +types-pytz==2024.2.0.20240913 types-PyYAML==6.0.12 types-requests==2.31.0.5 types-simplejson==3.19.0.20240310 diff --git a/tron/core/action.py b/tron/core/action.py index ba7b67ec8..b531911fd 100644 --- a/tron/core/action.py +++ b/tron/core/action.py @@ -1,7 +1,6 @@ import datetime import json import logging -from collections import namedtuple from dataclasses import dataclass from dataclasses import field from dataclasses import fields @@ -60,7 +59,7 @@ def to_json(state_data: dict) -> str: """Serialize the ActionCommandConfig instance to a JSON string.""" def serialize_namedtuple(obj): - if isinstance(obj, namedtuple): + if isinstance(obj, tuple) and hasattr(obj, "_fields"): return obj._asdict() return obj From 
c18cce4ce42e4123203bc18709aaacf08b41d850 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Wed, 18 Sep 2024 07:12:37 -0700 Subject: [PATCH 06/22] Comment cleanup. Add cleanup TODOs for TRON-2293. Rename val in dynamodb_state_store to something a little more explanatory now that we have 2 versions --- bin/tronfig | 1 - tron/actioncommand.py | 1 - tron/api/resource.py | 10 +++++--- tron/commands/display.py | 4 +++- tron/config/manager.py | 8 +++++-- tron/core/actionrun.py | 21 +++++++++++----- tron/core/job.py | 7 +++--- tron/core/job_collection.py | 8 +------ tron/core/job_scheduler.py | 1 - tron/core/jobgraph.py | 4 +++- tron/core/jobrun.py | 4 +++- tron/mcp.py | 11 ++------- tron/serialize/filehandler.py | 4 +++- .../runstate/dynamodb_state_store.py | 24 ++++++++++--------- tron/serialize/runstate/shelvestore.py | 8 +++++-- tron/serialize/runstate/statemanager.py | 6 ++--- tron/utils/__init__.py | 2 ++ 17 files changed, 70 insertions(+), 54 deletions(-) diff --git a/bin/tronfig b/bin/tronfig index 057b0ed3b..5f50a83e7 100755 --- a/bin/tronfig +++ b/bin/tronfig @@ -199,7 +199,6 @@ if __name__ == "__main__": client = Client(args.server) if args.print_config: - # TODO: use maybe_encode() content = client.config(args.source)["config"] if type(content) is not bytes: content = content.encode("utf8") diff --git a/tron/actioncommand.py b/tron/actioncommand.py index 91850c74e..866f376d4 100644 --- a/tron/actioncommand.py +++ b/tron/actioncommand.py @@ -146,7 +146,6 @@ def clear(self): self.buffers.clear() -# QUESTION: What is the use case? class NoActionRunnerFactory(Persistable): """Action runner factory that does not wrap the action run command.""" diff --git a/tron/api/resource.py b/tron/api/resource.py index 663f4ed10..0460c910f 100644 --- a/tron/api/resource.py +++ b/tron/api/resource.py @@ -183,7 +183,9 @@ def getChild(self, action_name, _): if not action_name: return self - action_name = maybe_decode(action_name) + action_name = maybe_decode( + action_name + ) # TODO: TRON-2293 maybe_decode is a relic of Python2->Python3 migration. Remove it. if action_name in self.job_run.action_runs: action_run = self.job_run.action_runs[action_name] return ActionRunResource(action_run, self.job_run) @@ -231,7 +233,9 @@ def getChild(self, run_id, _): if not run_id: return self - run_id = maybe_decode(run_id) + run_id = maybe_decode( + run_id + ) # TODO: TRON-2293 maybe_decode is a relic of Python2->Python3 migration. Remove it. run = self.get_run_from_identifier(run_id) if run: return JobRunResource(run, self.job_scheduler) @@ -297,7 +301,7 @@ def getChild(self, name, request): if not name: return self - name = maybe_decode(name) + name = maybe_decode(name) # TODO: TRON-2293 maybe_decode is a relic of Python2->Python3 migration. Remove it. return resource_from_collection(self.job_collection, name, JobResource) def get_data( diff --git a/tron/commands/display.py b/tron/commands/display.py index c64813260..2ab076327 100644 --- a/tron/commands/display.py +++ b/tron/commands/display.py @@ -443,6 +443,8 @@ def view_with_less(content, color=True): cmd.append("-r") less_proc = subprocess.Popen(cmd, stdin=subprocess.PIPE) - less_proc.stdin.write(maybe_encode(content)) + less_proc.stdin.write( + maybe_encode(content) + ) # TODO: TRON-2293 maybe_encode is a relic of Python2->Python3 migration. Remove it. 
less_proc.stdin.close() less_proc.wait() diff --git a/tron/config/manager.py b/tron/config/manager.py index 21b6e19ca..7717f039a 100644 --- a/tron/config/manager.py +++ b/tron/config/manager.py @@ -32,7 +32,9 @@ def read(path): def write_raw(path, content): with open(path, "w") as fh: - fh.write(maybe_decode(content)) + fh.write( + maybe_decode(content) + ) # TODO: TRON-2293 maybe_decode is a relic of Python2->Python3 migration. Remove it. def read_raw(path) -> str: @@ -41,7 +43,9 @@ def read_raw(path) -> str: def hash_digest(content): - return hashlib.sha1(maybe_encode(content)).hexdigest() + return hashlib.sha1( + maybe_encode(content) + ).hexdigest() # TODO: TRON-2293 maybe_encode is a relic of Python2->Python3 migration. Remove it. class ManifestFile: diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index 05e3ab6b2..8db9f9a8b 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -58,7 +58,9 @@ class ActionRunFactory: def build_action_run_collection(cls, job_run, action_runner): """Create an ActionRunCollection from an ActionGraph and JobRun.""" action_run_map = { - maybe_decode(name): cls.build_run_for_action( + maybe_decode( + name + ): cls.build_run_for_action( # TODO: TRON-2293 maybe_decode is a relic of Python2->Python3 migration. Remove it. job_run, action_inst, action_runner, @@ -84,6 +86,7 @@ def action_run_collection_from_state( ), ) + # TODO: TRON-2293 maybe_decode is a relic of Python2->Python3 migration. Remove it. action_run_map = {maybe_decode(action_run.action_name): action_run for action_run in action_runs} return ActionRunCollection(job_run.action_graph, action_run_map) @@ -299,8 +302,8 @@ class ActionRun(Observable, Persistable): # TODO: create a class for ActionRunId, JobRunId, Etc def __init__( self, - job_run_id: str, # TODO: maybe string??? When would these NOT be string? Py2 -- tag for removal in unahack or something - name: str, # TODO: maybe string??? When would these NOT be string? Py2 -- tag for removal in unahack or something + job_run_id: str, + name: str, node: node.Node, command_config: action.ActionCommandConfig, parent_context: Optional[CommandContext] = None, @@ -323,8 +326,12 @@ def __init__( original_command: Optional[str] = None, ): super().__init__() - self.job_run_id = maybe_decode(job_run_id) - self.action_name = maybe_decode(name) + self.job_run_id = maybe_decode( + job_run_id + ) # TODO: TRON-2293 maybe_decode is a relic of Python2->Python3 migration. Remove it. + self.action_name = maybe_decode( + name + ) # TODO: TRON-2293 maybe_decode is a relic of Python2->Python3 migration. Remove it. self.node = node self.start_time = start_time self.end_time = end_time @@ -398,7 +405,9 @@ def attempts_from_state(cls, state_data, command_config): if "attempts" in state_data: attempts = [ActionRunAttempt.from_state(a) for a in state_data["attempts"]] else: - rendered_command = maybe_decode(state_data.get("rendered_command")) + rendered_command = maybe_decode( + state_data.get("rendered_command") + ) # TODO: TRON-2293 maybe_decode is a relic of Python2->Python3 migration. Remove it. exit_statuses = state_data.get("exit_statuses", []) # If the action has started, add an attempt for the final try if state_data.get("start_time"): diff --git a/tron/core/job.py b/tron/core/job.py index 341298823..99b1a0dcb 100644 --- a/tron/core/job.py +++ b/tron/core/job.py @@ -77,7 +77,7 @@ class Job(Observable, Observer, Persistable): def __init__( self, - name: str, # TODO TYPE for self.name: QUESTION: might be bytes instead of string??? 
Py2 -- tag for removal in unahack or something + name: str, scheduler: GeneralScheduler, queueing: bool = True, all_nodes: bool = False, @@ -96,7 +96,9 @@ def __init__( run_limit: Optional[int] = None, ): super().__init__() - self.name = maybe_decode(name) + self.name = maybe_decode( + name + ) # TODO: TRON-2293 maybe_decode is a relic of Python2->Python3 migration. Remove it. self.monitoring = monitoring self.action_graph = action_graph self.scheduler = scheduler @@ -222,7 +224,6 @@ def state_data(self): "enabled": self.enabled, } - # KKASP: called from job_scheduler.py during second restore workflow def get_job_runs_from_state(self, state_data): """Apply a previous state to this Job.""" self.enabled = state_data["enabled"] diff --git a/tron/core/job_collection.py b/tron/core/job_collection.py index 1d9e1e7d2..22c9ad0e8 100644 --- a/tron/core/job_collection.py +++ b/tron/core/job_collection.py @@ -77,13 +77,7 @@ def restore_state(self, job_state_data, config_action_runner): runs for each job """ for name, state in job_state_data.items(): - log.info(f"kkasp logging bonanza - job_collection name: {name}") - # name: compute-infra-test-service.test_load_foo1 - log.info(f"kkasp logging bonanza - job_collection state: {state}") - # state: {'run_nums': [4, 3, 2, 1, 0], 'enabled': True, runs: [{}, {}]} - self.jobs[name].restore_state( - state, config_action_runner - ) # KKASP: call restore_state on JobScheduler, pass in jobrun state_data + self.jobs[name].restore_state(state, config_action_runner) log.info(f"Loaded state for {len(job_state_data)} jobs") def get_by_name(self, name): diff --git a/tron/core/job_scheduler.py b/tron/core/job_scheduler.py index e796bd94c..e4e134fcb 100644 --- a/tron/core/job_scheduler.py +++ b/tron/core/job_scheduler.py @@ -24,7 +24,6 @@ def __init__(self, job: Job): self.job = job self.watch(job) - # KKASP: Called from job_collection during second restore workflow def restore_state(self, job_state_data, config_action_runner): """Load the job state and schedule any JobRuns.""" job_runs = self.job.get_job_runs_from_state(job_state_data) diff --git a/tron/core/jobgraph.py b/tron/core/jobgraph.py index ede26f3b5..82147bbd9 100644 --- a/tron/core/jobgraph.py +++ b/tron/core/jobgraph.py @@ -95,7 +95,9 @@ def get_action_graph_for_job(self, job_name): return ActionGraph(job_action_map, required_actions, required_triggers) def _save_action(self, action_name, job_name, config): - action_name = maybe_decode(action_name) + action_name = maybe_decode( + action_name + ) # TODO: TRON-2293 maybe_decode is a relic of Python2->Python3 migration. Remove it. full_name = f"{job_name}.{action_name}" self.action_map[full_name] = Action.from_config(config) self._actions_for_job[job_name].append(full_name) diff --git a/tron/core/jobrun.py b/tron/core/jobrun.py index 190735722..c3294853b 100644 --- a/tron/core/jobrun.py +++ b/tron/core/jobrun.py @@ -61,7 +61,9 @@ def __init__( manual: Optional[bool] = None, # TODO: what are you for? ): super().__init__() - self.job_name = maybe_decode(job_name) + self.job_name = maybe_decode( + job_name + ) # TODO: TRON-2293 maybe_decode is a relic of Python2->Python3 migration. Remove it. 
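The restore loop being de-noised in job_collection.py is easier to follow with the data shape from the removed debug comments spelled out. A rough, hand-built sketch (plain dicts, not the real Job/JobScheduler objects):

```python
# Rough sketch of the per-job restore loop, using the data shape from the removed
# "kkasp logging bonanza" comments. In Tron the body of the loop is
# self.jobs[name].restore_state(state, config_action_runner).
job_state_data = {
    "compute-infra-test-service.test_load_foo1": {
        "run_nums": [4, 3, 2, 1, 0],
        "enabled": True,
        "runs": [],
    },
}

for name, state in job_state_data.items():
    print(f"restoring {name}: enabled={state['enabled']}, run_nums={state['run_nums']}")
```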
self.run_num = run_num self.run_time = run_time self.node = node diff --git a/tron/mcp.py b/tron/mcp.py index 6ca1a4bda..cf42f52e5 100644 --- a/tron/mcp.py +++ b/tron/mcp.py @@ -85,7 +85,6 @@ def initial_setup(self): # The job schedule factories will be created in the function below self._load_config() # Jobs will also get scheduled (internally) once the state for action runs are restored in restore_state - # KKASP: Restore really kicks off here with timer("self.restore_state"): self.restore_state( actioncommand.create_action_runner_factory_from_config( @@ -183,19 +182,13 @@ def restore_state(self, action_runner): log.info("Restoring from DynamoDB") with timer("restore"): # restores the state of the jobs and their runs from DynamoDB - # KKASP: The first restore grabs all the data from DynamoDB - states = self.state_watcher.restore( # TODO: what is "states" - self.jobs.get_names() - ) # KKASP: _load_config loads the names into self.jobs - # KKASP: states is a weird hodgepodge of job_state and job_run_state + states = self.state_watcher.restore(self.jobs.get_names()) log.info( f"Tron will start restoring state for the jobs and will start scheduling them! Time elapsed since Tron started {time.time() - self.boot_time}" ) # loads the runs' state and schedule the next run for each job with timer("self.jobs.restore_state"): - self.jobs.restore_state( - states.get("job_state", {}), action_runner - ) # QUESTION: why is job_state here holding both job and job run state? + self.jobs.restore_state(states.get("job_state", {}), action_runner) log.info( f"Tron completed restoring state for the jobs. Time elapsed since Tron started {time.time() - self.boot_time}" ) diff --git a/tron/serialize/filehandler.py b/tron/serialize/filehandler.py index 6c3dd8190..f7941ba4f 100644 --- a/tron/serialize/filehandler.py +++ b/tron/serialize/filehandler.py @@ -69,7 +69,9 @@ def write(self, content): return self.last_accessed = time.time() - self._fh.write(maybe_encode(content)) + self._fh.write( + maybe_encode(content) + ) # TODO: TRON-2293 maybe_encode is a relic of Python2->Python3 migration. Remove it. self.manager.update(self) def __enter__(self): diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py index 733c4bd5f..65474997f 100644 --- a/tron/serialize/runstate/dynamodb_state_store.py +++ b/tron/serialize/runstate/dynamodb_state_store.py @@ -70,7 +70,7 @@ def restore(self, keys) -> dict: """ first_items = self._get_first_partitions(keys) remaining_items = self._get_remaining_partitions(first_items) - vals = self._merge_items(first_items, remaining_items) # KKASP: pickle loading happens in here + vals = self._merge_items(first_items, remaining_items) return vals def chunk_keys(self, keys: Sequence[T]) -> List[Sequence[T]]: @@ -230,18 +230,18 @@ def _save_loop(self): log.error("too many dynamodb errors in a row, crashing") os.exit(1) - # json_val is the non-pickled version of val - def __setitem__(self, key: str, val: bytes, json_val: str) -> None: + def __setitem__(self, key: str, pickled_val: bytes, json_val: str) -> None: """ Partition the item and write up to 10 partitions atomically. Retry up to 3 times on failure - Examine the size of `val`, and splice it into - different parts under 400KB with different sort keys, - and save them under the same partition key built. 
+ Examine the size of `pickled_val` and `json_val`, and + splice them into different parts based on `OBJECT_SIZE` + with different sort keys, and save them under the same + partition key built. """ start = time.time() - num_partitions = math.ceil(len(val) / OBJECT_SIZE) + num_partitions = math.ceil(len(pickled_val) / OBJECT_SIZE) num_json_val_partitions = math.ceil(len(json_val) / OBJECT_SIZE) log.debug( @@ -264,15 +264,17 @@ def __setitem__(self, key: str, val: bytes, json_val: str) -> None: "N": str(index), }, "val": { - "B": val[index * OBJECT_SIZE : min(index * OBJECT_SIZE + OBJECT_SIZE, len(val))], + "B": pickled_val[ + index * OBJECT_SIZE : min(index * OBJECT_SIZE + OBJECT_SIZE, len(pickled_val)) + ], }, "num_partitions": { "N": str(num_partitions), }, - "raw_val": { + "json_val": { "S": json_val[index * OBJECT_SIZE : min(index * OBJECT_SIZE + OBJECT_SIZE, len(json_val))] }, - "num_raw_val_partitions": { + "num_json_val_partitions": { "N": str(num_json_val_partitions), }, }, @@ -304,7 +306,7 @@ def __setitem__(self, key: str, val: bytes, json_val: str) -> None: delta=time.time() - start, ) - # TODO: Is this ok with we use the max number of partitions? + # TODO: Is this ok if we just use the max number of partitions? def _delete_item(self, key: str) -> None: start = time.time() try: diff --git a/tron/serialize/runstate/shelvestore.py b/tron/serialize/runstate/shelvestore.py index 22d1bf8ac..6000d946c 100644 --- a/tron/serialize/runstate/shelvestore.py +++ b/tron/serialize/runstate/shelvestore.py @@ -52,8 +52,12 @@ class ShelveKey: __slots__ = ["type", "iden"] def __init__(self, type, iden): - self.type = maybe_decode(type) - self.iden = maybe_decode(iden) + self.type = maybe_decode( + type + ) # TODO: TRON-2293 maybe_decode is a relic of Python2->Python3 migration. Remove it. + self.iden = maybe_decode( + iden + ) # TODO: TRON-2293 maybe_decode is a relic of Python2->Python3 migration. Remove it. @property def key(self): diff --git a/tron/serialize/runstate/statemanager.py b/tron/serialize/runstate/statemanager.py index 0ecf69db8..94f3d2512 100644 --- a/tron/serialize/runstate/statemanager.py +++ b/tron/serialize/runstate/statemanager.py @@ -142,9 +142,7 @@ def _restore_runs_for_job(self, job_name, job_state): runs = copy.copy(job_runs_restored_states) for run_id, state in runs.items(): if state == {}: - log.error( - f"Failed to restore {run_id}, no state found for it!" - ) # KKASP: job name exists, but no state? + log.error(f"Failed to restore {run_id}, no state found for it!") job_runs_restored_states.pop(run_id) runs = list(job_runs_restored_states.values()) @@ -160,7 +158,7 @@ def _keys_for_items(self, item_type, names): def _restore_dicts(self, item_type: str, items: List[str]) -> Dict[str, dict]: """Return a dict mapping of the items name to its state data.""" key_to_item_map = self._keys_for_items(item_type, items) - key_to_state_map = self._impl.restore(key_to_item_map.keys()) # KKASP: dynamodb_state_store.py + key_to_state_map = self._impl.restore(key_to_item_map.keys()) return {key_to_item_map[key]: state_data for key, state_data in key_to_state_map.items()} def delete(self, type_enum, name): diff --git a/tron/utils/__init__.py b/tron/utils/__init__.py index f39abcc13..45225d948 100644 --- a/tron/utils/__init__.py +++ b/tron/utils/__init__.py @@ -7,12 +7,14 @@ log = logging.getLogger(__name__) +# TODO: TRON-2293 maybe_decode is a relic of Python2->Python3 migration. Remove it. 
def maybe_decode(maybe_string): if type(maybe_string) is bytes: return maybe_string.decode() return maybe_string +# TODO: TRON-2293 maybe_encode is a relic of Python2->Python3 migration. Remove it. def maybe_encode(maybe_bytes): if type(maybe_bytes) is not bytes: return maybe_bytes.encode() From 7fb271d75585363a591bf7ba0e72da47d56bf84e Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Wed, 18 Sep 2024 08:49:35 -0700 Subject: [PATCH 07/22] Remove shelve key test since we no longer save mcp_state or state_metadata --- itest.sh | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/itest.sh b/itest.sh index a4520405b..3c913b893 100755 --- a/itest.sh +++ b/itest.sh @@ -61,13 +61,13 @@ fi kill -SIGTERM $TRON_PID wait $TRON_PID || true -/opt/venvs/tron/bin/python - < start time {}".format(ts, int(os.environ['TRON_START_TIME']))) -assert ts > int(os.environ['TRON_START_TIME']) -EOF +# /opt/venvs/tron/bin/python - < start time {}".format(ts, int(os.environ['TRON_START_TIME']))) +# assert ts > int(os.environ['TRON_START_TIME']) +# EOF From 677d6e3e35d1ecbf23dd61b55f0aed9c53921f5a Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Wed, 18 Sep 2024 09:05:56 -0700 Subject: [PATCH 08/22] Remove deprecated itest. Rename some variables to make things a bit clearer. --- itest.sh | 11 ----------- .../runstate/dynamodb_state_store_test.py | 6 +++--- tron/serialize/runstate/dynamodb_state_store.py | 14 +++++++------- tron/serialize/runstate/shelvestore.py | 1 + 4 files changed, 11 insertions(+), 21 deletions(-) diff --git a/itest.sh b/itest.sh index 3c913b893..815c6b3f3 100755 --- a/itest.sh +++ b/itest.sh @@ -60,14 +60,3 @@ fi kill -SIGTERM $TRON_PID wait $TRON_PID || true - -# /opt/venvs/tron/bin/python - < start time {}".format(ts, int(os.environ['TRON_START_TIME']))) -# assert ts > int(os.environ['TRON_START_TIME']) -# EOF diff --git a/tests/serialize/runstate/dynamodb_state_store_test.py b/tests/serialize/runstate/dynamodb_state_store_test.py index 88553a4d2..5a54e4cfe 100644 --- a/tests/serialize/runstate/dynamodb_state_store_test.py +++ b/tests/serialize/runstate/dynamodb_state_store_test.py @@ -121,7 +121,7 @@ def large_object(): "runs": [], "cleanup_run": None, "manual": False, - "large_data": [i for i in range(10_000)], # TODO: is this enough to get partitioned? 
+ "large_data": [i for i in range(1_000_000)], } @@ -154,8 +154,8 @@ def test_save(self, store, small_object, large_object): for key in keys: item = store.table.get_item(Key={"key": key, "index": 0}) assert "Item" in item - assert "raw_val" in item["Item"] - assert_equal(json.loads(item["Item"]["raw_val"]), small_object) + assert "json_val" in item["Item"] + assert_equal(json.loads(item["Item"]["json_val"]), small_object) def test_delete_if_val_is_none(self, store, small_object, large_object): key_value_pairs = [ diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py index 65474997f..6c6006097 100644 --- a/tron/serialize/runstate/dynamodb_state_store.py +++ b/tron/serialize/runstate/dynamodb_state_store.py @@ -167,10 +167,10 @@ def save(self, key_value_pairs) -> None: self.save_queue[key] = (val, None) else: state_type = self.get_type_from_key(key) - serialized_val = self._serialize_item(state_type, val) + json_val = self._serialize_item(state_type, val) self.save_queue[key] = ( val, - serialized_val, + json_val, ) break @@ -181,20 +181,20 @@ def _consume_save_queue(self): for _ in range(qlen): try: with self.save_lock: - key, (original_val, serialized_val) = self.save_queue.popitem(last=False) - log.debug(f"Processing save for {key} with a value of {original_val}") + key, (pickled_val, json_val) = self.save_queue.popitem(last=False) + log.debug(f"Processing save for {key} with a value of {pickled_val}") # Remove all previous data with the same partition key # TODO: only remove excess partitions if new data has fewer self._delete_item(key) - if original_val is not None: - self.__setitem__(key, pickle.dumps(original_val), serialized_val) + if pickled_val is not None: + self.__setitem__(key, pickle.dumps(pickled_val), json_val) # reset errors count if we can successfully save saved += 1 except Exception as e: error = "tron_dynamodb_save_failure: failed to save key " f'"{key}" to dynamodb:\n{repr(e)}' log.error(error) with self.save_lock: - self.save_queue[key] = (original_val, serialized_val) + self.save_queue[key] = (pickled_val, json_val) duration = time.time() - start log.info(f"saved {saved} items in {duration}s") diff --git a/tron/serialize/runstate/shelvestore.py b/tron/serialize/runstate/shelvestore.py index 6000d946c..4e275b7f5 100644 --- a/tron/serialize/runstate/shelvestore.py +++ b/tron/serialize/runstate/shelvestore.py @@ -12,6 +12,7 @@ log = logging.getLogger(__name__) +# TODO: TRON-2293 This class does some Python 2 and Python 3 handling shenanigans. It should be cleaned up. 
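A hand-built illustration of what the updated test_save now checks: every partition written to DynamoDB carries a json_val string that json.loads can turn back into the original state dict. The item below is only a stand-in for a real get_item response:

```python
# Stand-in for a DynamoDB get_item response, illustrating the new json_val assertion
# in test_save. The real test reads the item back from a mocked table.
import json

small_object = {"enabled": True, "run_nums": [1, 0]}
item = {"Item": {"index": 0, "json_val": json.dumps(small_object)}}

assert "json_val" in item["Item"]
assert json.loads(item["Item"]["json_val"]) == small_object
```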
class Py2Shelf(shelve.Shelf): def __init__(self, filename, flag="c", protocol=2, writeback=False): db = bsddb3.hashopen(filename, flag) From e262cbc1df17d70b447d655241746b586cb8979d Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Thu, 19 Sep 2024 07:13:49 -0700 Subject: [PATCH 09/22] Remove unnecessary list call --- tron/core/action.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tron/core/action.py b/tron/core/action.py index b531911fd..b68c13ff4 100644 --- a/tron/core/action.py +++ b/tron/core/action.py @@ -69,8 +69,8 @@ def serialize_namedtuple(obj): "cpus": state_data["cpus"], "mem": state_data["mem"], "disk": state_data["disk"], - "cap_add": list(state_data["cap_add"]), - "cap_drop": list(state_data["cap_drop"]), + "cap_add": state_data["cap_add"], + "cap_drop": state_data["cap_drop"], "constraints": list(state_data["constraints"]), "docker_image": state_data["docker_image"], "docker_parameters": list(state_data["docker_parameters"]), From 83d7c65dcf862b523ab20052b24ee19e509be9c8 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Fri, 18 Oct 2024 11:45:54 -0700 Subject: [PATCH 10/22] Replace ignore with proper types for sorted_groups --- tron/utils/crontab.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tron/utils/crontab.py b/tron/utils/crontab.py index 9a9c9c9cc..7ff021891 100644 --- a/tron/utils/crontab.py +++ b/tron/utils/crontab.py @@ -48,7 +48,7 @@ def normalize(self, source: str) -> str: def get_groups(self, source: str) -> List[str]: return source.split(",") - def parse(self, source: str) -> Optional[Union[List[int], List[str]]]: + def parse(self, source: str) -> Optional[Union[List[int], List[Union[int, str]]]]: if source == "*": return None @@ -58,14 +58,14 @@ def parse(self, source: str) -> Optional[Union[List[int], List[str]]]: has_last = "LAST" in groups if has_last: groups.remove("LAST") - sorted_groups = sorted(groups, key=lambda x: (isinstance(x, str), x)) + sorted_groups: List[Union[int, str]] = sorted(groups, key=lambda x: (isinstance(x, str), x)) if has_last: sorted_groups.append("LAST") if not sorted_groups: return None if all(isinstance(x, int) for x in sorted_groups): - return sorted_groups # type: ignore - return sorted_groups # type: ignore + return sorted_groups + return sorted_groups def get_match_groups(self, source: str) -> dict: match = self.range_pattern.match(source) From 002f9356774130bd046e100ebe7ba6f75a7844f0 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Fri, 18 Oct 2024 11:51:17 -0700 Subject: [PATCH 11/22] Remove skip_validation since removing MCP StateMetadata --- tools/migration/migrate_state.py | 1 - tron/serialize/runstate/statemanager.py | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/tools/migration/migrate_state.py b/tools/migration/migrate_state.py index 5a0f7c114..23e9f53f1 100644 --- a/tools/migration/migrate_state.py +++ b/tools/migration/migrate_state.py @@ -103,7 +103,6 @@ def convert_state(opts): job_states = source_manager.restore( job_names, - skip_validation=True, ) source_manager.cleanup() diff --git a/tron/serialize/runstate/statemanager.py b/tron/serialize/runstate/statemanager.py index 94f3d2512..f37b4882b 100644 --- a/tron/serialize/runstate/statemanager.py +++ b/tron/serialize/runstate/statemanager.py @@ -104,8 +104,7 @@ def __init__(self, persistence_impl, buffer): self._buffer = buffer self._impl = persistence_impl - # TODO: Make sure nobody is calling this with skip_validation passed in, then remove it - def restore(self, job_names, 
skip_validation=False): + def restore(self, job_names): """Return the most recent serialized state.""" log.debug("Restoring state.") From f5ab20c0d019219d5dfafc648bed5776bf257ab8 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Fri, 18 Oct 2024 11:55:25 -0700 Subject: [PATCH 12/22] Add more types and a few TODO tickets --- tron/actioncommand.py | 1 + tron/core/actionrun.py | 2 +- tron/core/jobrun.py | 9 +++++---- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/tron/actioncommand.py b/tron/actioncommand.py index 866f376d4..7a49fc1cf 100644 --- a/tron/actioncommand.py +++ b/tron/actioncommand.py @@ -146,6 +146,7 @@ def clear(self): self.buffers.clear() +# TODO: TRON-2304 - Cleanup NoActionRunnerFactory class NoActionRunnerFactory(Persistable): """Action runner factory that does not wrap the action run command.""" diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index 8db9f9a8b..56a09662f 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -313,7 +313,7 @@ def __init__( end_time: Optional[datetime.datetime] = None, run_state: str = SCHEDULED, exit_status: Optional[int] = None, - attempts: Optional[list] = None, # TODO: list of...ActionCommandConfig? + attempts: Optional[List[ActionRunAttempt]] = None, action_runner: Optional[Union[NoActionRunnerFactory, SubprocessActionRunnerFactory]] = None, retries_remaining: Optional[int] = None, retries_delay: Optional[datetime.timedelta] = None, diff --git a/tron/core/jobrun.py b/tron/core/jobrun.py index c3294853b..4d3306d19 100644 --- a/tron/core/jobrun.py +++ b/tron/core/jobrun.py @@ -13,6 +13,7 @@ from tron import node from tron.core.actiongraph import ActionGraph from tron.core.actionrun import ActionRun +from tron.core.actionrun import ActionRunCollection from tron.core.actionrun import ActionRunFactory from tron.serialize import filehandler from tron.utils import maybe_decode @@ -56,14 +57,14 @@ def __init__( node: node.Node, output_path: Optional[filehandler.OutputPath] = None, base_context: Optional[command_context.CommandContext] = None, - action_runs=None, # TODO: list of action runs? + action_runs: Optional[ActionRunCollection] = None, action_graph: Optional[ActionGraph] = None, - manual: Optional[bool] = None, # TODO: what are you for? + manual: Optional[bool] = None, ): super().__init__() self.job_name = maybe_decode( job_name - ) # TODO: TRON-2293 maybe_decode is a relic of Python2->Python3 migration. Remove it. + ) # TODO: TRON-2293 - maybe_decode is a relic of Python2->Python3 migration. Remove it. 
self.run_num = run_num self.run_time = run_time self.node = node @@ -124,7 +125,7 @@ def for_job(cls, job, run_num, run_time, node, manual): return run @classmethod - def from_state( # TODO: types + def from_state( cls, state_data, action_graph, From 00a50a99dc208fcc030e79025144837fed281dc3 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Fri, 18 Oct 2024 11:57:01 -0700 Subject: [PATCH 13/22] Remove debug logs, add constant for transact_write_items, update transaction limit, and change setitem signature for take value tuple --- .../runstate/dynamodb_state_store.py | 48 +++++++++---------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py index 6c6006097..4940e4346 100644 --- a/tron/serialize/runstate/dynamodb_state_store.py +++ b/tron/serialize/runstate/dynamodb_state_store.py @@ -12,7 +12,9 @@ from typing import DefaultDict from typing import Dict from typing import List +from typing import Literal from typing import Sequence +from typing import Tuple from typing import TypeVar import boto3 # type: ignore @@ -28,17 +30,16 @@ # TODO: Restore items -# Max DynamoDB object size is 400KB. Since we save two -# copies of the object (pickled and JSON), we need to -# consider this max size applies to the entire item, so -# we use a max size of 200KB for each version. +# Max DynamoDB object size is 400KB. Since we save two copies of the object (pickled and JSON), +# we need to consider this max size applies to the entire item, so we use a max size of 200KB +# for each version. # -# In testing I could get away with 201_000 for both -# partitions so this should be enough overhead to -# contain the object name and other data. +# In testing I could get away with 201_000 for both partitions so this should be enough overhead +# to contain the object name and other data. OBJECT_SIZE = 200_000 # TODO: config this to avoid rolling out new version when we swap back to 400_000? 
MAX_SAVE_QUEUE = 500 MAX_ATTEMPTS = 10 +MAX_TRANSACT_WRITE_ITEMS = 100 # Max number of items to write in a single transaction log = logging.getLogger(__name__) T = TypeVar("T") @@ -154,7 +155,6 @@ def _merge_items(self, first_items, remaining_items) -> dict: return deserialized_items def save(self, key_value_pairs) -> None: - log.debug(f"Adding to save queue: {key_value_pairs}") for key, val in key_value_pairs: while True: qlen = len(self.save_queue) @@ -181,20 +181,20 @@ def _consume_save_queue(self): for _ in range(qlen): try: with self.save_lock: - key, (pickled_val, json_val) = self.save_queue.popitem(last=False) - log.debug(f"Processing save for {key} with a value of {pickled_val}") + key, (val, json_val) = self.save_queue.popitem(last=False) # Remove all previous data with the same partition key # TODO: only remove excess partitions if new data has fewer self._delete_item(key) - if pickled_val is not None: - self.__setitem__(key, pickle.dumps(pickled_val), json_val) + if val is not None: + self[key] = (pickle.dumps(val), json_val) # reset errors count if we can successfully save saved += 1 except Exception as e: error = "tron_dynamodb_save_failure: failed to save key " f'"{key}" to dynamodb:\n{repr(e)}' log.error(error) + # Add items back to the queue if we failed to save with self.save_lock: - self.save_queue[key] = (pickled_val, json_val) + self.save_queue[key] = (val, json_val) duration = time.time() - start log.info(f"saved {saved} items in {duration}s") @@ -206,7 +206,8 @@ def _consume_save_queue(self): def get_type_from_key(self, key: str) -> str: return key.split()[0] - def _serialize_item(self, key: str, state: Dict[str, Any]) -> str: + # TODO: TRON-2305 - In an ideal world, we wouldn't be passing around state/state_data dicts. It would be a lot nicer to have regular objects here + def _serialize_item(self, key: Literal[runstate.JOB_STATE, runstate.JOB_RUN_STATE], state: Dict[str, Any]) -> str: # type: ignore if key == runstate.JOB_STATE: return Job.to_json(state) elif key == runstate.JOB_RUN_STATE: @@ -230,10 +231,10 @@ def _save_loop(self): log.error("too many dynamodb errors in a row, crashing") os.exit(1) - def __setitem__(self, key: str, pickled_val: bytes, json_val: str) -> None: + def __setitem__(self, key: str, value: Tuple[bytes, str]) -> None: """ - Partition the item and write up to 10 partitions atomically. - Retry up to 3 times on failure + Partition the item and write up to MAX_TRANSACT_WRITE_ITEMS + partitions atomically. Retry up to 3 times on failure. Examine the size of `pickled_val` and `json_val`, and splice them into different parts based on `OBJECT_SIZE` @@ -241,13 +242,10 @@ def __setitem__(self, key: str, pickled_val: bytes, json_val: str) -> None: partition key built. 
""" start = time.time() + + pickled_val, json_val = value num_partitions = math.ceil(len(pickled_val) / OBJECT_SIZE) num_json_val_partitions = math.ceil(len(json_val) / OBJECT_SIZE) - - log.debug( - f"Saving key: {key} with {num_partitions} pickle partitions and {num_json_val_partitions} json partitions" - ) - items = [] # Use the maximum number of partitions (JSON can be larger @@ -281,11 +279,11 @@ def __setitem__(self, key: str, pickled_val: bytes, json_val: str) -> None: "TableName": self.name, }, } + count = 0 items.append(item) - # Only up to 10 items are allowed per transactions - # TODO: transact_write_items can take up to 100 items now - while len(items) == 10 or index == max_partitions - 1: + + while len(items) == MAX_TRANSACT_WRITE_ITEMS or index == max_partitions - 1: try: self.client.transact_write_items(TransactItems=items) items = [] From 0eca03ff46457f846d709d1438109e48c6067ef4 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Fri, 18 Oct 2024 12:40:33 -0700 Subject: [PATCH 14/22] Check that rendered_command and docker_image are not None before creating task --- tron/core/actionrun.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index 56a09662f..4966ab805 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -1293,6 +1293,16 @@ def recover(self) -> Optional[KubernetesTask]: last_attempt = self.attempts[-1] + if last_attempt.rendered_command is None: + log.error(f"{self} rendered_command is None, cannot recover") + self.fail(exitcode.EXIT_INVALID_COMMAND) + return None + + if last_attempt.command_config.docker_image is None: + log.error(f"{self} docker_image is None, cannot recover") + self.fail(exitcode.EXIT_KUBERNETES_TASK_INVALID) + return None + log.info(f"{self} recovering Kubernetes run") task = k8s_cluster.create_task( From d72723640d86cac5dbd5ab40c286d4d6af636729 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Fri, 18 Oct 2024 12:46:03 -0700 Subject: [PATCH 15/22] Remove comment from before my enlightenment --- tron/utils/collections.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tron/utils/collections.py b/tron/utils/collections.py index 8b367a429..1df5f3f82 100644 --- a/tron/utils/collections.py +++ b/tron/utils/collections.py @@ -4,7 +4,6 @@ log = logging.getLogger(__name__) -# TODO: wtf is this, even? class MappingCollection(dict): """Dictionary like object for managing collections of items. Item is expected to support the following interface, and should be hashable. 
From a02aedcf72fe64882b2870b1b99298d9caee50a1 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Wed, 23 Oct 2024 08:20:23 -0700 Subject: [PATCH 16/22] Add remaining types, add tickets to valid TODOs, and delete irrelevant TODOs --- tron/core/actionrun.py | 11 +++++----- tron/core/job.py | 10 ++++++---- tron/core/job_collection.py | 1 - tron/core/job_scheduler.py | 1 - .../runstate/dynamodb_state_store.py | 20 +++++++------------ 5 files changed, 19 insertions(+), 24 deletions(-) diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index 4966ab805..b141f4b96 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -24,6 +24,7 @@ from tron.bin.action_runner import build_environment from tron.bin.action_runner import build_labels from tron.command_context import CommandContext +from tron.config import schema from tron.config.config_utils import StringFormatter from tron.config.schema import ExecutorTypes from tron.core import action @@ -317,11 +318,11 @@ def __init__( action_runner: Optional[Union[NoActionRunnerFactory, SubprocessActionRunnerFactory]] = None, retries_remaining: Optional[int] = None, retries_delay: Optional[datetime.timedelta] = None, - machine=None, # TODO str? + machine: Optional[Machine] = None, executor: Optional[str] = None, - trigger_downstreams=None, # TODO: confirm with test job - triggered_by: Optional[set] = None, # TODO: confirm with test job - on_upstream_rerun: Optional[str] = None, # TODO: confirm with test job + trigger_downstreams: Optional[Union[bool, dict]] = None, + triggered_by: Optional[List[str]] = None, + on_upstream_rerun: Optional[schema.ActionOnRerun] = None, trigger_timeout_timestamp: Optional[float] = None, original_command: Optional[str] = None, ): @@ -700,7 +701,7 @@ def cancel_delay(self): return True @property - def state_data(self): # TODO: all these state_data funcs return dict + def state_data(self): """This data is used to serialize the state of this action run.""" if isinstance(self.action_runner, NoActionRunnerFactory): diff --git a/tron/core/job.py b/tron/core/job.py index 99b1a0dcb..22bb64d66 100644 --- a/tron/core/job.py +++ b/tron/core/job.py @@ -1,11 +1,13 @@ +import datetime import json import logging -from datetime import timedelta from typing import Any from typing import Dict from typing import Optional from typing import TypeVar +import pytz + from tron import command_context from tron import node from tron.actioncommand import SubprocessActionRunnerFactory @@ -90,9 +92,9 @@ def __init__( output_path: Optional[filehandler.OutputPath] = None, allow_overlap: Optional[bool] = None, action_runner: Optional[SubprocessActionRunnerFactory] = None, - max_runtime: Optional[Any] = None, # TODO TYPE for self.max_runtime: - time_zone: Optional[Any] = None, # TODO TYPE for self.time_zone: - expected_runtime: Optional[timedelta] = None, + max_runtime: Optional[datetime.timedelta] = None, + time_zone: Optional[pytz.timezone] = None, + expected_runtime: Optional[datetime.timedelta] = None, run_limit: Optional[int] = None, ): super().__init__() diff --git a/tron/core/job_collection.py b/tron/core/job_collection.py index 22c9ad0e8..dbc3fc8d5 100644 --- a/tron/core/job_collection.py +++ b/tron/core/job_collection.py @@ -69,7 +69,6 @@ def update(self, new_job_scheduler): job_scheduler.schedule_reconfigured() return True - # TODO: Types?, job_state_data is a weird dict[str, dict[Any?, Any?]], I think config_action_runner is actioncommand.SubprocessActionRunnerFactory OR actioncommand.NoActionRunnerFactory def restore_state(self, 
job_state_data, config_action_runner): """ Loops through the jobs and their runs in order to load their diff --git a/tron/core/job_scheduler.py b/tron/core/job_scheduler.py index e4e134fcb..3b8262c1c 100644 --- a/tron/core/job_scheduler.py +++ b/tron/core/job_scheduler.py @@ -250,7 +250,6 @@ def __init__(self, context, output_stream_dir, time_zone, action_runner, job_gra self.action_runner = action_runner self.job_graph = job_graph - # TODO: takes dict[] returns JobScheduler def build(self, job_config): log.debug(f"Building new job scheduler {job_config.name}") output_path = filehandler.OutputPath(self.output_stream_dir) diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py index 4940e4346..4244d4769 100644 --- a/tron/serialize/runstate/dynamodb_state_store.py +++ b/tron/serialize/runstate/dynamodb_state_store.py @@ -24,22 +24,16 @@ from tron.metrics import timer from tron.serialize import runstate -# Restore -# TODO: Get the correct number of partitions -# TODO: Merge items -# TODO: Restore items - - # Max DynamoDB object size is 400KB. Since we save two copies of the object (pickled and JSON), # we need to consider this max size applies to the entire item, so we use a max size of 200KB # for each version. # # In testing I could get away with 201_000 for both partitions so this should be enough overhead -# to contain the object name and other data. -OBJECT_SIZE = 200_000 # TODO: config this to avoid rolling out new version when we swap back to 400_000? +# to contain other attributes like object name and number of partitions. +OBJECT_SIZE = 200_000 # TODO: TRON-2240 - consider swapping back to 400_000 now that we've removed pickles MAX_SAVE_QUEUE = 500 MAX_ATTEMPTS = 10 -MAX_TRANSACT_WRITE_ITEMS = 100 # Max number of items to write in a single transaction +MAX_TRANSACT_WRITE_ITEMS = 100 log = logging.getLogger(__name__) T = TypeVar("T") @@ -163,7 +157,7 @@ def save(self, key_value_pairs) -> None: time.sleep(5) continue with self.save_lock: - if val is None: # QUESTION: is this a real option? + if val is None: self.save_queue[key] = (val, None) else: state_type = self.get_type_from_key(key) @@ -304,7 +298,7 @@ def __setitem__(self, key: str, value: Tuple[bytes, str]) -> None: delta=time.time() - start, ) - # TODO: Is this ok if we just use the max number of partitions? + # TODO: TRON-2238 - Is this ok if we just use the max number of partitions? def _delete_item(self, key: str) -> None: start = time.time() try: @@ -322,10 +316,10 @@ def _delete_item(self, key: str) -> None: delta=time.time() - start, ) - # TODO: Get max partitions between pickle and json + # TODO: TRON-2238 - Get max partitions between pickle and json def _get_num_of_partitions(self, key: str) -> int: """ - Return how many parts is the item partitioned into + Return the number of partitions an item is divided into. """ try: partition = self.table.get_item( From f26f855ea16d17962713bda1f3ff4a799134f42c Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Wed, 23 Oct 2024 08:34:45 -0700 Subject: [PATCH 17/22] Fix time_zone type. 
Remove unreachable code after typing trigger_downstreams --- tron/core/actionrun.py | 2 -- tron/core/job.py | 4 +--- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index b141f4b96..95758be07 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -655,8 +655,6 @@ def triggers_to_emit(self) -> List[str]: templates = ["shortdate.{shortdate}"] elif isinstance(self.trigger_downstreams, dict): templates = [f"{k}.{v}" for k, v in self.trigger_downstreams.items()] - else: - log.error(f"{self} trigger_downstreams must be true or dict") return [self.render_template(trig) for trig in templates] diff --git a/tron/core/job.py b/tron/core/job.py index 22bb64d66..93cb31226 100644 --- a/tron/core/job.py +++ b/tron/core/job.py @@ -6,8 +6,6 @@ from typing import Optional from typing import TypeVar -import pytz - from tron import command_context from tron import node from tron.actioncommand import SubprocessActionRunnerFactory @@ -93,7 +91,7 @@ def __init__( allow_overlap: Optional[bool] = None, action_runner: Optional[SubprocessActionRunnerFactory] = None, max_runtime: Optional[datetime.timedelta] = None, - time_zone: Optional[pytz.timezone] = None, + time_zone: Optional[datetime.tzinfo] = None, expected_runtime: Optional[datetime.timedelta] = None, run_limit: Optional[int] = None, ): From c3c8c77cbd908fe0a49f0301d355104f02aa4edc Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Thu, 24 Oct 2024 07:22:03 -0700 Subject: [PATCH 18/22] Maintain most of the parsing logic as is. Add a few more tests. Add TODO and ticket for regex issues --- tests/utils/crontab_test.py | 54 +++++++++++++++++++++++++++++++++++++ tron/utils/crontab.py | 6 ++--- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/tests/utils/crontab_test.py b/tests/utils/crontab_test.py index ee0281713..aa80b0a22 100644 --- a/tests/utils/crontab_test.py +++ b/tests/utils/crontab_test.py @@ -43,6 +43,30 @@ def test_parse(self, mock_dow, mock_month, mock_monthday, mock_hour, mock_min): assert_equal(actual["months"], mock_month.return_value) assert_equal(actual["weekdays"], mock_dow.return_value) + def test_full_crontab_line(self): + line = "*/15 0 1,15 * 1-5" + expected = { + "minutes": [0, 15, 30, 45], + "hours": [0], + "monthdays": [1, 15], + "months": None, + "weekdays": [1, 2, 3, 4, 5], + "ordinals": None, + } + assert_equal(crontab.parse_crontab(line), expected) + + def test_full_crontab_line_with_last(self): + line = "0 0 L * *" + expected = { + "minutes": [0], + "hours": [0], + "monthdays": ["LAST"], + "months": None, + "weekdays": None, + "ordinals": None, + } + assert_equal(crontab.parse_crontab(line), expected) + class TestMinuteFieldParser(TestCase): @setup @@ -108,5 +132,35 @@ def test_parse_last(self): assert_equal(self.parser.parse("5, 6, L"), expected) +class TestComplexExpressions(TestCase): + @setup + def setup_parser(self): + self.parser = crontab.MinuteFieldParser() + + def test_complex_expression(self): + expected = [0, 10, 20, 30, 40, 50, 55] + assert_equal(self.parser.parse("*/10,55"), expected) + + +class TestInvalidInputs(TestCase): + @setup + def setup_parser(self): + self.parser = crontab.MinuteFieldParser() + + def test_invalid_expression(self): + with assert_raises(ValueError): + self.parser.parse("61") + + +class TestBoundaryValues(TestCase): + @setup + def setup_parser(self): + self.parser = crontab.MinuteFieldParser() + + def test_boundary_values(self): + assert_equal(self.parser.parse("0"), [0]) + assert_equal(self.parser.parse("59"), 
[59]) + + if __name__ == "__main__": run() diff --git a/tron/utils/crontab.py b/tron/utils/crontab.py index 7ff021891..1ee5bd07e 100644 --- a/tron/utils/crontab.py +++ b/tron/utils/crontab.py @@ -28,6 +28,7 @@ def convert_predefined(line: str) -> str: return PREDEFINED_SCHEDULE[line] +# TODO: TRON-1761 - Fix cron validation. The pattern is not working as expected. class FieldParser: """Parse and validate a field in a crontab entry.""" @@ -61,10 +62,7 @@ def parse(self, source: str) -> Optional[Union[List[int], List[Union[int, str]]] sorted_groups: List[Union[int, str]] = sorted(groups, key=lambda x: (isinstance(x, str), x)) if has_last: sorted_groups.append("LAST") - if not sorted_groups: - return None - if all(isinstance(x, int) for x in sorted_groups): - return sorted_groups + return sorted_groups def get_match_groups(self, source: str) -> dict: From 7a0cb98046d4fc71ddc78adf3aa4e7e3cd037ebf Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Thu, 24 Oct 2024 09:23:31 -0700 Subject: [PATCH 19/22] Add error handling in various to_json() funcs. Break JSON write out from pickle write so that we maintain writing pickles if JSON fails --- .../runstate/dynamodb_state_store_test.py | 1 - tron/actioncommand.py | 22 +++-- tron/core/action.py | 61 ++++++++------ tron/core/actionrun.py | 82 +++++++++++-------- tron/core/job.py | 8 +- tron/core/jobrun.py | 31 ++++--- .../runstate/dynamodb_state_store.py | 32 ++++---- tron/utils/persistable.py | 3 +- 8 files changed, 142 insertions(+), 98 deletions(-) diff --git a/tests/serialize/runstate/dynamodb_state_store_test.py b/tests/serialize/runstate/dynamodb_state_store_test.py index 5a54e4cfe..a6d282455 100644 --- a/tests/serialize/runstate/dynamodb_state_store_test.py +++ b/tests/serialize/runstate/dynamodb_state_store_test.py @@ -125,7 +125,6 @@ def large_object(): } -# TODO: Add better test for to_json? 
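The "keep writing pickles if JSON fails" behaviour described in this commit message can be summarized in a few lines. A hedged sketch, with a plain dict standing in for the real save queue:

```python
# Hedged sketch of the fallback described in the commit message: if to_json() raises, the
# error is logged and only the pickled value is queued, so JSON problems never block saves.
# save_queue is a plain dict here; the real store uses an OrderedDict guarded by a lock.
import logging

log = logging.getLogger(__name__)
save_queue = {}


def queue_save(key, val, serialize):
    try:
        json_val = serialize(val)
    except Exception:
        log.exception(f"Failed to serialize JSON for key {key}")
        json_val = None  # proceed without JSON if serialization fails
    save_queue[key] = (val, json_val)


def bad_serializer(state):
    raise ValueError("boom")


queue_save("job_run_state example.0", {"manual": False}, bad_serializer)
assert save_queue["job_run_state example.0"] == ({"manual": False}, None)
```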
@pytest.mark.usefixtures("store", "small_object", "large_object") class TestDynamoDBStateStore: def test_save(self, store, small_object, large_object): diff --git a/tron/actioncommand.py b/tron/actioncommand.py index 7a49fc1cf..cea3cfb85 100644 --- a/tron/actioncommand.py +++ b/tron/actioncommand.py @@ -3,6 +3,7 @@ import os from io import StringIO from shlex import quote +from typing import Optional from tron.config import schema from tron.serialize import filehandler @@ -203,13 +204,20 @@ def __ne__(self, other): return not self == other @staticmethod - def to_json(state_data: dict) -> str: - return json.dumps( - { - "status_path": state_data["status_path"], - "exec_path": state_data["exec_path"], - } - ) + def to_json(state_data: dict) -> Optional[str]: + try: + return json.dumps( + { + "status_path": state_data["status_path"], + "exec_path": state_data["exec_path"], + } + ) + except KeyError as e: + log.error(f"Missing key in state_data: {e}") + return None + except Exception as e: + log.error(f"Error serializing SubprocessActionRunnerFactory to JSON: {e}") + return None def create_action_runner_factory_from_config(config): diff --git a/tron/core/action.py b/tron/core/action.py index b68c13ff4..0a0c301d0 100644 --- a/tron/core/action.py +++ b/tron/core/action.py @@ -55,7 +55,7 @@ def copy(self): return ActionCommandConfig(**self.state_data) @staticmethod - def to_json(state_data: dict) -> str: + def to_json(state_data: dict) -> Optional[str]: """Serialize the ActionCommandConfig instance to a JSON string.""" def serialize_namedtuple(obj): @@ -63,31 +63,40 @@ def serialize_namedtuple(obj): return obj._asdict() return obj - return json.dumps( - { - "command": state_data["command"], - "cpus": state_data["cpus"], - "mem": state_data["mem"], - "disk": state_data["disk"], - "cap_add": state_data["cap_add"], - "cap_drop": state_data["cap_drop"], - "constraints": list(state_data["constraints"]), - "docker_image": state_data["docker_image"], - "docker_parameters": list(state_data["docker_parameters"]), - "env": state_data["env"], - "secret_env": state_data["secret_env"], - "secret_volumes": [serialize_namedtuple(volume) for volume in state_data["secret_volumes"]], - "projected_sa_volumes": [serialize_namedtuple(volume) for volume in state_data["projected_sa_volumes"]], - "field_selector_env": state_data["field_selector_env"], - "extra_volumes": list(state_data["extra_volumes"]), - "node_selectors": state_data["node_selectors"], - "node_affinities": [serialize_namedtuple(affinity) for affinity in state_data["node_affinities"]], - "labels": state_data["labels"], - "annotations": state_data["annotations"], - "service_account_name": state_data["service_account_name"], - "ports": state_data["ports"], - } - ) + try: + return json.dumps( + { + "command": state_data["command"], + "cpus": state_data["cpus"], + "mem": state_data["mem"], + "disk": state_data["disk"], + "cap_add": state_data["cap_add"], + "cap_drop": state_data["cap_drop"], + "constraints": list(state_data["constraints"]), + "docker_image": state_data["docker_image"], + "docker_parameters": list(state_data["docker_parameters"]), + "env": state_data["env"], + "secret_env": state_data["secret_env"], + "secret_volumes": [serialize_namedtuple(volume) for volume in state_data["secret_volumes"]], + "projected_sa_volumes": [ + serialize_namedtuple(volume) for volume in state_data["projected_sa_volumes"] + ], + "field_selector_env": state_data["field_selector_env"], + "extra_volumes": list(state_data["extra_volumes"]), + "node_selectors": 
state_data["node_selectors"], + "node_affinities": [serialize_namedtuple(affinity) for affinity in state_data["node_affinities"]], + "labels": state_data["labels"], + "annotations": state_data["annotations"], + "service_account_name": state_data["service_account_name"], + "ports": state_data["ports"], + } + ) + except KeyError as e: + log.error(f"Missing key in state_data: {e}") + return None + except Exception as e: + log.error(f"Error serializing ActionCommandConfig to JSON: {e}") + return None @dataclass diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index 95758be07..609f4201e 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -175,19 +175,26 @@ def state_data(self): return state_data @staticmethod - def to_json(state_data: dict) -> str: + def to_json(state_data: dict) -> Optional[str]: """Serialize the ActionRunAttempt instance to a JSON string.""" - return json.dumps( - { - "command_config": ActionCommandConfig.to_json(state_data["command_config"]), - "start_time": state_data["start_time"].isoformat() if state_data["start_time"] else None, - "end_time": state_data["end_time"].isoformat() if state_data["end_time"] else None, - "rendered_command": state_data["rendered_command"], - "exit_status": state_data["exit_status"], - "mesos_task_id": state_data["mesos_task_id"], - "kubernetes_task_id": state_data["kubernetes_task_id"], - } - ) + try: + return json.dumps( + { + "command_config": ActionCommandConfig.to_json(state_data["command_config"]), + "start_time": state_data["start_time"].isoformat() if state_data["start_time"] else None, + "end_time": state_data["end_time"].isoformat() if state_data["end_time"] else None, + "rendered_command": state_data["rendered_command"], + "exit_status": state_data["exit_status"], + "mesos_task_id": state_data["mesos_task_id"], + "kubernetes_task_id": state_data["kubernetes_task_id"], + } + ) + except KeyError as e: + log.error(f"Missing key in state_data: {e}") + return None + except Exception as e: + log.error(f"Error serializing ActionRunAttempt to JSON: {e}") + return None @classmethod def from_state(cls, state_data): @@ -731,7 +738,7 @@ def state_data(self): } @staticmethod - def to_json(state_data: dict) -> str: + def to_json(state_data: dict) -> Optional[str]: """Serialize the ActionRun instance to a JSON string.""" action_runner = state_data.get("action_runner") if action_runner is None: @@ -739,27 +746,34 @@ def to_json(state_data: dict) -> str: else: action_runner_json = SubprocessActionRunnerFactory.to_json(action_runner) - return json.dumps( - { - "job_run_id": state_data["job_run_id"], - "action_name": state_data["action_name"], - "state": state_data["state"], - "original_command": state_data["original_command"], - "start_time": state_data["start_time"].isoformat() if state_data["start_time"] else None, - "end_time": state_data["end_time"].isoformat() if state_data["end_time"] else None, - "node_name": state_data["node_name"], - "exit_status": state_data["exit_status"], - "attempts": [ActionRunAttempt.to_json(attempt) for attempt in state_data["attempts"]], - "retries_remaining": state_data["retries_remaining"], - "retries_delay": state_data["retries_delay"], - "action_runner": action_runner_json, - "executor": state_data["executor"], - "trigger_downstreams": state_data["trigger_downstreams"], - "triggered_by": state_data["triggered_by"], - "on_upstream_rerun": state_data["on_upstream_rerun"], - "trigger_timeout_timestamp": state_data["trigger_timeout_timestamp"], - } - ) + try: + return json.dumps( + { + 
"job_run_id": state_data["job_run_id"], + "action_name": state_data["action_name"], + "state": state_data["state"], + "original_command": state_data["original_command"], + "start_time": state_data["start_time"].isoformat() if state_data["start_time"] else None, + "end_time": state_data["end_time"].isoformat() if state_data["end_time"] else None, + "node_name": state_data["node_name"], + "exit_status": state_data["exit_status"], + "attempts": [ActionRunAttempt.to_json(attempt) for attempt in state_data["attempts"]], + "retries_remaining": state_data["retries_remaining"], + "retries_delay": state_data["retries_delay"], + "action_runner": action_runner_json, + "executor": state_data["executor"], + "trigger_downstreams": state_data["trigger_downstreams"], + "triggered_by": state_data["triggered_by"], + "on_upstream_rerun": state_data["on_upstream_rerun"], + "trigger_timeout_timestamp": state_data["trigger_timeout_timestamp"], + } + ) + except KeyError as e: + log.error(f"Missing key in state_data: {e}") + return None + except Exception as e: + log.error(f"Error serializing ActionRun to JSON: {e}") + return None def render_template(self, template): """Render our configured command using the command context.""" diff --git a/tron/core/job.py b/tron/core/job.py index 93cb31226..d2a28e951 100644 --- a/tron/core/job.py +++ b/tron/core/job.py @@ -123,9 +123,13 @@ def __init__( log.info(f"{self} created") @staticmethod - def to_json(state_data: dict) -> str: + def to_json(state_data: dict) -> Optional[str]: """Serialize the Job instance to a JSON string.""" - return json.dumps(state_data) + try: + return json.dumps(state_data) + except Exception as e: + log.error(f"Error serializing Job to JSON: {e}") + return None @classmethod def from_config( diff --git a/tron/core/jobrun.py b/tron/core/jobrun.py index 4d3306d19..f3479b498 100644 --- a/tron/core/jobrun.py +++ b/tron/core/jobrun.py @@ -81,19 +81,26 @@ def __init__( self.context = command_context.build_context(self, base_context) @staticmethod - def to_json(state_data: dict) -> str: + def to_json(state_data: dict) -> Optional[str]: """Serialize the JobRun instance to a JSON string.""" - return json.dumps( - { - "job_name": state_data["job_name"], - "run_num": state_data["run_num"], - "run_time": state_data["run_time"].isoformat() if state_data["run_time"] else None, - "node_name": state_data["node_name"], - "runs": [ActionRun.to_json(run) for run in state_data["runs"]], - "cleanup_run": ActionRun.to_json(state_data["cleanup_run"]) if state_data["cleanup_run"] else None, - "manual": state_data["manual"], - } - ) + try: + return json.dumps( + { + "job_name": state_data["job_name"], + "run_num": state_data["run_num"], + "run_time": state_data["run_time"].isoformat() if state_data["run_time"] else None, + "node_name": state_data["node_name"], + "runs": [ActionRun.to_json(run) for run in state_data["runs"]], + "cleanup_run": ActionRun.to_json(state_data["cleanup_run"]) if state_data["cleanup_run"] else None, + "manual": state_data["manual"], + } + ) + except KeyError as e: + log.error(f"Missing key in state_data: {e}") + return None + except Exception as e: + log.error(f"Error serializing JobRun to JSON: {e}") + return None @property def id(self): diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py index 4244d4769..3d297611d 100644 --- a/tron/serialize/runstate/dynamodb_state_store.py +++ b/tron/serialize/runstate/dynamodb_state_store.py @@ -13,6 +13,7 @@ from typing import Dict from typing import 
List from typing import Literal +from typing import Optional from typing import Sequence from typing import Tuple from typing import TypeVar @@ -161,11 +162,12 @@ def save(self, key_value_pairs) -> None: self.save_queue[key] = (val, None) else: state_type = self.get_type_from_key(key) - json_val = self._serialize_item(state_type, val) - self.save_queue[key] = ( - val, - json_val, - ) + try: + json_val = self._serialize_item(state_type, val) + except Exception as e: + log.error(f"Failed to serialize JSON for key {key}: {e}") + json_val = None # Proceed without JSON if serialization fails + self.save_queue[key] = (val, json_val) break def _consume_save_queue(self): @@ -201,7 +203,7 @@ def get_type_from_key(self, key: str) -> str: return key.split()[0] # TODO: TRON-2305 - In an ideal world, we wouldn't be passing around state/state_data dicts. It would be a lot nicer to have regular objects here - def _serialize_item(self, key: Literal[runstate.JOB_STATE, runstate.JOB_RUN_STATE], state: Dict[str, Any]) -> str: # type: ignore + def _serialize_item(self, key: Literal[runstate.JOB_STATE, runstate.JOB_RUN_STATE], state: Dict[str, Any]) -> Optional[str]: # type: ignore if key == runstate.JOB_STATE: return Job.to_json(state) elif key == runstate.JOB_RUN_STATE: @@ -239,11 +241,9 @@ def __setitem__(self, key: str, value: Tuple[bytes, str]) -> None: pickled_val, json_val = value num_partitions = math.ceil(len(pickled_val) / OBJECT_SIZE) - num_json_val_partitions = math.ceil(len(json_val) / OBJECT_SIZE) + num_json_val_partitions = math.ceil(len(json_val) / OBJECT_SIZE) if json_val else 0 items = [] - # Use the maximum number of partitions (JSON can be larger - # than pickled value so this makes sure we save the entire item) max_partitions = max(num_partitions, num_json_val_partitions) for index in range(max_partitions): item = { @@ -263,17 +263,19 @@ def __setitem__(self, key: str, value: Tuple[bytes, str]) -> None: "num_partitions": { "N": str(num_partitions), }, - "json_val": { - "S": json_val[index * OBJECT_SIZE : min(index * OBJECT_SIZE + OBJECT_SIZE, len(json_val))] - }, - "num_json_val_partitions": { - "N": str(num_json_val_partitions), - }, }, "TableName": self.name, }, } + if json_val: + item["Put"]["Item"]["json_val"] = { + "S": json_val[index * OBJECT_SIZE : min(index * OBJECT_SIZE + OBJECT_SIZE, len(json_val))] + } + item["Put"]["Item"]["num_json_val_partitions"] = { + "N": str(num_json_val_partitions), + } + count = 0 items.append(item) diff --git a/tron/utils/persistable.py b/tron/utils/persistable.py index 04c7437cc..620956a2a 100644 --- a/tron/utils/persistable.py +++ b/tron/utils/persistable.py @@ -2,10 +2,11 @@ from abc import abstractmethod from typing import Any from typing import Dict +from typing import Optional class Persistable(ABC): @staticmethod @abstractmethod - def to_json(state_data: Dict[Any, Any]) -> str: + def to_json(state_data: Dict[Any, Any]) -> Optional[str]: pass From e79ae2510155b32a68c77e97105b9778ac82718e Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Fri, 25 Oct 2024 07:13:46 -0700 Subject: [PATCH 20/22] Update to_json exception logging. 
On DynamoDB write failure only pass the pickle back into the queue --- tron/actioncommand.py | 8 ++++---- tron/core/action.py | 8 ++++---- tron/core/actionrun.py | 16 ++++++++-------- tron/core/job.py | 4 ++-- tron/core/jobrun.py | 8 ++++---- tron/serialize/runstate/dynamodb_state_store.py | 5 +++-- 6 files changed, 25 insertions(+), 24 deletions(-) diff --git a/tron/actioncommand.py b/tron/actioncommand.py index cea3cfb85..b22772b98 100644 --- a/tron/actioncommand.py +++ b/tron/actioncommand.py @@ -212,11 +212,11 @@ def to_json(state_data: dict) -> Optional[str]: "exec_path": state_data["exec_path"], } ) - except KeyError as e: - log.error(f"Missing key in state_data: {e}") + except KeyError: + log.exception("Missing key in state_data:") return None - except Exception as e: - log.error(f"Error serializing SubprocessActionRunnerFactory to JSON: {e}") + except Exception: + log.exception("Error serializing SubprocessActionRunnerFactory to JSON:") return None diff --git a/tron/core/action.py b/tron/core/action.py index 0a0c301d0..0f677356a 100644 --- a/tron/core/action.py +++ b/tron/core/action.py @@ -91,11 +91,11 @@ def serialize_namedtuple(obj): "ports": state_data["ports"], } ) - except KeyError as e: - log.error(f"Missing key in state_data: {e}") + except KeyError: + log.exception("Missing key in state_data:") return None - except Exception as e: - log.error(f"Error serializing ActionCommandConfig to JSON: {e}") + except Exception: + log.exception("Error serializing ActionCommandConfig to JSON:") return None diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index 609f4201e..452c756e6 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -189,11 +189,11 @@ def to_json(state_data: dict) -> Optional[str]: "kubernetes_task_id": state_data["kubernetes_task_id"], } ) - except KeyError as e: - log.error(f"Missing key in state_data: {e}") + except KeyError: + log.exception("Missing key in state_data:") return None - except Exception as e: - log.error(f"Error serializing ActionRunAttempt to JSON: {e}") + except Exception: + log.exception("Error serializing ActionRunAttempt to JSON:") return None @classmethod @@ -768,11 +768,11 @@ def to_json(state_data: dict) -> Optional[str]: "trigger_timeout_timestamp": state_data["trigger_timeout_timestamp"], } ) - except KeyError as e: - log.error(f"Missing key in state_data: {e}") + except KeyError: + log.exception("Missing key in state_data:") return None - except Exception as e: - log.error(f"Error serializing ActionRun to JSON: {e}") + except Exception: + log.exception("Error serializing ActionRun to JSON:") return None def render_template(self, template): diff --git a/tron/core/job.py b/tron/core/job.py index d2a28e951..3af9786a6 100644 --- a/tron/core/job.py +++ b/tron/core/job.py @@ -127,8 +127,8 @@ def to_json(state_data: dict) -> Optional[str]: """Serialize the Job instance to a JSON string.""" try: return json.dumps(state_data) - except Exception as e: - log.error(f"Error serializing Job to JSON: {e}") + except Exception: + log.exception("Error serializing Job to JSON:") return None @classmethod diff --git a/tron/core/jobrun.py b/tron/core/jobrun.py index f3479b498..e166094b2 100644 --- a/tron/core/jobrun.py +++ b/tron/core/jobrun.py @@ -95,11 +95,11 @@ def to_json(state_data: dict) -> Optional[str]: "manual": state_data["manual"], } ) - except KeyError as e: - log.error(f"Missing key in state_data: {e}") + except KeyError: + log.exception("Missing key in state_data:") return None - except Exception as e: - log.error(f"Error 
From e8dcc8e67f0bd6829efd8a25ff8e5647b1488023 Mon Sep 17 00:00:00 2001
From: Kevin Kaspari
Date: Fri, 25 Oct 2024 08:37:02 -0700
Subject: [PATCH 21/22] Raise these so that we get the full picture on failure and can decide what to do in dynamodb_state_store

---
 tron/actioncommand.py  | 4 ++--
 tron/core/action.py    | 4 ++--
 tron/core/actionrun.py | 8 ++++----
 tron/core/job.py       | 2 +-
 tron/core/jobrun.py    | 4 ++--
 5 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/tron/actioncommand.py b/tron/actioncommand.py
index b22772b98..db6e5aba4 100644
--- a/tron/actioncommand.py
+++ b/tron/actioncommand.py
@@ -214,10 +214,10 @@ def to_json(state_data: dict) -> Optional[str]:
             )
         except KeyError:
             log.exception("Missing key in state_data:")
-            return None
+            raise
         except Exception:
             log.exception("Error serializing SubprocessActionRunnerFactory to JSON:")
-            return None
+            raise


 def create_action_runner_factory_from_config(config):
diff --git a/tron/core/action.py b/tron/core/action.py
index 0f677356a..a0ba04d82 100644
--- a/tron/core/action.py
+++ b/tron/core/action.py
@@ -93,10 +93,10 @@ def serialize_namedtuple(obj):
             )
         except KeyError:
             log.exception("Missing key in state_data:")
-            return None
+            raise
         except Exception:
             log.exception("Error serializing ActionCommandConfig to JSON:")
-            return None
+            raise


 @dataclass
diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py
index 452c756e6..9249abcb2 100644
--- a/tron/core/actionrun.py
+++ b/tron/core/actionrun.py
@@ -191,10 +191,10 @@ def to_json(state_data: dict) -> Optional[str]:
             )
         except KeyError:
             log.exception("Missing key in state_data:")
-            return None
+            raise
         except Exception:
             log.exception("Error serializing ActionRunAttempt to JSON:")
-            return None
+            raise

     @classmethod
     def from_state(cls, state_data):
@@ -770,10 +770,10 @@ def to_json(state_data: dict) -> Optional[str]:
             )
         except KeyError:
             log.exception("Missing key in state_data:")
-            return None
+            raise
         except Exception:
             log.exception("Error serializing ActionRun to JSON:")
-            return None
+            raise

     def render_template(self, template):
         """Render our configured command using the command context."""
diff --git a/tron/core/job.py b/tron/core/job.py
index 3af9786a6..df8171b61 100644
--- a/tron/core/job.py
+++ b/tron/core/job.py
@@ -129,7 +129,7 @@ def to_json(state_data: dict) -> Optional[str]:
             return json.dumps(state_data)
         except Exception:
             log.exception("Error serializing Job to JSON:")
-            return None
+            raise

     @classmethod
     def from_config(
diff --git a/tron/core/jobrun.py b/tron/core/jobrun.py
index e166094b2..061ef53c4 100644
--- a/tron/core/jobrun.py
+++ b/tron/core/jobrun.py
@@ -97,10 +97,10 @@ def to_json(state_data: dict) -> Optional[str]:
             )
         except KeyError:
             log.exception("Missing key in state_data:")
-            return None
+            raise
         except Exception:
             log.exception("Error serializing JobRun to JSON:")
-            return None
+            raise

     @property
     def id(self):
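The net effect of this patch is that every `to_json` helper logs with `log.exception` (so the traceback is captured) and then re-raises, leaving the decision about what to do with a bad payload to the caller in `dynamodb_state_store`. A minimal sketch of the shared pattern, using made-up keys ("job_name", "run_num") rather than the real Tron state fields:

import json
import logging
from typing import Optional

log = logging.getLogger(__name__)


def to_json(state_data: dict) -> Optional[str]:
    """Illustrative version of the pattern applied to each class in this patch."""
    try:
        return json.dumps(
            {
                "job_name": state_data["job_name"],
                "run_num": state_data["run_num"],
            }
        )
    except KeyError:
        # log.exception records the stack trace; re-raising surfaces the failure to the caller.
        log.exception("Missing key in state_data:")
        raise
    except Exception:
        log.exception("Error serializing state_data to JSON:")
        raise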
From b0186d1119940dda5887406733ad307204c06afc Mon Sep 17 00:00:00 2001
From: Kevin Kaspari
Date: Mon, 28 Oct 2024 13:54:34 -0700
Subject: [PATCH 22/22] Add serialization error counter

---
 tron/prom_metrics.py                     |  6 +++++
 .../runstate/dynamodb_state_store.py     | 24 ++++++++++---------
 2 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/tron/prom_metrics.py b/tron/prom_metrics.py
index 4a11bb8fd..d6eef803c 100644
--- a/tron/prom_metrics.py
+++ b/tron/prom_metrics.py
@@ -1,6 +1,12 @@
+from prometheus_client import Counter
 from prometheus_client import Gauge

 tron_cpu_gauge = Gauge("tron_k8s_cpus", "Total number of CPUs allocated to Tron-launched containers")
 tron_memory_gauge = Gauge("tron_k8s_mem", "Total amount of memory allocated to Tron-launched containers (in megabytes)")
 tron_disk_gauge = Gauge("tron_k8s_disk", "Total amount of disk allocated to Tron-launched containers (in megabytes)")
+
+json_serialization_errors_counter = Counter(
+    "json_serialization_errors_total",
+    "Total number of errors encountered while serializing state_data as JSON. These errors occur before writing to DynamoDB.",
+)
diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py
index 06b5bbabd..34b3a6976 100644
--- a/tron/serialize/runstate/dynamodb_state_store.py
+++ b/tron/serialize/runstate/dynamodb_state_store.py
@@ -20,6 +20,7 @@
 import boto3  # type: ignore

+import tron.prom_metrics as prom_metrics
 from tron.core.job import Job
 from tron.core.jobrun import JobRun
 from tron.metrics import timer
@@ -162,11 +163,7 @@ def save(self, key_value_pairs) -> None:
                         self.save_queue[key] = (val, None)
                     else:
                         state_type = self.get_type_from_key(key)
-                        try:
-                            json_val = self._serialize_item(state_type, val)
-                        except Exception as e:
-                            log.error(f"Failed to serialize JSON for key {key}: {e}")
-                            json_val = None  # Proceed without JSON if serialization fails
+                        json_val = self._serialize_item(state_type, val)
                         self.save_queue[key] = (val, json_val)
                 break

@@ -205,12 +202,17 @@ def get_type_from_key(self, key: str) -> str:

     # TODO: TRON-2305 - In an ideal world, we wouldn't be passing around state/state_data dicts. It would be a lot nicer to have regular objects here
     def _serialize_item(self, key: Literal[runstate.JOB_STATE, runstate.JOB_RUN_STATE], state: Dict[str, Any]) -> Optional[str]:  # type: ignore
-        if key == runstate.JOB_STATE:
-            return Job.to_json(state)
-        elif key == runstate.JOB_RUN_STATE:
-            return JobRun.to_json(state)
-        else:
-            raise ValueError(f"Unknown type: key {key}")
+        try:
+            if key == runstate.JOB_STATE:
+                return Job.to_json(state)
+            elif key == runstate.JOB_RUN_STATE:
+                return JobRun.to_json(state)
+            else:
+                raise ValueError(f"Unknown type: key {key}")
+        except Exception:
+            log.exception(f"Serialization error for key {key}")
+            prom_metrics.json_serialization_errors_counter.inc()
+            return None

     def _save_loop(self):
         while True:
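With the serializers re-raising, `_serialize_item` becomes the single choke point: it catches any failure, increments the new Prometheus counter, and falls back to `None` so the pickled copy is still written. A self-contained sketch of that flow, with plain strings standing in for `runstate.JOB_STATE`/`runstate.JOB_RUN_STATE` and `json.dumps` standing in for `Job.to_json`/`JobRun.to_json`:

import json
import logging
from typing import Any, Dict, Optional

from prometheus_client import Counter

log = logging.getLogger(__name__)

# Mirrors the counter added to tron/prom_metrics.py.
json_serialization_errors_counter = Counter(
    "json_serialization_errors_total",
    "Total number of errors encountered while serializing state_data as JSON.",
)


def serialize_item(state_type: str, state: Dict[str, Any]) -> Optional[str]:
    """Return the JSON form of a state dict, or None (and count the error) if serialization fails."""
    try:
        if state_type == "job_state":
            return json.dumps(state)
        elif state_type == "job_run_state":
            return json.dumps(state)
        else:
            raise ValueError(f"Unknown type: key {state_type}")
    except Exception:
        # Counting and swallowing here means one metric covers every serializer, and the
        # caller no longer needs its own try/except around the JSON path.
        log.exception(f"Serialization error for key {state_type}")
        json_serialization_errors_counter.inc()
        return None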