Add --uses-bulkdata argument to paasta spark run
This changes `paasta spark run` so that
https://github.yelpcorp.com/sysgit/yelpsoa-configs/pull/52010 works
as expected.

I'm not checking here whether the /nail/bulkdata volume is already
specified in the spark config, e.g.
`spark.kubernetes.executor.volumes.hostPath.0.mount.path=/nail/bulkdata`
- specifying that volume and also setting uses_bulkdata to True would
  mount the same docker volume twice, which would cause a failure (see
  the sketch below).
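If we ever did want to guard against that duplication, the check could look roughly like this. A minimal sketch only: `spark_conf` is assumed to be the flat `key=value` dict of spark properties, and `maybe_add_bulkdata_volume` is a hypothetical helper, not part of this commit:

```python
BULKDATA_MOUNT = "/nail/bulkdata:/nail/bulkdata:ro"


def maybe_add_bulkdata_volume(volumes, spark_conf, uses_bulkdata):
    # Hypothetical guard: skip the extra mount if the spark config
    # already declares a hostPath volume mounted at /nail/bulkdata.
    already_mounted = any(
        key.startswith("spark.kubernetes.executor.volumes.hostPath.")
        and key.endswith(".mount.path")
        and value == "/nail/bulkdata"
        for key, value in spark_conf.items()
    )
    if uses_bulkdata and not already_mounted:
        volumes.append(BULKDATA_MOUNT)
    return volumes
```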

This follows on from [this conversation in
slack](https://yelp.slack.com/archives/CA8BWU65D/p1729768030212919) and
will allow us to complete [this
project](https://yelpwiki.yelpcorp.com/display/PRODENG/Project+Incredible+Bulk)
timmow committed Dec 16, 2024
1 parent c7348a9 commit 0b43710
Showing 2 changed files with 20 additions and 0 deletions.
10 changes: 10 additions & 0 deletions paasta_tools/cli/cmds/spark_run.py
@@ -350,6 +350,13 @@ def add_subparser(subparsers):
default=False,
)

list_parser.add_argument(
    "--uses-bulkdata",
    help="Mount /nail/bulkdata in the container",
    action="store_true",
    default=False,
)

aws_group = list_parser.add_argument_group(
    title="AWS credentials options",
    description="If --aws-credentials-yaml is specified, it overrides all "
@@ -785,6 +792,9 @@ def configure_and_run_docker_container(
else:
    raise UnsupportedClusterManagerException(cluster_manager)

if args.uses_bulkdata:
    volumes.append("/nail/bulkdata:/nail/bulkdata:ro")

volumes.append("%s:rw" % args.work_dir)
volumes.append("/nail/home:/nail/home:rw")

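Each string appended to `volumes` uses Docker's `host_path:container_path:mode` bind-mount syntax (`ro` for read-only, `rw` for read-write). As a rough illustration only, with a hypothetical helper (paasta's real docker invocation lives elsewhere in paasta_tools), such a list maps onto `docker run` flags like this:

```python
def volumes_to_docker_args(volumes):
    # Hypothetical illustration: expand each "<host>:<container>:<mode>"
    # volume string into a docker run "-v" flag pair.
    args = []
    for volume in volumes:
        args.extend(["-v", volume])
    return args


# volumes_to_docker_args(["/nail/bulkdata:/nail/bulkdata:ro"])
# -> ["-v", "/nail/bulkdata:/nail/bulkdata:ro"]
```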
10 changes: 10 additions & 0 deletions tests/cli/test_cmds_spark_run.py
@@ -429,6 +429,7 @@ class TestConfigureAndRunDockerContainer:
"fake_dir",
)

@pytest.mark.parametrize("uses_bulkdata", [True, False])
@pytest.mark.parametrize(
["cluster_manager", "spark_args_volumes", "expected_volumes"],
[
@@ -468,6 +469,7 @@ def test_configure_and_run_docker_container(
    cluster_manager,
    spark_args_volumes,
    expected_volumes,
    uses_bulkdata,
):
mock_get_username.return_value = "fake_user"
spark_conf = {
@@ -494,6 +496,7 @@ def test_configure_and_run_docker_container(
args.tronfig = None
args.job_id = None
args.use_service_auth_token = False
args.uses_bulkdata = uses_bulkdata
with mock.patch.object(
    self.instance_config, "get_env_dictionary", return_value={"env1": "val1"}
):
@@ -512,10 +515,15 @@
    spark_config_dict=spark_conf,
    is_mrjob=args.mrjob,
)
if uses_bulkdata:
    bulkdata_volumes = ["/nail/bulkdata:/nail/bulkdata:ro"]
else:
    bulkdata_volumes = []
mock_run_docker_container.assert_called_once_with(
    container_name="fake_app",
    volumes=(
        expected_volumes
        + bulkdata_volumes
        + [
            "/fake_dir:/spark_driver:rw",
            "/nail/home:/nail/home:rw",
@@ -609,6 +617,7 @@ def test_configure_and_run_docker_driver_resource_limits_config(
args.docker_memory_limit = "4g"
args.docker_shm_size = "1g"
args.use_service_auth_token = False
args.uses_bulkdata = False
with mock.patch.object(
    self.instance_config, "get_env_dictionary", return_value={"env1": "val1"}
):
@@ -724,6 +733,7 @@ def test_configure_and_run_docker_driver_resource_limits(
args.docker_memory_limit = False
args.docker_shm_size = False
args.use_service_auth_token = False
args.uses_bulkdata = False
with mock.patch.object(
    self.instance_config, "get_env_dictionary", return_value={"env1": "val1"}
):
