From 399b7ae2d505362950fcb252fa6fcb4fd49a9817 Mon Sep 17 00:00:00 2001
From: Amit Pawar <158334735+amitpawar12@users.noreply.github.com>
Date: Mon, 13 Jan 2025 22:38:11 -0500
Subject: [PATCH] [Snappi] Infra changes for new PFC-ECN testcases. (#13864)

Description of PR

67989d1312b1778681d6575b12b66aa42fdf05a7
Please review the commit-ID given above.

The original PR #13655 was raised to add the new testcases. However, to manage
the changes efficiently, it was decided to split it into three PRs to ease the
review process. This PR tracks the infrastructure-related changes required for
the execution of the testcases.

Note - PR #13848 needs to be merged before this PR.

Summary:
Fixes # (issue) #13655 #13215

Type of change

Bug fix
Testbed and Framework (new/improvement)
Test case (new/improvement)

Back port request

202012
202205
202305
202311
202405

Approach

What is the motivation for this PR?

This PR tracks only the infrastructure-related changes needed for the addition
of the new testcases.

How did you do it?

The important changes are listed below; an illustrative usage sketch follows at
the end of this description.

Changed directory - tests/common/snappi_tests/

A member variable 'base_flow_config_list' is added as a list to the class
'SnappiTestParams' in the snappi_test_params.py file to accommodate multiple
base flow configs.

The existing functions generate_test_flows, generate_background_flows and
generate_pause_flows are modified to check whether base_flow_config_list
exists. If it does, base_flow_config is assigned
snappi_extra_params.base_flow_config_list[flow_index]; otherwise the existing
code is used.

The existing function 'verify_egress_queue_frame_count' is modified to check
whether base_flow_config_list exists. If it does, base_flow_config_list[0] is
assigned to dut_port_config; otherwise the existing code is used.

The testcases call the 'run_traffic_and_collect_stats' function in the
traffic_generation file to run traffic and gather IXIA+DUT statistics; the
statistics are summarized in the returned test_stats dictionary.

A function has been created to access the IXIA rest_py framework. This in turn
can be used to integrate MACSEC-related changes in the future. Currently,
rest_py is used to generate the IMIX custom profile if the flag is set in the
test_def dictionary (defined and passed by the test).

The test case is executed according to the test_duration and test_interval
defined in the test's test_def. At every test_interval, the statistics from
the IXIA and the DUT are pulled into a dictionary whose primary key is the
date-timestamp. Important parameters from the IXIA, such as Tx and Rx
throughput, number of packets and latency, are captured at each interval. From
the DUT side, the Rx and Tx packets, lost packets (a combination of failures,
drops and errors), PFC counts and queue counts are captured. Additional
functions such as get_pfc_count and get_interface_stats are defined in the
common/snappi_tests helper files to assist with this; their support is added
as part of a different pull request.

At the end of the test, a CSV file is created as the raw data of the test-case
execution, and a summary of the test case is generated as a text file with the
same name. run_sys_test also returns a test_stats dictionary with all the
important parameters to be used for the verification of the test.

How did you verify/test it?

The test was executed on a local clone.

Any platform specific information?

These testcases are specifically meant for Broadcom-DNX multi-ASIC based
platforms.
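For reference, a minimal sketch of how a testcase is expected to drive the new
helper. The fixture values, the test_def keys and the final assertion below
are illustrative only, not part of this PR:

    flow_metrics, switch_device_results, test_stats = run_traffic_and_collect_stats(
        rx_duthost=rx_duthost,                      # ingress DUT host object
        tx_duthost=tx_duthost,                      # egress DUT host object
        api=api,                                    # snappi session
        config=testbed_config,                      # testbed + flow configuration
        data_flow_names=data_flow_names,            # test + background flow names
        all_flow_names=all_flow_names,
        exp_dur_sec=test_def['test_duration'],      # total run time in seconds
        port_map=port_map,                          # ingress/egress DUT port counts
        fname='pfc_ecn_test',                       # prefix for the CSV/summary files
        stats_interval=test_def['test_interval'],   # polling interval in seconds
        imix=test_def['imix'],                      # True enables the IMIX profile via rest_py
        snappi_extra_params=snappi_extra_params)

    # Example verification on the summarized statistics:
    pytest_assert(test_stats['tgen_lossless_tx_pkts'] == test_stats['tgen_lossless_rx_pkts'],
                  'Lossless priority traffic should not see any loss')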
Co-authored-by: jianquanye@microsoft.com
---
 .../common/snappi_tests/snappi_test_params.py |   2 +
 .../common/snappi_tests/traffic_generation.py | 567 +++++++++++++++++-
 2 files changed, 557 insertions(+), 12 deletions(-)

diff --git a/tests/common/snappi_tests/snappi_test_params.py b/tests/common/snappi_tests/snappi_test_params.py
index 31b4fbc6bf9..b2c0ccd3b6e 100644
--- a/tests/common/snappi_tests/snappi_test_params.py
+++ b/tests/common/snappi_tests/snappi_test_params.py
@@ -23,6 +23,7 @@ def __init__(self):
         is_snappi_ingress_port_cap (bool): whether or not the packet capture is on the tgen ingress port,
             if False, then pcap is on the tgen egress port
         base_flow_config (dict): base flow configuration
+        base_flow_config_list (list): list of base flow configurations
         test_tx_frames (list): number of test frames transmitted for priorities to test
             ex. [2000, 3000] for priorities 3 and 4
         multi_dut_params (MultiDUTParams obj): contains details of duthost objects,
@@ -49,6 +50,7 @@ def __init__(self):
         self.packet_capture_ports = None
         self.is_snappi_ingress_port_cap = True
         self.base_flow_config = None
+        self.base_flow_config_list = []
         self.test_tx_frames = 0
         self.multi_dut_params = MultiDUTParams()
         self.test_iterations = 1

diff --git a/tests/common/snappi_tests/traffic_generation.py b/tests/common/snappi_tests/traffic_generation.py
index c553cc7bfc0..be710cd502b 100644
--- a/tests/common/snappi_tests/traffic_generation.py
+++ b/tests/common/snappi_tests/traffic_generation.py
@@ -4,16 +4,23 @@
 import time
 import logging
 import re
+import pandas as pd
+from datetime import datetime
+
 from tests.common.helpers.assertions import pytest_assert
 from tests.common.snappi_tests.common_helpers import get_egress_queue_count, pfc_class_enable_vector, \
     get_lossless_buffer_size, get_pg_dropped_packets, \
     sec_to_nanosec, get_pfc_frame_count, packet_capture, get_tx_frame_count, get_rx_frame_count, \
-    traffic_flow_mode
+    traffic_flow_mode, get_pfc_count, clear_counters, get_interface_stats, get_queue_count_all_prio
 from tests.common.snappi_tests.port import select_ports, select_tx_port
 from tests.common.snappi_tests.snappi_helpers import wait_for_arp, fetch_snappi_flow_metrics
 from tests.snappi_tests.variables import pfcQueueGroupSize, pfcQueueValueDict
 from tests.common.cisco_data import is_cisco_device
+# Imported to support rest_py in ixnetwork
+from ixnetwork_restpy.assistants.statistics.statviewassistant import StatViewAssistant
+
+
 logger = logging.getLogger(__name__)

 SNAPPI_POLL_DELAY_SEC = 2

@@ -97,7 +104,8 @@ def generate_test_flows(testbed_config,
                         test_flow_prio_list,
                         prio_dscp_map,
                         snappi_extra_params,
-                        number_of_streams=1):
+                        number_of_streams=1,
+                        flow_index=None):
     """
     Generate configurations of test flows. Test flows and background flows are also known as data flows.
     Args:
         testbed_config (obj): testbed L1/L2/L3 configuration
         prio_dscp_map (dict): priority to DSCP mapping
         snappi_extra_params (SnappiTestParams obj): additional parameters for Snappi traffic
         number_of_streams (int): number of UDP streams
+        flow_index (int): index to identify the base_flow_config. Default is None.
     """
-    base_flow_config = snappi_extra_params.base_flow_config
+    # If snappi_extra_params.base_flow_config_list exists,
+    # assign it to base_flow_config using flow_index.
+    if not snappi_extra_params.base_flow_config_list:
+        base_flow_config = snappi_extra_params.base_flow_config
+    else:
+        base_flow_config = snappi_extra_params.base_flow_config_list[flow_index]
+
     pytest_assert(base_flow_config is not None, "Cannot find base flow configuration")
     data_flow_config = snappi_extra_params.traffic_flow_config.data_flow_config
     pytest_assert(data_flow_config is not None, "Cannot find data flow configuration")
@@ -123,7 +138,11 @@
     }
     for prio in test_flow_prio_list:
-        test_flow_name = "{} Prio {}".format(data_flow_config["flow_name"], prio)
+        # If flow_index is provided, include it in the flow name to identify the stream.
+        if flow_index is None:
+            test_flow_name = "{} Prio {}".format(data_flow_config["flow_name"], prio)
+        else:
+            test_flow_name = "{} Prio {} Stream {}".format(data_flow_config["flow_name"], prio, flow_index)
         test_flow = testbed_config.flows.flow(name=test_flow_name)[-1]
         test_flow.tx_rx.port.tx_name = base_flow_config["tx_port_name"]
         test_flow.tx_rx.port.rx_name = base_flow_config["rx_port_name"]
@@ -177,13 +196,18 @@
     base_flow_config["test_flow_name_dut_rx_port_map"] = test_flow_name_dut_rx_port_map
     base_flow_config["test_flow_name_dut_tx_port_map"] = test_flow_name_dut_tx_port_map

-    snappi_extra_params.base_flow_config = base_flow_config
+    # If base_flow_config_list exists, re-assign the updated base_flow_config to it using flow_index.
+    if not snappi_extra_params.base_flow_config_list:
+        snappi_extra_params.base_flow_config = base_flow_config
+    else:
+        snappi_extra_params.base_flow_config_list[flow_index] = base_flow_config


 def generate_background_flows(testbed_config,
                               bg_flow_prio_list,
                               prio_dscp_map,
-                              snappi_extra_params):
+                              snappi_extra_params,
+                              flow_index=None):
     """
     Generate background configurations of flows. Test flows and background flows are also known as data flows.
     Args:
         bg_flow_prio_list (list): list of background flow priorities
         prio_dscp_map (dict): priority to DSCP mapping
         snappi_extra_params (SnappiTestParams obj): additional parameters for Snappi traffic
+        flow_index (int): index to identify the base_flow_config. Default is None.
     """
-    base_flow_config = snappi_extra_params.base_flow_config
+    # If snappi_extra_params.base_flow_config_list exists,
+    # assign it to base_flow_config using flow_index.
+    if not snappi_extra_params.base_flow_config_list:
+        base_flow_config = snappi_extra_params.base_flow_config
+    else:
+        base_flow_config = snappi_extra_params.base_flow_config_list[flow_index]
+
     pytest_assert(base_flow_config is not None, "Cannot find base flow configuration")
     bg_flow_config = snappi_extra_params.traffic_flow_config.background_flow_config
     pytest_assert(bg_flow_config is not None, "Cannot find background flow configuration")

     for prio in bg_flow_prio_list:
-        bg_flow = testbed_config.flows.flow(name='{} Prio {}'.format(bg_flow_config["flow_name"], prio))[-1]
+        # If flow_index is provided, include it in the flow name to identify the stream.
+        if flow_index is None:
+            bg_flow = testbed_config.flows.flow(name='{} Prio {}'.format(bg_flow_config["flow_name"], prio))[-1]
+        else:
+            bg_flow = testbed_config.flows.flow(name='{} Prio {} Stream {}'.
+                                                format(bg_flow_config["flow_name"], prio, flow_index))[-1]
+
         bg_flow.tx_rx.port.tx_name = base_flow_config["tx_port_name"]
         bg_flow.tx_rx.port.rx_name = base_flow_config["rx_port_name"]
@@ -231,7 +268,8 @@ def generate_background_flows(testbed_config,
 def generate_pause_flows(testbed_config,
                          pause_prio_list,
                          global_pause,
-                         snappi_extra_params):
+                         snappi_extra_params,
+                         flow_index=None):
     """
     Generate configurations of pause flows.

     Args:
         testbed_config (obj): testbed L1/L2/L3 configuration
         pause_prio_list (list): list of pause priorities
         global_pause (bool): global pause or per priority pause
         snappi_extra_params (SnappiTestParams obj): additional parameters for Snappi traffic
+        flow_index (int): index to identify the base_flow_config. Default is None.
     """
-    base_flow_config = snappi_extra_params.base_flow_config
+    # If snappi_extra_params.base_flow_config_list exists,
+    # assign it to base_flow_config using flow_index.
+    if not snappi_extra_params.base_flow_config_list:
+        base_flow_config = snappi_extra_params.base_flow_config
+    else:
+        base_flow_config = snappi_extra_params.base_flow_config_list[flow_index]
+
     pytest_assert(base_flow_config is not None, "Cannot find base flow configuration")
     pause_flow_config = snappi_extra_params.traffic_flow_config.pause_flow_config
     pytest_assert(pause_flow_config is not None, "Cannot find pause flow configuration")

-    pause_flow = testbed_config.flows.flow(name=pause_flow_config["flow_name"])[-1]
+    # If flow_index is provided, include it in the flow name to identify the stream.
+    if flow_index is None:
+        pause_flow = testbed_config.flows.flow(name=pause_flow_config["flow_name"])[-1]
+    else:
+        pause_flow = testbed_config.flows.flow(name='{} Stream {}'.
+                                               format(pause_flow_config["flow_name"], flow_index))[-1]
+
     pause_flow.tx_rx.port.tx_name = testbed_config.ports[base_flow_config["rx_port_id"]].name
     pause_flow.tx_rx.port.rx_name = testbed_config.ports[base_flow_config["tx_port_id"]].name
@@ -793,7 +844,13 @@ def verify_egress_queue_frame_count(duthost,

     Returns:
     """
-    dut_port_config = snappi_extra_params.base_flow_config["dut_port_config"]
+    # If snappi_extra_params.base_flow_config_list exists,
+    # assign base_flow_config_list[0]["dut_port_config"] to dut_port_config.
+    if not snappi_extra_params.base_flow_config_list:
+        dut_port_config = snappi_extra_params.base_flow_config["dut_port_config"]
+    else:
+        dut_port_config = snappi_extra_params.base_flow_config_list[0]["dut_port_config"]
+
     pytest_assert(dut_port_config is not None, 'Flow port config is not provided')
     set_class_enable_vec = snappi_extra_params.set_pfc_class_enable_vec
     test_tx_frames = snappi_extra_params.test_tx_frames
@@ -814,3 +871,489 @@
             total_egress_packets, _ = get_egress_queue_count(duthost, peer_port, prios[prio])
             pytest_assert(total_egress_packets == test_tx_frames[prio],
                           "Queue counters should increment for invalid PFC pause frames")
+
+
+def tgen_curr_stats(traf_metrics, flow_metrics, data_flow_names):
+    """
+    Return the current tgen metrics as a flat dictionary.
+
+    Args:
+        traf_metrics (obj): current 'Traffic Item Statistics' rows from IXIA
+        flow_metrics (obj): current per-flow metrics from the snappi API
+        data_flow_names (list): names of the data (test and background) flows
+    Returns:
+        stats (dict): dictionary of TGEN statistics
+
+    """
+    stats = {}
+    for metric in traf_metrics:
+        if metric['Traffic Item'] not in data_flow_names:
+            continue
+        rx_rate_gbps = 0
+        tx_rate_gbps = 0
+        if float(metric['Rx Rate (Mbps)']) != 0:
+            rx_rate_gbps = round(float(metric['Rx Rate (Mbps)'])*1024/(10**6), 2)
+        if float(metric['Tx Rate (Mbps)']) != 0:
+            tx_rate_gbps = round(float(metric['Tx Rate (Mbps)'])*1024/(10**6), 2)
+
+        metric_name = metric['Traffic Item'].replace(' ', '_').lower()
+        stats[metric_name+'_txrate_fps'] = float(metric['Tx Frame Rate'])
+        stats[metric_name+'_txrate_Gbps'] = tx_rate_gbps
+        stats[metric_name+'_rxrate_fps'] = float(metric['Rx Frame Rate'])
+        stats[metric_name+'_rxrate_Gbps'] = rx_rate_gbps
+        stats[metric_name+'_LI_rxrate_Gbps'] = float(metric['Rx L1 Rate (Gbps)'])
+
+    for metric in flow_metrics:
+        if metric.name not in data_flow_names:
+            continue
+        metric_name = metric.name.replace(' ', '_').lower()
+        stats[metric_name+'_rx_pkts'] = metric.frames_rx
+        stats[metric_name+'_tx_pkts'] = metric.frames_tx
+        stats[metric_name+'_loss'] = metric.loss
+        stats[metric_name+'_avg_latency_ns'] = metric.latency.average_ns
+        stats[metric_name+'_max_latency_ns'] = metric.latency.maximum_ns
+        stats[metric_name+'_min_latency_ns'] = metric.latency.minimum_ns
+    return stats
+
+
+def run_traffic_and_collect_stats(rx_duthost,
+                                  tx_duthost,
+                                  api,
+                                  config,
+                                  data_flow_names,
+                                  all_flow_names,
+                                  exp_dur_sec,
+                                  port_map,
+                                  fname,
+                                  stats_interval,
+                                  imix,
+                                  snappi_extra_params):
+
+    """
+    Run traffic and return per-flow statistics, and capture packets if needed.
+    Args:
+        rx_duthost (obj): traffic receiving DUT host object - ingress DUT
+        tx_duthost (obj): traffic transmitting DUT host object - egress DUT
+        api (obj): snappi session
+        config (obj): experiment config (testbed config + flow config)
+        data_flow_names (list): list of names of data (test and background) flows
+        all_flow_names (list): list of names of all the flows
+        exp_dur_sec (int): experiment duration in seconds
+        port_map (list): DUT port counts; port_map[0] ports are polled from the start of the
+            DUT port list and port_map[2] ports from its end
+        fname (str): prefix for the CSV and summary text files written at the end of the test
+        stats_interval (int): interval in seconds at which statistics are polled
+        imix (bool): if True, enable the IMIX packet profile via rest_py
+        snappi_extra_params (SnappiTestParams obj): additional parameters for Snappi traffic
+    Returns:
+        flow_metrics (list): per-flow statistics
+        switch_device_results (dict): per-priority egress queue counts
+        test_stats (dict): summarized test statistics
+    """
+    # Returns true if any entry of prio_list appears in flow_name.
+    def check_presence(flow_name, prio_list):
+        return any(m in flow_name for m in prio_list)
+
+    # Returns true if no entry of prio_list appears in flow_name.
+    def check_absence(flow_name, prio_list):
+        return all(m not in flow_name for m in prio_list)
+
+    api.set_config(config)
+
+    logger.info("Wait for ARP to resolve ...")
+    wait_for_arp(api, max_attempts=30, poll_interval_sec=2)
+
+    pcap_type = snappi_extra_params.packet_capture_type
+
+    dutport_list = []
+
+    for m in snappi_extra_params.multi_dut_params.multi_dut_ports:
+        if m['peer_device'] == snappi_extra_params.multi_dut_params.duthost1.hostname:
+            dutport_list.append([snappi_extra_params.multi_dut_params.duthost1, m['peer_port']])
+        else:
+            dutport_list.append([snappi_extra_params.multi_dut_params.duthost2, m['peer_port']])
+
+    switch_tx_lossless_prios = sum(snappi_extra_params.base_flow_config_list[0]["dut_port_config"][1].values(), [])
+    # Generating a list of lossless priorities prefixed with the keyword 'prio_'
+    prio_list = ['prio_{}'.format(num) for num in switch_tx_lossless_prios]
+
+    # Clearing PFC, drop, queue and interface counters before starting the test
+    for dut, port in dutport_list:
+        clear_counters(dut, port)
+
+    if pcap_type != packet_capture.NO_CAPTURE:
+        logger.info("Starting packet capture ...")
+        cs = api.capture_state()
+        cs.port_names = snappi_extra_params.packet_capture_ports
+        cs.state = cs.START
+        api.set_capture_state(cs)
+
+    # Returns the rest API object for features not present in snappi
+    ixnet_rest_api = api._ixnetwork
+
+    # If the imix flag is set, the IMIX packet profile is enabled.
+    if imix:
+        logger.info('Setting test packet profile to IMIX')
+        for traff_item in ixnet_rest_api.Traffic.TrafficItem.find():
+            config_ele = traff_item.ConfigElement.find()[0].FrameSize
+            config_ele.PresetDistribution = "imix"
+            config_ele.Type = "weightedPairs"
+            config_ele.WeightedPairs = ["128", "7", "570", "4", "1518", "1"]
+
+        ixnet_rest_api.Traffic.TrafficItem.find().Generate()
+        ixnet_rest_api.Traffic.Apply()
+
+    logger.info("Starting transmit on all flows ...")
+    ts = api.transmit_state()
+    ts.state = ts.START
+    api.set_transmit_state(ts)
+
+    time.sleep(5)
+    iter_count = round((int(exp_dur_sec) - stats_interval)/stats_interval)
+
+    f_stats = {}
+    logger.info('Polling DUT and tool for traffic statistics for {} iterations and {} seconds'.
+                format(iter_count, exp_dur_sec))
+    switch_device_results = {}
+    switch_device_results["tx_frames"] = {}
+    switch_device_results["rx_frames"] = {}
+    for lossless_prio in switch_tx_lossless_prios:
+        switch_device_results["tx_frames"][lossless_prio] = []
+        switch_device_results["rx_frames"][lossless_prio] = []
+
+    exp_dur_sec = exp_dur_sec + ANSIBLE_POLL_DELAY_SEC
+
+    for m in range(int(iter_count)):
+        now = datetime.now()
+        logger.info('----------- Collecting Stats for Iteration : {} ------------'.format(m+1))
+        f_stats[m] = {'Date': datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}
+        flow_metrics = fetch_snappi_flow_metrics(api, data_flow_names)
+        traf_metrics = StatViewAssistant(ixnet_rest_api, 'Traffic Item Statistics').Rows
+        tx_frame = sum([metric.frames_tx for metric in flow_metrics if metric.name in data_flow_names])
+        f_stats[m]['tgen_tx_frames'] = tx_frame
+        rx_frame = sum([metric.frames_rx for metric in flow_metrics if metric.name in data_flow_names])
+        f_stats[m]['tgen_rx_frames'] = rx_frame
+        f_stats = update_dict(m, f_stats, tgen_curr_stats(traf_metrics, flow_metrics, data_flow_names))
+        for dut, port in dutport_list:
+            f_stats = update_dict(m, f_stats, flatten_dict(get_interface_stats(dut, port)))
+            f_stats = update_dict(m, f_stats, flatten_dict(get_pfc_count(dut, port)))
+            f_stats = update_dict(m, f_stats, flatten_dict(get_queue_count_all_prio(dut, port)))
+
+        logger.info("Polling DUT for Egress Queue statistics")
+
+        for lossless_prio in switch_tx_lossless_prios:
+            count_frames = 0
+            for n in range(port_map[0]):
+                dut, port = dutport_list[n]
+                count_frames = count_frames + (get_egress_queue_count(dut, port, lossless_prio)[0])
+                logger.info(
+                    'Egress Queue Count for DUT:{}, Port:{}, Priority:{} - {}'.format(
+                        dut.hostname, port, lossless_prio, count_frames
+                    )
+                )
+            switch_device_results["tx_frames"][lossless_prio].append(count_frames)
+            count_frames = 0
+            for n in range(port_map[2]):
+                dut, port = dutport_list[-(n+1)]
+                count_frames = count_frames + (get_egress_queue_count(dut, port, lossless_prio)[0])
+            switch_device_results["rx_frames"][lossless_prio].append(count_frames)
+        later = datetime.now()
+        time.sleep(abs(round(stats_interval - ((later - now).total_seconds()))))
+        logger.info('------------------------------------------------------------')
+
+    attempts = 0
+    max_attempts = 10
+
+    while attempts < max_attempts:
+        logger.info("Checking if all flows have stopped. Attempt #{}".format(attempts + 1))
+        flow_metrics = fetch_snappi_flow_metrics(api, data_flow_names)
+
+        # If all the data flows have stopped
+        transmit_states = [metric.transmit for metric in flow_metrics]
+        if len(flow_metrics) == len(data_flow_names) and \
+                list(set(transmit_states)) == ['stopped']:
+            logger.info("All test and background traffic flows stopped")
+            time.sleep(SNAPPI_POLL_DELAY_SEC)
+            break
+        else:
+            if attempts == 4:
+                logger.info("Stopping transmit on all remaining flows")
+                ts = api.transmit_state()
+                ts.state = ts.STOP
+                api.set_transmit_state(ts)
+            time.sleep(stats_interval/4)
+            attempts += 1
+
+    pytest_assert(attempts < max_attempts,
+                  "Flows do not stop in {} seconds".format(max_attempts*stats_interval))
+
+    if pcap_type != packet_capture.NO_CAPTURE:
+        logger.info("Stopping packet capture ...")
+        request = api.capture_request()
+        request.port_name = snappi_extra_params.packet_capture_ports[0]
+        cs = api.capture_state()
+        cs.state = cs.STOP
+        api.set_capture_state(cs)
+        logger.info("Retrieving and saving packet capture to {}.pcapng".format(snappi_extra_params.packet_capture_file))
+        pcap_bytes = api.get_capture(request)
+        with open(snappi_extra_params.packet_capture_file + ".pcapng", 'wb') as fid:
+            fid.write(pcap_bytes.getvalue())
+
+    time.sleep(5)
+    # Counting egress queue frames at the end of the test.
+    for lossless_prio in switch_tx_lossless_prios:
+        count_frames = 0
+        for n in range(port_map[0]):
+            dut, port = dutport_list[n]
+            count_frames = count_frames + (get_egress_queue_count(dut, port, lossless_prio)[0])
+            logger.info(
+                'Final egress Queue Count for DUT:{}, Port:{}, Priority:{} - {}'.format(
+                    dut.hostname, port, lossless_prio, count_frames
+                )
+            )
+        switch_device_results["tx_frames"][lossless_prio].append(count_frames)
+        count_frames = 0
+        for n in range(port_map[2]):
+            dut, port = dutport_list[-(n+1)]
+            count_frames = count_frames + (get_egress_queue_count(dut, port, lossless_prio)[0])
+        switch_device_results["rx_frames"][lossless_prio].append(count_frames)

+    # Dump per-flow statistics for the final row
+    logger.info("Dumping per-flow statistics for final row")
+    m = iter_count
+    f_stats[m] = {'Date': datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}
+    flow_metrics = fetch_snappi_flow_metrics(api, data_flow_names)
+    traf_metrics = StatViewAssistant(ixnet_rest_api, 'Traffic Item Statistics').Rows
+    tx_frame = sum([metric.frames_tx for metric in flow_metrics if metric.name in data_flow_names])
+    rx_frame = sum([metric.frames_rx for metric in flow_metrics if metric.name in data_flow_names])
+    f_stats[m]['tgen_tx_frames'] = tx_frame
+    f_stats[m]['tgen_rx_frames'] = rx_frame
+    f_stats = update_dict(m, f_stats, tgen_curr_stats(traf_metrics, flow_metrics, data_flow_names))
+    for dut, port in dutport_list:
+        f_stats = update_dict(m, f_stats, flatten_dict(get_interface_stats(dut, port)))
+        f_stats = update_dict(m, f_stats, flatten_dict(get_pfc_count(dut, port)))
+        f_stats = update_dict(m, f_stats, flatten_dict(get_queue_count_all_prio(dut, port)))
+
+    flow_metrics = fetch_snappi_flow_metrics(api, all_flow_names)
+    time.sleep(10)
+
+    df = pd.DataFrame(f_stats)
+    df_t = df.T
+    df_t = df_t.reindex(sorted(df_t.columns), axis=1)
+    flow_list = []
+    all_flow_names = [flow.name for flow in config.flows]
+    for item in all_flow_names:
+        flow_list.append(item.replace(' ', '_').lower())
+    results = list(df_t.columns)
+    fname = fname + '-' + datetime.now().strftime('%Y-%m-%d-%H-%M')
+    with open(fname+'.txt', 'w') as f:
+        f.write('Captured data for {} iterations at {} seconds interval \n'.format(m, stats_interval))
+        test_stats = {}
+        test_stats['tgen_loss_pkts'] = 0
+        test_stats['tgen_lossy_rx_pkts'] = 0
+        test_stats['tgen_lossy_tx_pkts'] = 0
+        test_stats['tgen_lossless_rx_pkts'] = 0
+        test_stats['tgen_lossless_tx_pkts'] = 0
+        test_stats['tgen_rx_rate'] = 0
+        test_stats['tgen_tx_rate'] = 0
+        for flow in flow_list:
+            rx_rate = 0
+            tx_rate = 0
+            for item in results:
+                if (flow in item and item.split(flow)[1] == '_avg_latency_ns'):
+                    avg_latency = round(df_t[item].mean(), 2)
+                if (flow in item and item.split(flow)[1] == '_rx_pkts'):
+                    rx_pkts = df_t[item].max()
+                # Incrementing TGEN lossless priority Rx packets.
+                if (flow in item and item.split(flow)[1] == '_rx_pkts' and (check_presence(flow, prio_list))):
+                    test_stats['tgen_lossless_rx_pkts'] += rx_pkts
+                # Incrementing TGEN lossy priority Rx packets.
+                if (flow in item and item.split(flow)[1] == '_rx_pkts'
+                        and (check_absence(flow, prio_list))):
+                    test_stats['tgen_lossy_rx_pkts'] += rx_pkts
+                if (flow in item and item.split(flow)[1] == '_tx_pkts'):
+                    tx_pkts = df_t[item].max()
+                # Incrementing lossless priority Tx packets.
+                if ((flow in item and item.split(flow)[1] == '_tx_pkts')
+                        and (check_presence(flow, prio_list))):
+                    test_stats['tgen_lossless_tx_pkts'] += tx_pkts
+                # Incrementing lossy priority Tx packets.
+                if ((flow in item and item.split(flow)[1] == '_tx_pkts')
+                        and (check_absence(flow, prio_list))):
+                    test_stats['tgen_lossy_tx_pkts'] += tx_pkts
+                if (flow in item and item.split(flow)[1] == '_rxrate_Gbps'):
+                    if (df_t[item].sum() != 0):
+                        rx_rate = round(df_t.loc[df_t[item] != 0, item].mean(), 2)
+                    else:
+                        rx_rate = 0
+                    test_stats['tgen_rx_rate'] += round(rx_rate, 2)
+                if (flow in item and item.split(flow)[1] == '_txrate_Gbps'):
+                    tx_rate = round(df_t.loc[df_t[item] != 0, item].mean(), 2)
+                    test_stats['tgen_tx_rate'] += round(tx_rate, 2)
+                if (flow in item and item.split(flow)[1] == '_loss'):
+                    loss = df_t[item].max()
+            if ('pause' not in flow):
+                test_stats['tgen_loss_pkts'] += (int(tx_pkts) - int(rx_pkts))
+                f.write('For {} - Avg_Latency:{}, rx_pkts:{}, tx_pkts:{}, rx_thrput:{}, tx_thrput:{} and loss prcnt:{} \n'
+                        .format(flow, avg_latency, rx_pkts, tx_pkts, rx_rate, tx_rate, loss))
+        f.write('Total Lossless Rx pkts:{} and Tx pkts:{} \n'.
+                format(test_stats['tgen_lossless_rx_pkts'], test_stats['tgen_lossless_tx_pkts']))
+        f.write('Total Lossy Rx pkts:{} and Tx pkts:{} \n'.
+                format(test_stats['tgen_lossy_rx_pkts'], test_stats['tgen_lossy_tx_pkts']))
+        f.write('Total TGEN Loss Pkts:{} \n'.format(test_stats['tgen_loss_pkts']))
+
+        # Computing DUT Tx-Rx throughput, packets and failures via interface stats.
+        test_stats['dut_loss_pkts'] = 0
+        for dut, port in dutport_list:
+            new_key = (dut.hostname + '_' + port).lower()
+            rx_thrput = 0
+            tx_thrput = 0
+            for item in results:
+                if (new_key in item and item.split(new_key)[1] == '_rx_thrput_Mbps'):
+                    rx_thrput = round(df_t.loc[df_t[item] != 0, item].mean(), 2)
+                if (new_key in item and item.split(new_key)[1] == '_tx_thrput_Mbps'):
+                    tx_thrput = round(df_t.loc[df_t[item] != 0, item].mean(), 2)
+                if (new_key in item and item.split(new_key)[1] == '_rx_pkts'):
+                    rx_pkts = df_t[item].max()
+                if (new_key in item and item.split(new_key)[1] == '_tx_pkts'):
+                    tx_pkts = df_t[item].max()
+                if (new_key in item and item.split(new_key)[1] == '_tx_fail'):
+                    tx_fail = df_t[item].max()
+                if (new_key in item and item.split(new_key)[1] == '_rx_fail'):
+                    rx_fail = df_t[item].max()
+            test_stats['dut_loss_pkts'] += (int(tx_fail) + int(rx_fail))
+            f.write('For {} - rx_pkts:{}, tx_pkts:{}, rx_thrput:{}, tx_thrput:{} and rx_loss:{}, tx_loss:{} \n'.
+                    format(new_key, rx_pkts, tx_pkts, rx_thrput, tx_thrput, rx_fail, tx_fail))
+        f.write('Total DUT Loss Pkts:{} \n'.format(test_stats['dut_loss_pkts']))
+        test_stats['dut_lossless_pkts'] = 0
+        test_stats['dut_lossy_pkts'] = 0
+        prio_key = ['_prio_']
+        for dut, port in dutport_list:
+            new_key = (dut.hostname + '_' + port).lower()
+            prio_dict = {}
+            for item in results:
+                for key in prio_key:
+                    if (new_key in item and key in item.split(new_key)[1]):
+                        prio_dict[item.split(new_key)[1]] = df_t[item].max()
+            f.write('Egress Queue Count for {} : \n'.format(new_key))
+            for key, val in prio_dict.items():
+                if val != 0:
+                    # Checking for lossless priorities in the key.
+                    if check_presence(key, prio_list):
+                        test_stats['dut_lossless_pkts'] += val
+                    else:
+                        test_stats['dut_lossy_pkts'] += val
+                    f.write('{}:{} \n'.format(key, val))
+
+        test_stats['lossless_tx_pfc'] = 0
+        test_stats['lossless_rx_pfc'] = 0
+        test_stats['lossy_rx_tx_pfc'] = 0
+        f.write('Received or Transmitted PFC counts: \n')
+        tx_pfc_list = ['tx_pfc_{}'.format(num) for num in switch_tx_lossless_prios]
+        rx_pfc_list = ['rx_pfc_{}'.format(num) for num in switch_tx_lossless_prios]
+        for item in results:
+            if ('pfc' in item):
+                if (df_t[item].max() != 0):
+                    f.write('{} : {} \n'.format(item, df_t[item].max()))
+                    # Lossless priority PFCs transmitted.
+                    if (check_presence(item, tx_pfc_list)):
+                        test_stats['lossless_tx_pfc'] += int(df_t[item].max())
+                    # Lossless priority PFCs received.
+                    elif (check_presence(item, rx_pfc_list)):
+                        test_stats['lossless_rx_pfc'] += int(df_t[item].max())
+                    else:
+                        # Lossy PFCs received or transmitted.
+                        test_stats['lossy_rx_tx_pfc'] += int(df_t[item].max())
+
+    fname = fname + '.csv'
+    logger.info('Writing statistics to file : {}'.format(fname))
+    df_t.to_csv(fname, index=False)
+
+    return flow_metrics, switch_device_results, test_stats
+
+
+def update_dict(m,
+                orig_dict,
+                new_dict):
+    """
+    Merges the info from new_dict into orig_dict at index m.
+    Args:
+        m (int): index into orig_dict
+        orig_dict (dict): original dictionary to be updated
+        new_dict (dict): dictionary whose entries are merged into orig_dict
+    Returns:
+        orig_dict (dict): updated orig_dict with the values of new_dict
+    """
+    for key, value in new_dict.items():
+        orig_dict[m][key] = value
+
+    return orig_dict
+
+
+def multi_base_traffic_config(testbed_config,
+                              port_config_list,
+                              rx_port_id,
+                              tx_port_id):
+    """
+    Generate base configurations of flows, including test flows, background flows and
+    pause storm. Test flows and background flows are also known as data flows.
+    Args:
+        testbed_config (obj): testbed L1/L2/L3 configuration
+        port_config_list (list): list of port configurations
+        rx_port_id (int): Rx ID of DUT port to test
+        tx_port_id (int): Tx ID of DUT port to test
+
+    Returns:
+        base_flow_config (dict): base flow configuration containing dut_port_config, tx_mac,
+            rx_mac, tx_port_config, rx_port_config, tx_port_name, rx_port_name
+            dict key-value pairs (all keys are strings):
+            tx_port_id (int): ID of ixia TX port ex. 1
+            rx_port_id (int): ID of ixia RX port ex. 2
+            tx_port_config (SnappiPortConfig): port config obj for ixia TX port
+            rx_port_config (SnappiPortConfig): port config obj for ixia RX port
+            tx_mac (str): MAC address of ixia TX port ex. '00:00:fa:ce:fa:ce'
+            rx_mac (str): MAC address of ixia RX port ex. '00:00:fa:ce:fa:ce'
+            tx_port_name (str): name of ixia TX port ex. 'Port 1'
+            rx_port_name (str): name of ixia RX port ex. 'Port 2'
+            dut_port_config (list): a list of two dictionaries of tx and rx ports on the
+                peer (switch) side, and the associated test priorities
+                ex. [{'Ethernet4': [3, 4]}, {'Ethernet8': [3, 4]}]
+            test_flow_name_dut_rx_port_map (dict): mapping of test flow name to DUT RX port(s)
+                ex. {'flow1': ['Ethernet4', 'Ethernet8']}
+            test_flow_name_dut_tx_port_map (dict): mapping of test flow name to DUT TX port(s)
+                ex. {'flow1': ['Ethernet4', 'Ethernet8']}
+    """
+    base_flow_config = {}
+    base_flow_config["rx_port_id"] = rx_port_id
+    base_flow_config["tx_port_id"] = tx_port_id
+
+    tx_port_config = next((x for x in port_config_list if x.id == tx_port_id), None)
+    rx_port_config = next((x for x in port_config_list if x.id == rx_port_id), None)
+    base_flow_config["tx_port_config"] = tx_port_config
+    base_flow_config["rx_port_config"] = rx_port_config
+
+    # Instantiate peer ports in dut_port_config
+    dut_port_config = []
+    tx_dict = {str(tx_port_config.peer_port): []}
+    rx_dict = {str(rx_port_config.peer_port): []}
+    dut_port_config.append(tx_dict)
+    dut_port_config.append(rx_dict)
+    base_flow_config["dut_port_config"] = dut_port_config
+
+    base_flow_config["tx_mac"] = tx_port_config.mac
+    if tx_port_config.gateway == rx_port_config.gateway and \
+            tx_port_config.prefix_len == rx_port_config.prefix_len:
+        # If source and destination ports are in the same subnet
+        base_flow_config["rx_mac"] = rx_port_config.mac
+    else:
+        base_flow_config["rx_mac"] = tx_port_config.gateway_mac
+
+    base_flow_config["tx_port_name"] = testbed_config.ports[tx_port_id].name
+    base_flow_config["rx_port_name"] = testbed_config.ports[rx_port_id].name
+
+    return base_flow_config
+
+
+def flatten_dict(d, parent_key='', sep='_'):
+    """
+    Flatten a nested dictionary into a single level, joining nested keys
+    with sep and lower-casing the joined keys.
+    """
+    items = []
+    for k, v in d.items():
+        new_key = (str(parent_key) + sep + str(k)).lower() if parent_key else k
+        if isinstance(v, dict):
+            items.extend(flatten_dict(v, new_key, sep=sep).items())
+        else:
+            items.append((new_key, v))
+    return dict(items)
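
A note on the stats plumbing above: flatten_dict collapses the nested
per-DUT/per-port dictionaries returned by helpers such as get_interface_stats
into the flat, lowercased column names that the polling loop stores per
iteration and that the summary code later splits on '<hostname>_<port>'. A
minimal sketch of its behavior (the input dictionary is invented for
illustration):

    stats = {'sonic-lc1': {'Ethernet72': {'rx_pkts': 100, 'tx_pkts': 90}}}
    flatten_dict(stats)
    # -> {'sonic-lc1_ethernet72_rx_pkts': 100, 'sonic-lc1_ethernet72_tx_pkts': 90}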