Skip to content

Commit

Permalink
Merge pull request #1001 from Yelp/u/cuza/getting_namespace_logs_from…
Browse files Browse the repository at this point in the history
…_yelpsoa

Fixing Tron logs for jobs using other services images
  • Loading branch information
KaspariK authored Oct 29, 2024
2 parents 01fc582 + 4ad7ca7 commit bd8624f
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 1 deletion.
77 changes: 77 additions & 0 deletions tests/utils/scribereader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
from unittest import mock

import pytest
import yaml

import tron.utils.scribereader
from tron.utils.scribereader import decompose_action_id
from tron.utils.scribereader import read_log_stream_for_action_run

try:
Expand Down Expand Up @@ -420,3 +422,78 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_for_long_output():
# The expected output should be max_lines plus the
# extra line for 'This output is truncated.' message
assert len(output) == max_lines + 1


def test_decompose_action_id_yml_file_found():
action_run_id = "namespace.job.1234.action"
paasta_cluster = "fake_cluster"
config_content = """
job:
actions:
action:
service: test_service
"""
with mock.patch("builtins.open", mock.mock_open(read_data=config_content)), mock.patch(
"yaml.safe_load", return_value=yaml.safe_load(config_content)
):
namespace, job_name, run_num, action = decompose_action_id(action_run_id, paasta_cluster)
assert namespace == "test_service"
assert job_name == "job"
assert run_num == "1234"
assert action == "action"


def test_decompose_action_id_file_not_found():
action_run_id = "namespace.job.1234.action"
paasta_cluster = "fake_cluster"
with mock.patch("builtins.open", side_effect=FileNotFoundError):
namespace, job_name, run_num, action = decompose_action_id(action_run_id, paasta_cluster)
assert namespace == "namespace"
assert job_name == "job"
assert run_num == "1234"
assert action == "action"


def test_decompose_action_id_yaml_error():
action_run_id = "namespace.job.1234.action"
paasta_cluster = "fake_cluster"
with mock.patch("builtins.open", mock.mock_open(read_data="invalid_yaml")), mock.patch(
"yaml.safe_load", side_effect=yaml.YAMLError
):
namespace, job_name, run_num, action = decompose_action_id(action_run_id, paasta_cluster)
assert namespace == "namespace"
assert job_name == "job"
assert run_num == "1234"
assert action == "action"


def test_decompose_action_id_generic_error():
action_run_id = "namespace.job.1234.action"
paasta_cluster = "fake_cluster"
with mock.patch("builtins.open", mock.mock_open(read_data="some_data")), mock.patch(
"yaml.safe_load", side_effect=Exception
):
namespace, job_name, run_num, action = decompose_action_id(action_run_id, paasta_cluster)
assert namespace == "namespace"
assert job_name == "job"
assert run_num == "1234"
assert action == "action"


def test_decompose_action_id_service_not_found():
action_run_id = "namespace.job.1234.action"
paasta_cluster = "fake_cluster"
config_content = """
job:
actions:
action:
command: "sleep 10"
"""
with mock.patch("builtins.open", mock.mock_open(read_data=config_content)), mock.patch(
"yaml.safe_load", return_value=yaml.safe_load(config_content)
):
namespace, job_name, run_num, action = decompose_action_id(action_run_id, paasta_cluster)
assert namespace == "namespace"
assert job_name == "job"
assert run_num == "1234"
assert action == "action"
28 changes: 27 additions & 1 deletion tron/utils/scribereader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import Tuple

import staticconf # type: ignore
import yaml

from tron.config.static_config import get_config_watcher
from tron.config.static_config import NAMESPACE
Expand Down Expand Up @@ -81,12 +82,37 @@ def get_scribereader_host_and_port(ecosystem: str, superregion: str, region: str
return host, port


def decompose_action_id(action_run_id: str, paasta_cluster: str) -> Tuple[str, str, str, str]:
namespace, job_name, run_num, action = action_run_id.split(".")
for ext in ["yaml", "yml"]:
try:
with open(f"/nail/etc/services/{namespace}/tron-{paasta_cluster}.{ext}") as f:
config = yaml.load(f, Loader=yaml.CSafeLoader)
service: Optional[str] = (
config.get(job_name, {}).get("actions", {}).get(action, {}).get("service", None)
)
if service:
return service, job_name, run_num, action
except FileNotFoundError:
log.warning(f"yelp-soaconfig file tron-{paasta_cluster}.{ext} not found for action_run_id {action_run_id}.")
except yaml.YAMLError:
log.exception(
f"Error parsing YAML file tron-{paasta_cluster}.yaml for {action_run_id} - will default to using current namespace:"
)
except Exception:
log.exception(
f"Error reading service for {action_run_id} from file tron-{paasta_cluster}.yaml - will default to using current namespace:"
)

return namespace, job_name, run_num, action


class PaaSTALogs:
def __init__(self, component: str, paasta_cluster: str, action_run_id: str) -> None:
self.component = component
self.paasta_cluster = paasta_cluster
self.action_run_id = action_run_id
namespace, job_name, run_num, action = action_run_id.split(".")
namespace, job_name, run_num, action = decompose_action_id(action_run_id, paasta_cluster)
# in our logging infra, things are logged to per-instance streams - but
# since Tron PaaSTA instances are of the form `job_name.action`, we need
# to escape the period since some parts of our infra will reject streams
Expand Down

0 comments on commit bd8624f

Please sign in to comment.