diff --git a/tests/common/snappi_tests/snappi_test_params.py b/tests/common/snappi_tests/snappi_test_params.py index 31b4fbc6bf..b2c0ccd3b6 100644 --- a/tests/common/snappi_tests/snappi_test_params.py +++ b/tests/common/snappi_tests/snappi_test_params.py @@ -23,6 +23,7 @@ def __init__(self): is_snappi_ingress_port_cap (bool): whether or not the packet capture is on the tgen ingress port, if False, then pcap is on the tgen egress port base_flow_config (dict): base flow configuration + base_flow_config_list (list): list for base flow config. test_tx_frames (list): number of test frames transmitted for priorities to test ex. [2000, 3000] for priorities 3 and 4 multi_dut_params (MultiDUTParams obj): contains det=120ails of duthost objects, @@ -49,6 +50,7 @@ def __init__(self): self.packet_capture_ports = None self.is_snappi_ingress_port_cap = True self.base_flow_config = None + self.base_flow_config_list = [] self.test_tx_frames = 0 self.multi_dut_params = MultiDUTParams() self.test_iterations = 1 diff --git a/tests/common/snappi_tests/traffic_generation.py b/tests/common/snappi_tests/traffic_generation.py index c553cc7bfc..be710cd502 100644 --- a/tests/common/snappi_tests/traffic_generation.py +++ b/tests/common/snappi_tests/traffic_generation.py @@ -4,16 +4,23 @@ import time import logging import re +import pandas as pd +from datetime import datetime + from tests.common.helpers.assertions import pytest_assert from tests.common.snappi_tests.common_helpers import get_egress_queue_count, pfc_class_enable_vector, \ get_lossless_buffer_size, get_pg_dropped_packets, \ sec_to_nanosec, get_pfc_frame_count, packet_capture, get_tx_frame_count, get_rx_frame_count, \ - traffic_flow_mode + traffic_flow_mode, get_pfc_count, clear_counters, get_interface_stats, get_queue_count_all_prio from tests.common.snappi_tests.port import select_ports, select_tx_port from tests.common.snappi_tests.snappi_helpers import wait_for_arp, fetch_snappi_flow_metrics from tests.snappi_tests.variables import pfcQueueGroupSize, pfcQueueValueDict from tests.common.cisco_data import is_cisco_device +# Imported to support rest_py in ixnetwork +from ixnetwork_restpy.assistants.statistics.statviewassistant import StatViewAssistant + + logger = logging.getLogger(__name__) SNAPPI_POLL_DELAY_SEC = 2 @@ -97,7 +104,8 @@ def generate_test_flows(testbed_config, test_flow_prio_list, prio_dscp_map, snappi_extra_params, - number_of_streams=1): + number_of_streams=1, + flow_index=None): """ Generate configurations of test flows. Test flows and background flows are also known as data flows. @@ -107,8 +115,15 @@ def generate_test_flows(testbed_config, prio_dscp_map (dict): priority to DSCP mapping snappi_extra_params (SnappiTestParams obj): additional parameters for Snappi traffic number_of_streams (int): number of UDP streams + flow_index (int): Index to identify the base_flow_config. Default is None. """ - base_flow_config = snappi_extra_params.base_flow_config + # If snappi_extra_params.base_flow_config_list exists, + # assign it to base_flow_config using flow_index. + if not snappi_extra_params.base_flow_config_list: + base_flow_config = snappi_extra_params.base_flow_config + else: + base_flow_config = snappi_extra_params.base_flow_config_list[flow_index] + pytest_assert(base_flow_config is not None, "Cannot find base flow configuration") data_flow_config = snappi_extra_params.traffic_flow_config.data_flow_config pytest_assert(data_flow_config is not None, "Cannot find data flow configuration") @@ -123,7 +138,11 @@ def generate_test_flows(testbed_config, } for prio in test_flow_prio_list: - test_flow_name = "{} Prio {}".format(data_flow_config["flow_name"], prio) + # If flow_index exists, then flow name uses it to identify Stream-name. + if flow_index is None: + test_flow_name = "{} Prio {}".format(data_flow_config["flow_name"], prio) + else: + test_flow_name = "{} Prio {} Stream {}".format(data_flow_config["flow_name"], prio, flow_index) test_flow = testbed_config.flows.flow(name=test_flow_name)[-1] test_flow.tx_rx.port.tx_name = base_flow_config["tx_port_name"] test_flow.tx_rx.port.rx_name = base_flow_config["rx_port_name"] @@ -177,13 +196,18 @@ def generate_test_flows(testbed_config, base_flow_config["test_flow_name_dut_rx_port_map"] = test_flow_name_dut_rx_port_map base_flow_config["test_flow_name_dut_tx_port_map"] = test_flow_name_dut_tx_port_map - snappi_extra_params.base_flow_config = base_flow_config + # If base_flow_config_list, exists, re-assign updated base_flow_config to it using flow_index. + if not snappi_extra_params.base_flow_config_list: + snappi_extra_params.base_flow_config = base_flow_config + else: + snappi_extra_params.base_flow_config_list[flow_index] = base_flow_config def generate_background_flows(testbed_config, bg_flow_prio_list, prio_dscp_map, - snappi_extra_params): + snappi_extra_params, + flow_index=None): """ Generate background configurations of flows. Test flows and background flows are also known as data flows. @@ -192,14 +216,27 @@ def generate_background_flows(testbed_config, bg_flow_prio_list (list): list of background flow priorities prio_dscp_map (dict): priority to DSCP mapping snappi_extra_params (SnappiTestParams obj): additional parameters for Snappi traffic + flow_index (int): Index to identify the base_flow_config. Default is None. """ - base_flow_config = snappi_extra_params.base_flow_config + # If snappi_extra_params.base_flow_config_list exists, + # assign it to base_flow_config using flow_index. + if not snappi_extra_params.base_flow_config_list: + base_flow_config = snappi_extra_params.base_flow_config + else: + base_flow_config = snappi_extra_params.base_flow_config_list[flow_index] + pytest_assert(base_flow_config is not None, "Cannot find base flow configuration") bg_flow_config = snappi_extra_params.traffic_flow_config.background_flow_config pytest_assert(bg_flow_config is not None, "Cannot find background flow configuration") for prio in bg_flow_prio_list: - bg_flow = testbed_config.flows.flow(name='{} Prio {}'.format(bg_flow_config["flow_name"], prio))[-1] + # If flow_index exists, then flow name uses it to identify Stream-name. + if flow_index is None: + bg_flow = testbed_config.flows.flow(name='{} Prio {}'.format(bg_flow_config["flow_name"], prio))[-1] + else: + bg_flow = testbed_config.flows.flow(name='{} Prio {} Stream {}'. + format(bg_flow_config["flow_name"], prio, flow_index))[-1] + bg_flow.tx_rx.port.tx_name = base_flow_config["tx_port_name"] bg_flow.tx_rx.port.rx_name = base_flow_config["rx_port_name"] @@ -231,7 +268,8 @@ def generate_background_flows(testbed_config, def generate_pause_flows(testbed_config, pause_prio_list, global_pause, - snappi_extra_params): + snappi_extra_params, + flow_index=None): """ Generate configurations of pause flows. @@ -240,13 +278,26 @@ def generate_pause_flows(testbed_config, pause_prio_list (list): list of pause priorities global_pause (bool): global pause or per priority pause snappi_extra_params (SnappiTestParams obj): additional parameters for Snappi traffic + flow_index (int): Index to identify the base_flow_config. Default is None. """ - base_flow_config = snappi_extra_params.base_flow_config + # If snappi_extra_params.base_flow_config_list exists, + # assign it to base_flow_config using flow_index. + if not snappi_extra_params.base_flow_config_list: + base_flow_config = snappi_extra_params.base_flow_config + else: + base_flow_config = snappi_extra_params.base_flow_config_list[flow_index] + pytest_assert(base_flow_config is not None, "Cannot find base flow configuration") pause_flow_config = snappi_extra_params.traffic_flow_config.pause_flow_config pytest_assert(pause_flow_config is not None, "Cannot find pause flow configuration") - pause_flow = testbed_config.flows.flow(name=pause_flow_config["flow_name"])[-1] + # If flow_index exists, then flow name uses it to identify Stream-name. + if flow_index is None: + pause_flow = testbed_config.flows.flow(name=pause_flow_config["flow_name"])[-1] + else: + pause_flow = testbed_config.flows.flow(name='{} Stream {}'. + format(pause_flow_config["flow_name"], flow_index))[-1] + pause_flow.tx_rx.port.tx_name = testbed_config.ports[base_flow_config["rx_port_id"]].name pause_flow.tx_rx.port.rx_name = testbed_config.ports[base_flow_config["tx_port_id"]].name @@ -793,7 +844,13 @@ def verify_egress_queue_frame_count(duthost, Returns: """ - dut_port_config = snappi_extra_params.base_flow_config["dut_port_config"] + # If snappi_extra_params.base_flow_config_list exists, + # assign base_flow_config[0]["dut_port_config"] to dut_port_config. + if not snappi_extra_params.base_flow_config_list: + dut_port_config = snappi_extra_params.base_flow_config["dut_port_config"] + else: + dut_port_config = snappi_extra_params.base_flow_config_list[0]["dut_port_config"] + pytest_assert(dut_port_config is not None, 'Flow port config is not provided') set_class_enable_vec = snappi_extra_params.set_pfc_class_enable_vec test_tx_frames = snappi_extra_params.test_tx_frames @@ -814,3 +871,489 @@ def verify_egress_queue_frame_count(duthost, total_egress_packets, _ = get_egress_queue_count(duthost, peer_port, prios[prio]) pytest_assert(total_egress_packets == test_tx_frames[prio], "Queue counters should increment for invalid PFC pause frames") + + +def tgen_curr_stats(traf_metrics, flow_metrics, data_flow_names): + """ + Print the current tgen metrics + + Arg: + traf_metrics (obj): Current traffic item stats on IXIA. + curr_flow_metrics (obj): Current tgen stats. + snappi_extra_params (SnappiTestParams obj): additional parameters for Snappi traffic. + Returns: + stats (dictionary): Dictionary of DUTs statistics + + """ + stats = {} + for metric in traf_metrics: + if metric['Traffic Item'] not in data_flow_names: + continue + rx_rate_gbps = 0 + tx_rate_gbps = 0 + if (int(metric['Rx Rate (Mbps)'] != 0)): + rx_rate_gbps = round(float(metric['Rx Rate (Mbps)'])*1024/(10**6), 2) + if (int(metric['Tx Rate (Mbps)'] != 0)): + tx_rate_gbps = round(float(metric['Tx Rate (Mbps)'])*1024/(10**6), 2) + + metric_name = metric['Traffic Item'].replace(' ', '_').lower() + stats[metric_name+'_txrate_fps'] = float(metric['Tx Frame Rate']) + stats[metric_name+'_txrate_Gbps'] = tx_rate_gbps + stats[metric_name+'_rxrate_fps'] = float(metric['Rx Frame Rate']) + stats[metric_name+'_rxrate_Gbps'] = rx_rate_gbps + stats[metric_name+'_LI_rxrate_Gbps'] = float(metric['Rx L1 Rate (Gbps)']) + + for metric in flow_metrics: + if metric.name not in data_flow_names: + continue + metric_name = metric.name.replace(' ', '_').lower() + stats[metric_name+'_rx_pkts'] = metric.frames_rx + stats[metric_name+'_tx_pkts'] = metric.frames_tx + stats[metric_name+'_loss'] = metric.loss + stats[metric_name+'_avg_latency_ns'] = metric.latency.average_ns + stats[metric_name+'_max_latency_ns'] = metric.latency.maximum_ns + stats[metric_name+'_min_latency_ns'] = metric.latency.minimum_ns + return stats + + +def run_traffic_and_collect_stats(rx_duthost, + tx_duthost, + api, + config, + data_flow_names, + all_flow_names, + exp_dur_sec, + port_map, + fname, + stats_interval, + imix, + snappi_extra_params): + + """ + Run traffic and return per-flow statistics, and capture packets if needed. + Args: + rx_duthost (obj): Traffic receiving DUT host object - Ingress DUT + tx_duthost (obj): Traffic transmitting DUT host object - Egress DUT + api (obj): snappi session + config (obj): experiment config (testbed config + flow config) + data_flow_names (list): list of names of data (test and background flows + all_flow_names (list): list of names of all the flows + exp_dur_sec (int): experiment duration in second + snappi_extra_params (SnappiTestParams obj): additional parameters for Snappi traffic + Returns: + per-flow statistics (list) + """ + # Returns true if any value in prio_list contains the flow_name. + def check_presence(flow_name, prio_list): + return any(m in flow_name for m in prio_list) + + # Returns true if string is absent in all prio_list. + def check_absence(flow_name, prio_list): + return all(m not in flow_name for m in prio_list) + + api.set_config(config) + + logger.info("Wait for Arp to Resolve ...") + wait_for_arp(api, max_attempts=30, poll_interval_sec=2) + + pcap_type = snappi_extra_params.packet_capture_type + + dutport_list = [] + + for m in snappi_extra_params.multi_dut_params.multi_dut_ports: + if m['peer_device'] == snappi_extra_params.multi_dut_params.duthost1.hostname: + dutport_list.append([snappi_extra_params.multi_dut_params.duthost1, m['peer_port']]) + else: + dutport_list.append([snappi_extra_params.multi_dut_params.duthost2, m['peer_port']]) + + switch_tx_lossless_prios = sum(snappi_extra_params.base_flow_config_list[0]["dut_port_config"][1].values(), []) + # Generating list with lossless priorities starting with keyword 'prio_' + prio_list = ['prio_{}'.format(num) for num in switch_tx_lossless_prios] + + # Clearing stats before starting the test + # PFC, counters, queue-counters and dropcounters + # logger.debug('Clearing PFC, dropcounters, queuecounters and stats') + for dut, port in dutport_list: + clear_counters(dut, port) + + if pcap_type != packet_capture.NO_CAPTURE: + logger.info("Starting packet capture ...") + cs = api.capture_state() + cs.port_names = snappi_extra_params.packet_capture_ports + cs.state = cs.START + api.set_capture_state(cs) + + # Returns the rest API object for features not present in Snappi + ixnet_rest_api = api._ixnetwork + + # If imix flag is set, IMIX packet-profile is enabled. + if (imix): + logger.info('Test packet-profile setting to IMIX') + for traff_item in ixnet_rest_api.Traffic.TrafficItem.find(): + config_ele = traff_item.ConfigElement.find()[0].FrameSize + config_ele.PresetDistribution = "imix" + config_ele.Type = "weightedPairs" + config_ele.WeightedPairs = ["128", "7", "570", "4", "1518", "1"] + + ixnet_rest_api.Traffic.TrafficItem.find().Generate() + ixnet_rest_api.Traffic.Apply() + + logger.info("Starting transmit on all flows ...") + ts = api.transmit_state() + ts.state = ts.START + api.set_transmit_state(ts) + + time.sleep(5) + iter_count = round((int(exp_dur_sec) - stats_interval)/stats_interval) + + f_stats = {} + logger.info('Polling DUT and tool for traffic statistics for {} iterations and {} seconds'. + format(iter_count, exp_dur_sec)) + switch_device_results = {} + switch_device_results["tx_frames"] = {} + switch_device_results["rx_frames"] = {} + for lossless_prio in switch_tx_lossless_prios: + switch_device_results["tx_frames"][lossless_prio] = [] + switch_device_results["rx_frames"][lossless_prio] = [] + + exp_dur_sec = exp_dur_sec + ANSIBLE_POLL_DELAY_SEC + + for m in range(int(iter_count)): + now = datetime.now() + logger.info('----------- Collecting Stats for Iteration : {} ------------'.format(m+1)) + f_stats[m] = {'Date': datetime.now().strftime('%Y-%m-%d-%H-%M-%S')} + flow_metrics = fetch_snappi_flow_metrics(api, data_flow_names) + traf_metrics = StatViewAssistant(ixnet_rest_api, 'Traffic Item Statistics').Rows + tx_frame = sum([metric.frames_tx for metric in flow_metrics if metric.name in data_flow_names]) + f_stats[m]['tgen_tx_frames'] = tx_frame + rx_frame = sum([metric.frames_rx for metric in flow_metrics if metric.name in data_flow_names]) + f_stats[m]['tgen_rx_frames'] = rx_frame + f_stats = update_dict(m, f_stats, tgen_curr_stats(traf_metrics, flow_metrics, data_flow_names)) + for dut, port in dutport_list: + f_stats = update_dict(m, f_stats, flatten_dict(get_interface_stats(dut, port))) + f_stats = update_dict(m, f_stats, flatten_dict(get_pfc_count(dut, port))) + f_stats = update_dict(m, f_stats, flatten_dict(get_queue_count_all_prio(dut, port))) + + logger.info("Polling DUT for Egress Queue statistics") + + for lossless_prio in switch_tx_lossless_prios: + count_frames = 0 + for n in range(port_map[0]): + dut, port = dutport_list[n] + count_frames = count_frames + (get_egress_queue_count(dut, port, lossless_prio)[0]) + logger.info( + 'Egress Queue Count for DUT:{}, Port:{}, Priority:{} - {}'.format( + dut.hostname, port, lossless_prio, count_frames + ) + ) + switch_device_results["tx_frames"][lossless_prio].append(count_frames) + count_frames = 0 + for n in range(port_map[2]): + dut, port = dutport_list[-(n+1)] + count_frames = count_frames + (get_egress_queue_count(dut, port, lossless_prio)[0]) + switch_device_results["rx_frames"][lossless_prio].append(count_frames) + later = datetime.now() + time.sleep(abs(round(stats_interval - ((later - now).total_seconds())))) + logger.info('------------------------------------------------------------') + + attempts = 0 + max_attempts = 10 + + while attempts < max_attempts: + logger.info("Checking if all flows have stopped. Attempt #{}".format(attempts + 1)) + flow_metrics = fetch_snappi_flow_metrics(api, data_flow_names) + + # If all the data flows have stopped + transmit_states = [metric.transmit for metric in flow_metrics] + if len(flow_metrics) == len(data_flow_names) and\ + list(set(transmit_states)) == ['stopped']: + logger.info("All test and background traffic flows stopped") + time.sleep(SNAPPI_POLL_DELAY_SEC) + break + else: + if (attempts == 4): + logger.info("Stopping transmit on all remaining flows") + ts = api.transmit_state() + ts.state = ts.STOP + api.set_transmit_state(ts) + time.sleep(stats_interval/4) + attempts += 1 + + pytest_assert(attempts < max_attempts, + "Flows do not stop in {} seconds".format(max_attempts*stats_interval)) + + if pcap_type != packet_capture.NO_CAPTURE: + logger.info("Stopping packet capture ...") + request = api.capture_request() + request.port_name = snappi_extra_params.packet_capture_ports[0] + cs = api.capture_state() + cs.state = cs.STOP + api.set_capture_state(cs) + logger.info("Retrieving and saving packet capture to {}.pcapng".format(snappi_extra_params.packet_capture_file)) + pcap_bytes = api.get_capture(request) + with open(snappi_extra_params.packet_capture_file + ".pcapng", 'wb') as fid: + fid.write(pcap_bytes.getvalue()) + + time.sleep(5) + # Counting egress queue frames at the end of the test. + for lossless_prio in switch_tx_lossless_prios: + count_frames = 0 + for n in range(port_map[0]): + dut, port = dutport_list[n] + count_frames = count_frames + (get_egress_queue_count(dut, port, lossless_prio)[0]) + logger.info( + 'Final egress Queue Count for DUT:{},Port:{}, Priority:{} - {}'.format( + dut.hostname, port, lossless_prio, count_frames + ) + ) + switch_device_results["tx_frames"][lossless_prio].append(count_frames) + count_frames = 0 + for n in range(port_map[2]): + dut, port = dutport_list[-(n+1)] + count_frames = count_frames + (get_egress_queue_count(dut, port, lossless_prio)[0]) + switch_device_results["rx_frames"][lossless_prio].append(count_frames) + + # Dump per-flow statistics for final rows + logger.info("Dumping per-flow statistics for final row") + m = iter_count + f_stats[m] = {'Date': datetime.now().strftime('%Y-%m-%d-%H-%M-%S')} + flow_metrics = fetch_snappi_flow_metrics(api, data_flow_names) + traf_metrics = StatViewAssistant(ixnet_rest_api, 'Traffic Item Statistics').Rows + tx_frame = sum([metric.frames_tx for metric in flow_metrics if metric.name in data_flow_names]) + rx_frame = sum([metric.frames_rx for metric in flow_metrics if metric.name in data_flow_names]) + f_stats[m]['tgen_tx_frames'] = tx_frame + f_stats[m]['tgen_rx_frames'] = rx_frame + f_stats = update_dict(m, f_stats, tgen_curr_stats(traf_metrics, flow_metrics, data_flow_names)) + for dut, port in dutport_list: + f_stats = update_dict(m, f_stats, flatten_dict(get_interface_stats(dut, port))) + f_stats = update_dict(m, f_stats, flatten_dict(get_pfc_count(dut, port))) + f_stats = update_dict(m, f_stats, flatten_dict(get_queue_count_all_prio(dut, port))) + + flow_metrics = fetch_snappi_flow_metrics(api, all_flow_names) + time.sleep(10) + + df = pd.DataFrame(f_stats) + df_t = df.T + df_t = df_t.reindex(sorted(df_t.columns), axis=1) + flow_list = [] + all_flow_names = [flow.name for flow in config.flows] + for item in all_flow_names: + flow_list.append(item.replace(' ', '_').lower()) + results = list(df_t.columns) + fname = fname + '-' + datetime.now().strftime('%Y-%m-%d-%H-%M') + with open(fname+'.txt', 'w') as f: + f.write('Captured data for {} iterations at {} seconds interval \n'.format(m, stats_interval)) + test_stats = {} + test_stats['tgen_loss_pkts'] = 0 + test_stats['tgen_lossy_rx_pkts'] = 0 + test_stats['tgen_lossy_tx_pkts'] = 0 + test_stats['tgen_lossless_rx_pkts'] = 0 + test_stats['tgen_lossless_tx_pkts'] = 0 + test_stats['tgen_rx_rate'] = 0 + test_stats['tgen_tx_rate'] = 0 + for flow in flow_list: + rx_rate = 0 + tx_rate = 0 + for item in results: + if (flow in item and item.split(flow)[1] == '_avg_latency_ns'): + avg_latency = round(df_t[item].mean(), 2) + if (flow in item and item.split(flow)[1] == '_rx_pkts'): + rx_pkts = df_t[item].max() + # Incrementing TGEN lossless priority Rx packets. + if (flow in item and item.split(flow)[1] == '_rx_pkts' and (check_presence(flow, prio_list))): + test_stats['tgen_lossless_rx_pkts'] += rx_pkts + # Incrementing TGEN lossy priority Rx packets. + if (flow in item and item.split(flow)[1] == '_rx_pkts' + and (check_absence(flow, prio_list))): + test_stats['tgen_lossy_rx_pkts'] += rx_pkts + if (flow in item and item.split(flow)[1] == '_tx_pkts'): + tx_pkts = df_t[item].max() + # Incrementing lossless priority Tx packets. + if ((flow in item and item.split(flow)[1] == '_tx_pkts') + and (check_presence(flow, prio_list))): + test_stats['tgen_lossless_tx_pkts'] += tx_pkts + # Incrementing lossy priority Tx packets. + if ((flow in item and item.split(flow)[1] == '_tx_pkts') + and (check_absence(flow, prio_list))): + test_stats['tgen_lossy_tx_pkts'] += tx_pkts + if (flow in item and item.split(flow)[1] == '_rxrate_Gbps'): + if (df_t[item].sum() != 0): + rx_rate = round(df_t.loc[df_t[item] != 0, item].mean(), 2) + else: + rx_rate = 0 + test_stats['tgen_rx_rate'] += round(rx_rate, 2) + if (flow in item and item.split(flow)[1] == '_txrate_Gbps'): + tx_rate = round(df_t.loc[df_t[item] != 0, item].mean(), 2) + test_stats['tgen_tx_rate'] += round(tx_rate, 2) + if (flow in item and item.split(flow)[1] == '_loss'): + loss = df_t[item].max() + if ('pause' not in flow): + test_stats['tgen_loss_pkts'] += (int(tx_pkts) - int(rx_pkts)) + f.write('For {} - Avg_Latency:{}, rx_pkts:{}, tx_pkts:{}, rx_thrput:{}, tx_thrput:{} and loss prcnt:{} \n' + .format(flow, avg_latency, rx_pkts, tx_pkts, rx_rate, tx_rate, loss)) + f.write('Total Lossless Rx pkts:{} and Tx pkts:{} \n'. + format(test_stats['tgen_lossless_rx_pkts'], test_stats['tgen_lossless_tx_pkts'])) + f.write('Total Lossy Rx pkts:{} and Tx pkts:{} \n'. + format(test_stats['tgen_lossy_rx_pkts'], test_stats['tgen_lossy_tx_pkts'])) + f.write('Total TGEN Loss Pkts:{} \n'.format(test_stats['tgen_loss_pkts'])) + # Computing DUT Tx-Rx throughput, packets and failures via interface stats. + test_stats['dut_loss_pkts'] = 0 + for dut, port in dutport_list: + new_key = (dut.hostname + '_' + port).lower() + rx_thrput = 0 + tx_thrput = 0 + for item in results: + if (new_key in item and item.split(new_key)[1] == '_rx_thrput_Mbps'): + rx_thrput = round(df_t.loc[df_t[item] != 0, item].mean(), 2) + if (new_key in item and item.split(new_key)[1] == '_tx_thrput_Mbps'): + tx_thrput = round(df_t.loc[df_t[item] != 0, item].mean(), 2) + if (new_key in item and item.split(new_key)[1] == '_rx_pkts'): + rx_pkts = df_t[item].max() + if (new_key in item and item.split(new_key)[1] == '_tx_pkts'): + tx_pkts = df_t[item].max() + if (new_key in item and item.split(new_key)[1] == '_tx_fail'): + tx_fail = df_t[item].max() + if (new_key in item and item.split(new_key)[1] == '_rx_fail'): + rx_fail = df_t[item].max() + test_stats['dut_loss_pkts'] += (int(tx_fail) + int(rx_fail)) + f.write('For {} - rx_pkts:{}, tx_pkts:{}, rx_thrput:{}, tx_thrput:{} and rx_loss:{}, tx_loss:{} \n'. + format(new_key, rx_pkts, tx_pkts, rx_thrput, tx_thrput, rx_fail, tx_fail)) + f.write('Total DUT Loss Pkts:{} \n'.format(test_stats['dut_loss_pkts'])) + test_stats['dut_lossless_pkts'] = 0 + test_stats['dut_lossy_pkts'] = 0 + prio_key = ['_prio_'] + for dut, port in dutport_list: + new_key = (dut.hostname + '_' + port).lower() + prio_dict = {} + for item in results: + for key in prio_key: + if (new_key in item and key in item.split(new_key)[1]): + prio_dict[item.split(new_key)[1]] = df_t[item].max() + f.write('Egress Queue Count for {} : \n'.format(new_key)) + for key, val in prio_dict.items(): + if val != 0: + # Checking for lossless priorities in the key. + if check_presence(key, prio_list): + test_stats['dut_lossless_pkts'] += val + else: + test_stats['dut_lossy_pkts'] += val + f.write('{}:{} \n'.format(key, val)) + + test_stats['lossless_tx_pfc'] = 0 + test_stats['lossless_rx_pfc'] = 0 + test_stats['lossy_rx_tx_pfc'] = 0 + f.write('Received or Transmitted PFC counts: \n') + tx_pfc_list = ['tx_pfc_{}'.format(num) for num in switch_tx_lossless_prios] + rx_pfc_list = ['rx_pfc_{}'.format(num) for num in switch_tx_lossless_prios] + for item in results: + if ('pfc' in item): + if (df_t[item].max() != 0): + f.write('{} : {} \n'.format(item, df_t[item].max())) + # Lossless priority PFCs transmitted. + if (check_presence(item, tx_pfc_list)): + test_stats['lossless_tx_pfc'] += int(df_t[item].max()) + # Lossless priority PFCs received. + elif (check_presence(item, rx_pfc_list)): + test_stats['lossless_rx_pfc'] += int(df_t[item].max()) + else: + # Lossy PFCs received or transmitted. + test_stats['lossy_rx_tx_pfc'] += int(df_t[item].max()) + + fname = fname + '.csv' + logger.info('Writing statistics to file : {}'.format(fname)) + df_t.to_csv(fname, index=False) + + return flow_metrics, switch_device_results, test_stats + + +def update_dict(m, + orig_dict, + new_dict): + """ + Merges the info from new_dict into orig_dict at index of m + Args: + m (int): index of the orig_dict + orig_dict (dict): original dictionary to be updated + new_dict (dict): dictionary that needs to be fitted in orig_dict + Returns: + orig_dict (dict): Updated orig_dict with new values of new_dict + """ + for key, value in new_dict.items(): + orig_dict[m][key] = value + + return orig_dict + + +def multi_base_traffic_config(testbed_config, + port_config_list, + rx_port_id, + tx_port_id): + """ + Generate base configurations of flows, including test flows, background flows and + pause storm. Test flows and background flows are also known as data flows. + Args: + testbed_config (obj): testbed L1/L2/L3 configuration + port_config_list (list): list of port configuration + rx_port_id (int): Rx ID of DUT port to test + tx_port_id (int): Tx ID of DUT port to test + + Returns: + base_flow_config (dict): base flow configuration containing dut_port_config, tx_mac, + rx_mac, tx_port_config, rx_port_config, tx_port_name, rx_port_name + dict key-value pairs (all keys are strings): + tx_port_id (int): ID of ixia TX port ex. 1 + rx_port_id (int): ID of ixia RX port ex. 2 + tx_port_config (SnappiPortConfig): port config obj for ixia TX port + rx_port_config (SnappiPortConfig): port config obj for ixia RX port + tx_mac (str): MAC address of ixia TX port ex. '00:00:fa:ce:fa:ce' + rx_mac (str): MAC address of ixia RX port ex. '00:00:fa:ce:fa:ce' + tx_port_name (str): name of ixia TX port ex. 'Port 1' + rx_port_name (str): name of ixia RX port ex. 'Port 2' + dut_port_config (list): a list of two dictionaries of tx and rx ports on the peer (switch) side, + and the associated test priorities + ex. [{'Ethernet4':[3, 4]}, {'Ethernet8':[3, 4]}] + test_flow_name_dut_rx_port_map (dict): Mapping of test flow name to DUT RX port(s) + ex. {'flow1': [Ethernet4, Ethernet8]} + test_flow_name_dut_tx_port_map (dict): Mapping of test flow name to DUT TX port(s) + ex. {'flow1': [Ethernet4, Ethernet8]} + """ + base_flow_config = {} + base_flow_config["rx_port_id"] = rx_port_id + base_flow_config["tx_port_id"] = tx_port_id + + tx_port_config = next((x for x in port_config_list if x.id == tx_port_id), None) + rx_port_config = next((x for x in port_config_list if x.id == rx_port_id), None) + base_flow_config["tx_port_config"] = tx_port_config + base_flow_config["rx_port_config"] = rx_port_config + + # Instantiate peer ports in dut_port_config + dut_port_config = [] + tx_dict = {str(tx_port_config.peer_port): []} + rx_dict = {str(rx_port_config.peer_port): []} + dut_port_config.append(tx_dict) + dut_port_config.append(rx_dict) + base_flow_config["dut_port_config"] = dut_port_config + + base_flow_config["tx_mac"] = tx_port_config.mac + if tx_port_config.gateway == rx_port_config.gateway and \ + tx_port_config.prefix_len == rx_port_config.prefix_len: + """ If soruce and destination port are in the same subnet """ + base_flow_config["rx_mac"] = rx_port_config.mac + else: + base_flow_config["rx_mac"] = tx_port_config.gateway_mac + + base_flow_config["tx_port_name"] = testbed_config.ports[tx_port_id].name + base_flow_config["rx_port_name"] = testbed_config.ports[rx_port_id].name + + return base_flow_config + + +def flatten_dict(d, parent_key='', sep='_'): + items = [] + for k, v in d.items(): + new_key = (str(parent_key) + sep + str(k)).lower() if parent_key else k + if isinstance(v, dict): + items.extend(flatten_dict(v, new_key, sep=sep).items()) + else: + items.append((new_key, v)) + return dict(items)