diff --git a/redis_benchmarks_specification/__self_contained_coordinator__/self_contained_coordinator.py b/redis_benchmarks_specification/__self_contained_coordinator__/self_contained_coordinator.py index 0b2e857..c88bd1c 100644 --- a/redis_benchmarks_specification/__self_contained_coordinator__/self_contained_coordinator.py +++ b/redis_benchmarks_specification/__self_contained_coordinator__/self_contained_coordinator.py @@ -101,7 +101,6 @@ def main(): get_version_string(project_name, project_version) ) args = parser.parse_args() - if args.logname is not None: print("Writting log to {}".format(args.logname)) logging.basicConfig( @@ -119,6 +118,37 @@ def main(): datefmt=LOG_DATEFMT, ) logging.info(get_version_string(project_name, project_version)) + + kwargs = process_args(args) + + logging.info("Connecting to event stream server") + conn = connect_redis_server( + args.event_stream_host, + args.event_stream_port, + args.event_stream_user, + args.event_stream_pass, + ) + kwargs["conn"] = conn + + if kwargs["datasink_push_results_redistimeseries"] is True: + logging.info("Connecting to datasync server") + datasync_conn = connect_redis_server( + args.datasink_redistimeseries_host, + args.datasink_redistimeseries_port, + args.datasink_redistimeseries_user, + args.datasink_redistimeseries_pass, + ) + kwargs["datasync_conn"] = datasync_conn + + build_runners_consumer_group_create(conn, args.platform_name) + + logging.info("Entering blocking read waiting for work.") + while True: + _, stream_id, _, _ = self_contained_coordinator_blocking_read(kwargs) + + +def process_args(args): + kwargs = {} topologies_folder = os.path.abspath(args.setups_folder + "/topologies") logging.info("Using topologies folder dir {}".format(topologies_folder)) topologies_files = pathlib.Path(topologies_folder).glob("*.yml") @@ -128,240 +158,166 @@ def main(): " ".join([str(x) for x in topologies_files]) ) ) - topologies_map = get_topologies(topologies_files[0]) - testsuite_spec_files = extract_testsuites(args) + kwargs["topologies_map"] = get_topologies(topologies_files[0]) + kwargs["testsuite_spec_files"] = extract_testsuites(args) + kwargs["home"] = str(Path.home()) + kwargs["cpuset_start_pos"] = args.cpuset_start_pos + logging.info("Start CPU pinning at position {}".format(kwargs["cpuset_start_pos"])) + kwargs["redis_proc_start_port"] = args.redis_proc_start_port logging.info( - "Reading event streams from: {}:{} with user {}".format( - args.event_stream_host, args.event_stream_port, args.event_stream_user - ) + "Redis Processes start port: {}".format(kwargs["redis_proc_start_port"]) ) - try: - conn = redis.StrictRedis( - host=args.event_stream_host, - port=args.event_stream_port, - decode_responses=False, # dont decode due to binary archives - password=args.event_stream_pass, - username=args.event_stream_user, - health_check_interval=REDIS_HEALTH_CHECK_INTERVAL, - socket_connect_timeout=REDIS_SOCKET_TIMEOUT, - socket_keepalive=True, - ) - conn.ping() - except redis.exceptions.ConnectionError as e: - logging.error( - "Unable to connect to redis available at: {}:{} to read the event streams".format( - args.event_stream_host, args.event_stream_port - ) - ) - logging.error("Error message {}".format(e.__str__())) - exit(1) - datasink_conn = None - if args.datasink_push_results_redistimeseries: - logging.info( - "Checking redistimeseries datasink connection is available at: {}:{} to push the timeseries data".format( - args.datasink_redistimeseries_host, args.datasink_redistimeseries_port - ) - ) - try: - datasink_conn = redis.StrictRedis( - host=args.datasink_redistimeseries_host, - port=args.datasink_redistimeseries_port, - decode_responses=True, - password=args.datasink_redistimeseries_pass, - username=args.datasink_redistimeseries_user, - health_check_interval=REDIS_HEALTH_CHECK_INTERVAL, - socket_connect_timeout=REDIS_SOCKET_TIMEOUT, - socket_keepalive=True, - ) - datasink_conn.ping() - except redis.exceptions.ConnectionError as e: - logging.error( - "Unable to connect to redis available at: {}:{}".format( - args.datasink_redistimeseries_host, - args.datasink_redistimeseries_port, - ) - ) - logging.error("Error message {}".format(e.__str__())) - exit(1) logging.info("checking build spec requirements") - running_platform = args.platform_name - build_runners_consumer_group_create(conn, running_platform) - stream_id = None - docker_client = docker.from_env() - home = str(Path.home()) - cpuset_start_pos = args.cpuset_start_pos - logging.info("Start CPU pinning at position {}".format(cpuset_start_pos)) - redis_proc_start_port = args.redis_proc_start_port - logging.info("Redis Processes start port: {}".format(redis_proc_start_port)) - - # TODO: confirm we do have enough cores to run the spec - # availabe_cpus = args.cpu_count - datasink_push_results_redistimeseries = args.datasink_push_results_redistimeseries - grafana_profile_dashboard = args.grafana_profile_dashboard - - defaults_filename = args.defaults_filename + kwargs["running_platform"] = args.platform_name + + kwargs["stream_id"] = args.consumer_start_id + kwargs["docker_client"] = docker.from_env() + + kwargs[ + "datasink_push_results_redistimeseries" + ] = args.datasink_push_results_redistimeseries + kwargs["grafana_profile_dashboard"] = args.grafana_profile_dashboard + + kwargs["defaults_filename"] = args.defaults_filename ( _, default_metrics, _, _, _, - ) = get_defaults(defaults_filename) + ) = get_defaults(kwargs["defaults_filename"]) + kwargs["default_metrics"] = default_metrics - # Consumer id - consumer_pos = args.consumer_pos - logging.info("Consumer pos {}".format(consumer_pos)) + kwargs["consumer_pos"] = args.consumer_pos + logging.info("Consumer pos {}".format(kwargs["consumer_pos"])) - # Arch - arch = args.arch - logging.info("Running for arch: {}".format(arch)) + kwargs["arch"] = args.arch + logging.info("Running for arch: {}".format(kwargs["arch"])) # Docker air gap usage - docker_air_gap = args.docker_air_gap - if docker_air_gap: + kwargs["docker_air_gap"] = args.docker_air_gap + if kwargs["docker_air_gap"]: logging.info( "Using docker in an air-gapped way. Restoring running images from redis keys." ) - - profilers_list = [] - profilers_enabled = args.enable_profilers - if profilers_enabled: - profilers_list = args.profilers.split(",") + kwargs["override_memtier_test_time"] = args.override_memtier_test_time + if kwargs["override_memtier_test_time"] > 0: + logging.info( + "Overriding memtier benchmark --test-time to {} seconds".format( + kwargs["override_memtier_test_time"] + ) + ) + kwargs["profilers_list"] = [] + kwargs["profilers_enabled"] = args.enable_profilers + if kwargs["profilers_enabled"]: + kwargs["profilers_list"] = args.profilers.split(",") res = check_compatible_system_and_kernel_and_prepare_profile(args) if res is False: logging.error( "Requested for the following profilers to be enabled but something went wrong: {}.".format( - " ".join(profilers_list) + " ".join(kwargs["profilers_list"]) ) ) exit(1) + kwargs["consumer_name"] = "{}-self-contained-proc#{}".format( + get_runners_consumer_group_name(kwargs["running_platform"]), + kwargs["consumer_pos"], + ) + logging.info( + "Consuming from group {}. Consumer id {}".format( + get_runners_consumer_group_name(kwargs["running_platform"]), + kwargs["consumer_name"], + ) + ) - override_memtier_test_time = args.override_memtier_test_time - if override_memtier_test_time > 0: - logging.info( - "Overriding memtier benchmark --test-time to {} seconds".format( - override_memtier_test_time - ) + return kwargs + + +def connect_redis_server( + event_stream_host, event_stream_port, event_stream_user, event_stream_pass +): + logging.info( + "Connecting to Redis server: {}:{} with user {}".format( + event_stream_host, event_stream_port, event_stream_user ) - logging.info("Entering blocking read waiting for work.") - if stream_id is None: - stream_id = args.consumer_start_id - while True: - _, stream_id, _, _ = self_contained_coordinator_blocking_read( - conn, - datasink_push_results_redistimeseries, - docker_client, - home, - stream_id, - datasink_conn, - testsuite_spec_files, - topologies_map, - running_platform, - profilers_enabled, - profilers_list, - grafana_profile_dashboard, - cpuset_start_pos, - redis_proc_start_port, - consumer_pos, - docker_air_gap, - override_memtier_test_time, - default_metrics, - arch, + ) + try: + conn = redis.StrictRedis( + host=event_stream_host, + port=event_stream_port, + decode_responses=False, # dont decode due to binary archives + password=event_stream_pass, + username=event_stream_user, + health_check_interval=REDIS_HEALTH_CHECK_INTERVAL, + socket_connect_timeout=REDIS_SOCKET_TIMEOUT, + socket_keepalive=True, + ) + conn.ping() + except redis.exceptions.ConnectionError as e: + logging.error( + "Unable to connect to Redis server available at: {}:{}".format( + event_stream_host, event_stream_port + ) ) + logging.error("Error message {}".format(e.__str__())) + exit(1) + return conn -def self_contained_coordinator_blocking_read( - conn, - datasink_push_results_redistimeseries, - docker_client, - home, - stream_id, - datasink_conn, - testsuite_spec_files, - topologies_map, - platform_name, - profilers_enabled, - profilers_list, - grafana_profile_dashboard="", - cpuset_start_pos=0, - redis_proc_start_port=6379, - consumer_pos=1, - docker_air_gap=False, - override_test_time=None, - default_metrics=None, - arch="amd64", -): +def self_contained_coordinator_blocking_read(kwargs): num_process_streams = 0 num_process_test_suites = 0 overall_result = False - consumer_name = "{}-self-contained-proc#{}".format( - get_runners_consumer_group_name(platform_name), consumer_pos - ) - logging.info( - "Consuming from group {}. Consumer id {}".format( - get_runners_consumer_group_name(platform_name), consumer_name - ) - ) - newTestInfo = conn.xreadgroup( - get_runners_consumer_group_name(platform_name), - consumer_name, - {STREAM_KEYNAME_NEW_BUILD_EVENTS: stream_id}, + + newTestInfo = kwargs["conn"].xreadgroup( + get_runners_consumer_group_name(kwargs["running_platform"]), + kwargs["consumer_name"], + {STREAM_KEYNAME_NEW_BUILD_EVENTS: kwargs["stream_id"]}, count=1, block=0, ) + kwargs["newTestInfo"] = newTestInfo + if len(newTestInfo[0]) < 2 or len(newTestInfo[0][1]) < 1: - stream_id = ">" + kwargs["stream_id"] = ">" else: ( - stream_id, + kwargs["stream_id"], overall_result, total_test_suite_runs, - ) = process_self_contained_coordinator_stream( - conn, - datasink_push_results_redistimeseries, - docker_client, - home, - newTestInfo, - datasink_conn, - testsuite_spec_files, - topologies_map, - platform_name, - profilers_enabled, - profilers_list, - grafana_profile_dashboard, - cpuset_start_pos, - redis_proc_start_port, - docker_air_gap, - "defaults.yml", - None, - default_metrics, - arch, - ) + ) = process_self_contained_coordinator_stream(kwargs) + num_process_streams = num_process_streams + 1 num_process_test_suites = num_process_test_suites + total_test_suite_runs if overall_result is True: - ack_reply = conn.xack( + ack_reply = kwargs["conn"].xack( STREAM_KEYNAME_NEW_BUILD_EVENTS, - get_runners_consumer_group_name(platform_name), - stream_id, + get_runners_consumer_group_name(kwargs["running_platform"]), + kwargs["stream_id"], ) if type(ack_reply) == bytes: ack_reply = ack_reply.decode() if ack_reply == "1" or ack_reply == 1: logging.info( "Sucessfully acknowledge build variation stream with id {}.".format( - stream_id + kwargs["stream_id"] ) ) else: logging.error( "Unable to acknowledge build variation stream with id {}. XACK reply {}".format( - stream_id, ack_reply + kwargs["stream_id"], ack_reply ) ) - return overall_result, stream_id, num_process_streams, num_process_test_suites + + return ( + overall_result, + kwargs["stream_id"], + num_process_streams, + num_process_test_suites, + ) def prepare_memtier_benchmark_parameters( @@ -390,31 +346,12 @@ def prepare_memtier_benchmark_parameters( return None, benchmark_command_str -def process_self_contained_coordinator_stream( - conn, - datasink_push_results_redistimeseries, - docker_client, - home, - newTestInfo, - datasink_conn, - testsuite_spec_files, - topologies_map, - running_platform, - profilers_enabled=False, - profilers_list=[], - grafana_profile_dashboard="", - cpuset_start_pos=0, - redis_proc_start_port=6379, - docker_air_gap=False, - defaults_filename="defaults.yml", - override_test_time=None, - default_metrics=[], - arch="amd64", -): +def process_self_contained_coordinator_stream(kwargs): stream_id = "n/a" overall_result = False total_test_suite_runs = 0 try: + newTestInfo = kwargs["newTestInfo"] stream_id, testDetails = newTestInfo[0][1][0] stream_id = stream_id.decode() logging.info("Received work . Stream id {}.".format(stream_id)) @@ -433,22 +370,24 @@ def process_self_contained_coordinator_stream( run_arch, ) = extract_build_info_from_streamdata(testDetails) - if run_arch == arch: + if run_arch == kwargs["arch"]: overall_result = True profiler_dashboard_links = [] - if docker_air_gap: + if kwargs["docker_air_gap"]: airgap_key = "docker:air-gap:{}".format(run_image) logging.info( "Restoring docker image: {} from {}".format( run_image, airgap_key ) ) - airgap_docker_image_bin = conn.get(airgap_key) - images_loaded = docker_client.images.load(airgap_docker_image_bin) + airgap_docker_image_bin = kwargs["conn"].get(airgap_key) + images_loaded = kwargs["docker_client"].images.load( + airgap_docker_image_bin + ) logging.info("Successfully loaded images {}".format(images_loaded)) - for test_file in testsuite_spec_files: - if defaults_filename in test_file: + for test_file in kwargs["testsuite_spec_files"]: + if kwargs["defaults_filename"] in test_file: continue redis_containers = [] client_containers = [] @@ -497,12 +436,14 @@ def process_self_contained_coordinator_stream( test_result = False redis_container = None try: - current_cpu_pos = cpuset_start_pos + current_cpu_pos = kwargs["cpuset_start_pos"] ceil_db_cpu_limit = extract_db_cpu_limit( - topologies_map, topology_spec_name + kwargs["topologies_map"], topology_spec_name + ) + temporary_dir = tempfile.mkdtemp(dir=kwargs["home"]) + temporary_dir_client = tempfile.mkdtemp( + dir=kwargs["home"] ) - temporary_dir = tempfile.mkdtemp(dir=home) - temporary_dir_client = tempfile.mkdtemp(dir=home) logging.info( "Using local temporary dir to persist redis build artifacts. Path: {}".format( temporary_dir @@ -519,13 +460,13 @@ def process_self_contained_coordinator_stream( setup_type = "oss-standalone" tf_triggering_env = "ci" github_actor = "{}-{}".format( - tf_triggering_env, running_platform + tf_triggering_env, kwargs["running_platform"] ) dso = "redis-server" profilers_artifacts_matrix = [] collection_summary_str = "" - if profilers_enabled: + if kwargs["profilers_enabled"]: collection_summary_str = ( local_profilers_platform_checks( dso, @@ -542,12 +483,15 @@ def process_self_contained_coordinator_stream( ) restore_build_artifacts_from_test_details( - build_artifacts, conn, temporary_dir, testDetails + build_artifacts, + kwargs["conn"], + temporary_dir, + testDetails, ) mnt_point = "/mnt/redis/" command = generate_standalone_redis_server_args( "{}redis-server".format(mnt_point), - redis_proc_start_port, + kwargs["redis_proc_start_port"], mnt_point, redis_configuration_parameters, ) @@ -560,7 +504,10 @@ def process_self_contained_coordinator_stream( run_image, db_cpuset_cpus, command_str ) ) - redis_container = docker_client.containers.run( + + redis_container = kwargs[ + "docker_client" + ].containers.run( image=run_image, volumes={ temporary_dir: { @@ -579,13 +526,16 @@ def process_self_contained_coordinator_stream( ) redis_containers.append(redis_container) - r = redis.StrictRedis(port=redis_proc_start_port) + r = redis.StrictRedis( + port=kwargs["redis_proc_start_port"] + ) r.ping() redis_conns = [r] reset_commandstats(redis_conns) redis_pids = [] first_redis_pid = r.info()["process_id"] redis_pids.append(first_redis_pid) + ceil_client_cpu_limit = extract_client_cpu_limit( benchmark_config ) @@ -603,9 +553,9 @@ def process_self_contained_coordinator_stream( benchmark_config, benchmark_tool_workdir, client_cpuset_cpus, - docker_client, + kwargs["docker_client"], git_hash, - redis_proc_start_port, + kwargs["redis_proc_start_port"], temporary_dir, test_name, ) @@ -636,6 +586,7 @@ def process_self_contained_coordinator_stream( "oss-standalone", ) ) + logging.info( "Will store benchmark json output to local file {}".format( local_benchmark_output_filename @@ -649,7 +600,7 @@ def process_self_contained_coordinator_stream( ) = prepare_benchmark_parameters( benchmark_config, full_benchmark_path, - redis_proc_start_port, + kwargs["redis_proc_start_port"], "localhost", local_benchmark_output_filename, False, @@ -663,7 +614,7 @@ def process_self_contained_coordinator_stream( ) = prepare_memtier_benchmark_parameters( benchmark_config["clientconfig"], full_benchmark_path, - redis_proc_start_port, + kwargs["redis_proc_start_port"], "localhost", local_benchmark_output_filename, benchmark_tool_workdir, @@ -672,6 +623,7 @@ def process_self_contained_coordinator_stream( client_container_image = extract_client_container_image( benchmark_config ) + profiler_call_graph_mode = "dwarf" profiler_frequency = 99 # start the profile @@ -679,8 +631,8 @@ def process_self_contained_coordinator_stream( profiler_name, profilers_map, ) = profilers_start_if_required( - profilers_enabled, - profilers_list, + kwargs["profilers_enabled"], + kwargs["profilers_list"], redis_pids, setup_name, start_time_str, @@ -699,7 +651,9 @@ def process_self_contained_coordinator_stream( # run the benchmark benchmark_start_time = datetime.datetime.now() - client_container_stdout = docker_client.containers.run( + client_container_stdout = kwargs[ + "docker_client" + ].containers.run( image=client_container_image, volumes={ temporary_dir_client: { @@ -730,7 +684,7 @@ def process_self_contained_coordinator_stream( _, overall_tabular_data_map, ) = profilers_stop_if_required( - datasink_push_results_redistimeseries, + kwargs["datasink_push_results_redistimeseries"], benchmark_duration_seconds, collection_summary_str, dso, @@ -738,15 +692,15 @@ def process_self_contained_coordinator_stream( tf_github_repo, profiler_name, profilers_artifacts_matrix, - profilers_enabled, + kwargs["profilers_enabled"], profilers_map, redis_pids, S3_BUCKET_NAME, test_name, ) if ( - profilers_enabled - and datasink_push_results_redistimeseries + kwargs["profilers_enabled"] + and kwargs["datasink_push_results_redistimeseries"] ): datasink_profile_tabular_data( git_branch, @@ -754,7 +708,7 @@ def process_self_contained_coordinator_stream( tf_github_repo, git_hash, overall_tabular_data_map, - conn, + kwargs["conn"], setup_name, start_time_ms, start_time_str, @@ -776,21 +730,21 @@ def process_self_contained_coordinator_stream( "s3_link": s3_link, } ) - https_link = ( - generate_artifacts_table_grafana_redis( - datasink_push_results_redistimeseries, - grafana_profile_dashboard, - profilers_artifacts, - datasink_conn, - setup_name, - start_time_ms, - start_time_str, - test_name, - tf_github_org, - tf_github_repo, - git_hash, - git_branch, - ) + https_link = generate_artifacts_table_grafana_redis( + kwargs[ + "datasink_push_results_redistimeseries" + ], + kwargs["grafana_profile_dashboard"], + profilers_artifacts, + kwargs["datasync_conn"], + setup_name, + start_time_ms, + start_time_str, + test_name, + tf_github_org, + tf_github_repo, + git_hash, + git_branch, ) profiler_dashboard_links.append( [ @@ -863,38 +817,51 @@ def process_self_contained_coordinator_stream( results_dict = json.load(json_file) print_results_table_stdout( benchmark_config, - default_metrics, + kwargs["default_metrics"], results_dict, setup_type, test_name, None, ) + logging.info( + "Done reading results json from {}".format( + full_result_path + ) + ) + dataset_load_duration_seconds = 0 - exporter_datasink_common( - benchmark_config, - benchmark_duration_seconds, - build_variant_name, - datapoint_time_ms, - dataset_load_duration_seconds, - datasink_conn, - datasink_push_results_redistimeseries, - git_branch, - git_version, - metadata, - redis_conns, - results_dict, - running_platform, - setup_name, - setup_type, - test_name, - tf_github_org, - tf_github_repo, - tf_triggering_env, - topology_spec_name, - default_metrics, - ) + if ( + kwargs["datasink_push_results_redistimeseries"] + is True + ): + exporter_datasink_common( + benchmark_config, + benchmark_duration_seconds, + build_variant_name, + datapoint_time_ms, + dataset_load_duration_seconds, + kwargs["datasync_conn"], + kwargs["datasink_push_results_redistimeseries"], + git_branch, + git_version, + metadata, + redis_conns, + results_dict, + kwargs["running_platform"], + setup_name, + setup_type, + test_name, + tf_github_org, + tf_github_repo, + tf_triggering_env, + topology_spec_name, + kwargs["default_metrics"], + ) + + logging.info("shutting down redis server") + r.shutdown(save=False) test_result = True total_test_suite_runs = total_test_suite_runs + 1 @@ -956,7 +923,7 @@ def process_self_contained_coordinator_stream( else: logging.info( "skipping stream_id {} given arch {}!={}".format( - stream_id, run_arch, arch + stream_id, run_arch, kwargs["arch"] ) ) else: diff --git a/utils/tests/test_self_contained_coordinator.py b/utils/tests/test_self_contained_coordinator.py index 9f79198..da10b17 100644 --- a/utils/tests/test_self_contained_coordinator.py +++ b/utils/tests/test_self_contained_coordinator.py @@ -91,12 +91,18 @@ def test_generate_cpuset_cpus(): def test_self_contained_coordinator_blocking_read(): try: + kwargs = {} + kwargs[ + "datasink_push_results_redistimeseries" + ] = args.datasink_push_results_redistimeseries + run_coordinator = True TST_RUNNER_X = os.getenv("TST_RUNNER_X", "1") if TST_RUNNER_X == "0": run_coordinator = False if run_coordinator: conn = redis.StrictRedis(port=16379) + kwargs["conn"] = conn conn.ping() expected_datapoint_ts = None conn.flushall() @@ -110,40 +116,30 @@ def test_self_contained_coordinator_blocking_read(): assert conn.exists(STREAM_KEYNAME_NEW_BUILD_EVENTS) assert conn.xlen(STREAM_KEYNAME_NEW_BUILD_EVENTS) > 0 - running_platform = "fco-ThinkPad-T490" build_runners_consumer_group_create(conn, running_platform, "0") datasink_conn = redis.StrictRedis(port=16379) + kwargs["datasync_conn"] = datasync_conn rts = datasink_conn.ts() - docker_client = docker.from_env() - home = str(Path.home()) - stream_id = ">" - topologies_map = get_topologies( + kwargs["docker_client"] = docker.from_env() + kwargs["home"] = str(Path.home()) + kwargs["stream_id"] = ">" + kwargs["topologies_map"] = get_topologies( "./redis_benchmarks_specification/setups/topologies/topologies.yml" ) # we use a benchmark spec with smaller CPU limit for client given github machines only contain 2 cores # and we need 1 core for DB and another for CLIENT - testsuite_spec_files = [ + kwargs["testsuite_spec_files"] = [ "./utils/tests/test_data/test-suites/redis-benchmark-full-suite-1Mkeys-100B.yml" ] + kwargs["running_platform"] = "fco-ThinkPad-T490" + kwargs["profilers_enabled"] = False ( result, stream_id, number_processed_streams, _, - ) = self_contained_coordinator_blocking_read( - conn, - True, - docker_client, - home, - stream_id, - datasink_conn, - testsuite_spec_files, - topologies_map, - running_platform, - False, - [], - ) + ) = self_contained_coordinator_blocking_read(kwargs) assert result == True assert number_processed_streams == 1 # ensure we're able to aknowledge the consumed message diff --git a/utils/tests/test_self_contained_coordinator_memtier.py b/utils/tests/test_self_contained_coordinator_memtier.py index 30e346a..d4d5f4e 100644 --- a/utils/tests/test_self_contained_coordinator_memtier.py +++ b/utils/tests/test_self_contained_coordinator_memtier.py @@ -23,6 +23,7 @@ ) from redis_benchmarks_specification.__self_contained_coordinator__.runners import ( build_runners_consumer_group_create, + get_runners_consumer_group_name, ) from redis_benchmarks_specification.__setups__.topologies import get_topologies from utils.tests.test_data.api_builder_common import flow_1_and_2_api_builder_checks @@ -30,6 +31,7 @@ def test_self_contained_coordinator_blocking_read(): try: + kwargs = {} run_coordinator = True TST_RUNNER_X = os.getenv("TST_RUNNER_X", "1") if TST_RUNNER_X == "0": @@ -37,6 +39,7 @@ def test_self_contained_coordinator_blocking_read(): if run_coordinator: conn = redis.StrictRedis(port=16379) conn.ping() + kwargs["conn"] = conn expected_datapoint_ts = None conn.flushall() build_variant_name, reply_fields = flow_1_and_2_api_builder_checks(conn) @@ -47,39 +50,35 @@ def test_self_contained_coordinator_blocking_read(): assert conn.exists(STREAM_KEYNAME_NEW_BUILD_EVENTS) assert conn.xlen(STREAM_KEYNAME_NEW_BUILD_EVENTS) > 0 - running_platform = "fco-ThinkPad-T490" + kwargs["running_platform"] = "fco-ThinkPad-T490" - build_runners_consumer_group_create(conn, running_platform, "0") - datasink_conn = redis.StrictRedis(port=16379) - docker_client = docker.from_env() - home = str(Path.home()) - stream_id = ">" - topologies_map = get_topologies( + build_runners_consumer_group_create(conn, kwargs["running_platform"], "0") + kwargs["datasink_conn"] = redis.StrictRedis(port=16379) + kwargs["docker_client"] = docker.from_env() + kwargs["home"] = str(Path.home()) + kwargs["stream_id"] = ">" + kwargs["topologies_map"] = get_topologies( "./redis_benchmarks_specification/setups/topologies/topologies.yml" ) # we use a benchmark spec with smaller CPU limit for client given github machines only contain 2 cores # and we need 1 core for DB and another for CLIENT - testsuite_spec_files = [ + kwargs["testsuite_spec_files"] = [ "./utils/tests/test_data/test-suites/memtier_benchmark-1Mkeys-100B-expire-use-case.yml" ] + kwargs["datasink_push_results_redistimeseries"] = True + kwargs["profilers_enabled"] = False + kwargs["consumer_pos"] = 1 + kwargs["consumer_name"] = "{}-self-contained-proc#{}".format( + get_runners_consumer_group_name(kwargs["running_platform"]), + kwargs["consumer_pos"], + ) + ( result, stream_id, number_processed_streams, _, - ) = self_contained_coordinator_blocking_read( - conn, - True, - docker_client, - home, - stream_id, - datasink_conn, - testsuite_spec_files, - topologies_map, - running_platform, - False, - [], - ) + ) = self_contained_coordinator_blocking_read(kwargs) assert result == True assert number_processed_streams == 1 tf_github_org = "redis"