Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing Tron logs for jobs using other services images #1001

Merged
merged 7 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions tests/utils/scribereader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
from unittest import mock

import pytest
import yaml

import tron.utils.scribereader
from tron.utils.scribereader import decompose_action_id
from tron.utils.scribereader import read_log_stream_for_action_run

try:
Expand Down Expand Up @@ -420,3 +422,78 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_for_long_output():
# The expected output should be max_lines plus the
# extra line for 'This output is truncated.' message
assert len(output) == max_lines + 1


def test_decompose_action_id_yml_file_found():
    """The service declared on the action replaces the namespace from the run id.

    ``decompose_action_id`` parses the config with ``yaml.load(...,
    Loader=yaml.CSafeLoader)``, so ``yaml.load`` is the attribute that must be
    patched; patching ``yaml.safe_load`` would leave the mock unused.
    """
    action_run_id = "namespace.job.1234.action"
    paasta_cluster = "fake_cluster"
    # The nesting matters: the lookup walks job -> "actions" -> action -> "service".
    config_content = """
        job:
            actions:
                action:
                    service: test_service
    """
    # The return_value expression is evaluated before the patch is entered,
    # so it is parsed by the real loader.
    with mock.patch("builtins.open", mock.mock_open(read_data=config_content)), mock.patch(
        "yaml.load", return_value=yaml.safe_load(config_content)
    ):
        namespace, job_name, run_num, action = decompose_action_id(action_run_id, paasta_cluster)

    assert namespace == "test_service"
    assert job_name == "job"
    assert run_num == "1234"
    assert action == "action"


def test_decompose_action_id_file_not_found():
    """A missing config file leaves the components of the run id untouched."""
    run_id = "namespace.job.1234.action"
    cluster = "fake_cluster"
    # Every attempt to open a config file raises FileNotFoundError.
    missing_config = mock.patch("builtins.open", side_effect=FileNotFoundError)
    with missing_config:
        namespace, job_name, run_num, action = decompose_action_id(run_id, cluster)
        assert namespace == "namespace"
        assert job_name == "job"
        assert run_num == "1234"
        assert action == "action"


def test_decompose_action_id_yaml_error():
    """A YAML parse failure falls back to the components of the run id.

    ``yaml.load`` is patched because that is the function
    ``decompose_action_id`` calls; patching ``yaml.safe_load`` would never
    raise inside the function, and the ``yaml.YAMLError`` handler would go
    untested.
    """
    action_run_id = "namespace.job.1234.action"
    paasta_cluster = "fake_cluster"
    with mock.patch("builtins.open", mock.mock_open(read_data="invalid_yaml")), mock.patch(
        "yaml.load", side_effect=yaml.YAMLError
    ):
        namespace, job_name, run_num, action = decompose_action_id(action_run_id, paasta_cluster)

    assert namespace == "namespace"
    assert job_name == "job"
    assert run_num == "1234"
    assert action == "action"


def test_decompose_action_id_generic_error():
    """An unexpected error while loading the config falls back to the run id.

    ``yaml.load`` is patched because that is the function
    ``decompose_action_id`` calls; a patch on ``yaml.safe_load`` would never
    be triggered, so the injected exception would not be raised.
    """
    action_run_id = "namespace.job.1234.action"
    paasta_cluster = "fake_cluster"
    with mock.patch("builtins.open", mock.mock_open(read_data="some_data")), mock.patch(
        "yaml.load", side_effect=Exception
    ):
        namespace, job_name, run_num, action = decompose_action_id(action_run_id, paasta_cluster)

    assert namespace == "namespace"
    assert job_name == "job"
    assert run_num == "1234"
    assert action == "action"


def test_decompose_action_id_service_not_found():
    """An action with no ``service`` key keeps the namespace from the run id.

    ``yaml.load`` is patched because that is the function
    ``decompose_action_id`` calls; patching ``yaml.safe_load`` would leave the
    mock unused.
    """
    action_run_id = "namespace.job.1234.action"
    paasta_cluster = "fake_cluster"
    # The action exists in the config but declares only a command, no service.
    config_content = """
        job:
            actions:
                action:
                    command: "sleep 10"
    """
    # The return_value expression is evaluated before the patch is entered,
    # so it is parsed by the real loader.
    with mock.patch("builtins.open", mock.mock_open(read_data=config_content)), mock.patch(
        "yaml.load", return_value=yaml.safe_load(config_content)
    ):
        namespace, job_name, run_num, action = decompose_action_id(action_run_id, paasta_cluster)

    assert namespace == "namespace"
    assert job_name == "job"
    assert run_num == "1234"
    assert action == "action"
26 changes: 25 additions & 1 deletion tron/utils/scribereader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import Tuple

import staticconf # type: ignore
import yaml

from tron.config.static_config import get_config_watcher
from tron.config.static_config import NAMESPACE
Expand Down Expand Up @@ -81,12 +82,35 @@ def get_scribereader_host_and_port(ecosystem: str, superregion: str, region: str
return host, port


def decompose_action_id(action_run_id: str, paasta_cluster: str) -> Tuple[str, str, str, str]:
namespace, job_name, run_num, action = action_run_id.split(".")
for ext in ["yaml", "yml"]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imo, we can hardcode looking at just .yaml - the only .yml file in soaconfigs is a symlink

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure about this one and wanted to be extra safe. I'm checking .yaml before trying .yml. I only added .yml as a fallback because, as far as I recall, we don't have anything in place preventing folks from committing a tron-my_cluster.yml file.

try:
with open(f"/nail/etc/services/{namespace}/tron-{paasta_cluster}.{ext}") as f:
config = yaml.load(f, Loader=yaml.CSafeLoader)
Comment on lines +89 to +90
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i am a tad worried about running this every time someone wants to load logs for something, but i'm not sure if we can meaningfully cache things here since soaconfigs can change at any time

that is to say: we'll probably wanna keep an eye on things and make sure this doesn't slow things down too much (and if it does, we'll probably want to figure out how to thread the service config through tron so that we can skip this IO)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was very paranoid about the YAML loading as well. yaml.CSafeLoader can load the longest tron soaconfig file in about 40ms, which is probably fast enough.

service: Optional[str] = (
config.get(job_name, {}).get("actions", {}).get(action, {}).get("service", None)
)
if service:
return service, job_name, run_num, action
except FileNotFoundError:
log.warning(f"yelp-soaconfig file tron-{paasta_cluster}.{ext} not found for action_run_id {action_run_id}.")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, failing to find the yelpsoa-configs file should probably be logged at a higher severity than a warning.

except yaml.YAMLError as e:
log.error(f"Error parsing YAML file tron-{paasta_cluster}.{ext} for action_run_id {action_run_id}: {e}")
except Exception as e:
log.error(
f"Error reading service for action_run_id {action_run_id} from file tron-{paasta_cluster}.{ext}: {e}"
)
cuza marked this conversation as resolved.
Show resolved Hide resolved

return namespace, job_name, run_num, action


class PaaSTALogs:
def __init__(self, component: str, paasta_cluster: str, action_run_id: str) -> None:
self.component = component
self.paasta_cluster = paasta_cluster
self.action_run_id = action_run_id
namespace, job_name, run_num, action = action_run_id.split(".")
namespace, job_name, run_num, action = decompose_action_id(action_run_id, paasta_cluster)
# in our logging infra, things are logged to per-instance streams - but
# since Tron PaaSTA instances are of the form `job_name.action`, we need
# to escape the period since some parts of our infra will reject streams
Expand Down
Loading