From 56aa53936f2372399f73f1b06b394e8e5f19f2cf Mon Sep 17 00:00:00 2001 From: Longxiang Lyu <35479537+lolyu@users.noreply.github.com> Date: Thu, 16 Jan 2025 12:37:16 +0800 Subject: [PATCH] [mux simulaotr] Improve mux simulator toggle performance (#16508) What is the motivation for this PR? Use thread-pool to parallel run the mux toggles. This code is from PR: #16164, which is reverted. Let's have the change here. Signed-off-by: Longxiang lolv@microsoft.com How did you do it? As the motivation. How did you verify/test it? Run on dualtor/dualtor-120 testbed. Signed-off-by: Longxiang --- ansible/roles/vm_set/files/mux_simulator.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/ansible/roles/vm_set/files/mux_simulator.py b/ansible/roles/vm_set/files/mux_simulator.py index db49c115974..dc15f509576 100644 --- a/ansible/roles/vm_set/files/mux_simulator.py +++ b/ansible/roles/vm_set/files/mux_simulator.py @@ -14,6 +14,11 @@ import traceback import time +if sys.version_info.major == 2: + from multiprocessing.pool import ThreadPool +else: + from concurrent.futures import ThreadPoolExecutor as ThreadPool + from collections import defaultdict from logging.handlers import RotatingFileHandler @@ -559,9 +564,12 @@ def clear_flap_counter(self): class Muxes(object): + MUXES_CONCURRENCY = 4 + def __init__(self, vm_set): self.vm_set = vm_set self.muxes = {} + self.thread_pool = ThreadPool(Muxes.MUXES_CONCURRENCY) for bridge in self._mux_bridges(): bridge_fields = bridge.split('-') port_index = int(bridge_fields[-1]) @@ -596,7 +604,8 @@ def set_active_side(self, new_active_side, port_index=None): mux.set_active_side(new_active_side) return mux.status else: - [mux.set_active_side(new_active_side) for mux in self.muxes.values()] + list(self.thread_pool.map(lambda args: Mux.set_active_side(*args), + [(mux, new_active_side) for mux in self.muxes.values()])) return {mux.bridge: mux.status for mux in self.muxes.values()} def update_flows(self, new_action, out_sides, port_index=None): @@ -605,7 +614,8 @@ def update_flows(self, new_action, out_sides, port_index=None): mux.update_flows(new_action, out_sides) return mux.status else: - [mux.update_flows(new_action, out_sides) for mux in self.muxes.values()] + list(self.thread_pool.map(lambda args: Mux.update_flows(*args), + [(mux, new_action, out_sides) for mux in self.muxes.values()])) return {mux.bridge: mux.status for mux in self.muxes.values()} def reset_flows(self, port_index=None):