Skip to content

Commit

Permalink
Refine the thread log batch logic
Browse files Browse the repository at this point in the history
Signed-off-by: Longxiang <lolv@microsoft.com>
  • Loading branch information
lolyu committed Dec 26, 2024
1 parent c97fb41 commit fa6b408
Showing 1 changed file with 75 additions and 18 deletions.
93 changes: 75 additions & 18 deletions ansible/roles/vm_set/library/vm_topology.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#!/usr/bin/python

import contextlib
import functools
import hashlib
import json
import multiprocessing
import os.path
import re
import subprocess
Expand Down Expand Up @@ -171,9 +171,8 @@

RT_TABLE_FILEPATH = "/etc/iproute2/rt_tables"

DEFAULT_THREAD_WORKER_COUNT = 8

logger = logging.getLogger()
MIN_THREAD_WORKER_COUNT = 8
LOG_SEPARATOR = "=" * 120


def construct_log_filename(cmd, vm_set_name):
Expand Down Expand Up @@ -457,7 +456,7 @@ def create_bridges(self):
VS_CHASSIS_MIDPLANE_BRIDGE_NAME, self.fp_mtu)

def create_ovs_bridge(self, bridge_name, mtu):
logger.info('=== Create bridge %s with mtu %d ===' %
logging.info('=== Create bridge %s with mtu %d ===' %
(bridge_name, mtu))
VMTopology.cmd('ovs-vsctl --may-exist add-br %s' % bridge_name)

Expand All @@ -478,7 +477,7 @@ def destroy_bridges(self):
self.destroy_ovs_bridge(VS_CHASSIS_MIDPLANE_BRIDGE_NAME)

def destroy_ovs_bridge(self, bridge_name):
logger.info('=== Destroy bridge %s ===' % bridge_name)
logging.info('=== Destroy bridge %s ===' % bridge_name)
VMTopology.cmd('ovs-vsctl --if-exists del-br %s' % bridge_name)

def get_vm_bridges(self, vmname):
Expand Down Expand Up @@ -559,7 +558,7 @@ def add_br_if_to_docker(self, bridge, ext_if, int_if):
# add unique suffix to int_if to support multiple tasks run concurrently
tmp_int_if = int_if + \
VMTopology._generate_fingerprint(ext_if, MAX_INTF_LEN-len(int_if))
logger.info('=== For veth pair, add %s to bridge %s, set %s to PTF docker, tmp intf %s' % (
logging.info('=== For veth pair, add %s to bridge %s, set %s to PTF docker, tmp intf %s' % (
ext_if, bridge, int_if, tmp_int_if))
if VMTopology.intf_not_exists(ext_if):
VMTopology.cmd("ip link add %s type veth peer name %s" %
Expand All @@ -582,7 +581,7 @@ def add_br_if_to_netns(self, bridge, ext_if, int_if):
# add unique suffix to int_if to support multiple tasks run concurrently
tmp_int_if = int_if + \
VMTopology._generate_fingerprint(ext_if, MAX_INTF_LEN-len(int_if))
logger.info('=== For veth pair, add %s to bridge %s, set %s to netns, tmp intf %s' % (
logging.info('=== For veth pair, add %s to bridge %s, set %s to netns, tmp intf %s' % (
ext_if, bridge, int_if, tmp_int_if))
if VMTopology.intf_not_exists(ext_if):
VMTopology.cmd("ip link add %s type veth peer name %s" %
Expand Down Expand Up @@ -657,7 +656,7 @@ def add_ip_to_netns_if(self, int_if, ip_addr, ipv6_addr=None, default_gw=None, d
(self.netns, default_gw_v6, int_if))

def add_dut_if_to_docker(self, iface_name, dut_iface):
logger.info("=== Add DUT interface %s to PTF docker as %s ===" %
logging.info("=== Add DUT interface %s to PTF docker as %s ===" %
(dut_iface, iface_name))
if VMTopology.intf_exists(dut_iface) \
and VMTopology.intf_not_exists(dut_iface, pid=self.pid) \
Expand All @@ -680,7 +679,7 @@ def add_dut_vlan_subif_to_docker(self, iface_name, vlan_separator, vlan_id):
(self.pid, vlan_sub_iface_name))

def remove_dut_if_from_docker(self, iface_name, dut_iface):
logger.info("=== Restore docker interface %s as dut interface %s ===" % (iface_name, dut_iface))
logging.info("=== Restore docker interface %s as dut interface %s ===" % (iface_name, dut_iface))
if self.pid is None:
return

Expand Down Expand Up @@ -1928,15 +1927,34 @@ def check_params(module, params, mode):


class ThreadBufferHandler(logging.Handler):
"""
ThreadBufferHandler stores log records from each thread separately and can flush
logs from each thread separately.
Each thread has its own memory log handler, and each log record is buffered only in
the memory log handler of the thread that emits it. Flushing is performed by each
memory log handler whenever its memory buffer is full or when explicitly triggered by
the user. The logs from one thread are coalesced and batch-sent to the target
handler.
"""

THREAD_LOG_HANDLER_CAPACITY = 4096

def __init__(self, target, loglevel=logging.NOTSET):
"""
Initialize the ThreadBufferHandler object.
Args:
target: the target handler, all log records stored temporarily in this handler will be
flushed to the target handler.
loglevel: log level.
"""
super(ThreadBufferHandler, self).__init__(level=loglevel)
self.memory_handlers = {}
self.target = target

def get_current_thread_log_memory_handler(self):
"""Get the current thread log memory handler."""
thread_id = threading.current_thread().ident
if thread_id in self.memory_handlers:
return self.memory_handlers[thread_id]
Expand All @@ -1947,17 +1965,25 @@ def get_current_thread_log_memory_handler(self):
return memory_handler

def flush_current_thread_logs(self):
"""Flush the log records stored in the current thread log memory handler."""
self.get_current_thread_log_memory_handler().flush()

def emit(self, record):
"""
Emit a record.
Dispatch the log record to the current thread log memory handler.
"""
self.get_current_thread_log_memory_handler().emit(record)

def flush(self):
"""Flush all log records to the target handler."""
for handler in self.memory_handlers.values():
handler.flush()
self.target.flush()

def close(self):
"""Close all log memory handlers."""
for handler in self.memory_handlers.values():
handler.close()
self.memory_handlers.clear()
Expand All @@ -1969,9 +1995,18 @@ class VMTopologyWorker(object):
"""VM Topology worker class."""

def __init__(self, use_thread_worker, thread_worker_count):
"""
Initialize the VMTopologyWorker object.
Args:
use_thread_worker: whether to use a thread pool.
thread_worker_count: the thread worker count, used when the thread pool is enabled.
"""
logging.info("Init VM topology worker: use thread worker %s, thread worker count %s",
use_thread_worker, thread_worker_count)
self.thread_pool = None
self._map_helper = map
self.shutdown_helper = None
self._shutdown_helper = None
self.use_thread_worker = use_thread_worker
self.thread_worker_count = thread_worker_count
self.thread_buffer_handler = None
Expand All @@ -1988,27 +2023,47 @@ def __init__(self, use_thread_worker, thread_worker_count):
self._setup_thread_buffered_handler()

def _setup_thread_buffered_handler(self):
"""Set up the per-thread log batch handler with ThreadBufferHandler."""
handlers = logging.getLogger().handlers
if not handlers:
raise ValueError("No logging handler is available in the default logger.")
raise ValueError("No logging handler is available in the default logging.")
handler = handlers[-1]
self.thread_buffer_handler = ThreadBufferHandler(target=handler)
handlers[-1] = self.thread_buffer_handler

def map(self, func, iterable):

"""Apply the function to every item of the iterable."""
def _buffer_logs_helper(func, *args, **kwargs):
if self.use_thread_worker:
logging.debug(LOG_SEPARATOR)
logging.debug("Start task %s, arguments (%s, %s), worker %s",
func, args, kwargs, threading.current_thread().ident)
try:
func(*args, **kwargs)
finally:
if self.use_thread_worker:
logging.debug("Finish task %s, arguments (%s, %s), worker %s",
func, args, kwargs, threading.current_thread().ident)
logging.debug(LOG_SEPARATOR)
self.thread_buffer_handler.flush_current_thread_logs()

return list(self._map_helper(functools.partial(_buffer_logs_helper, func), iterable))
# NOTE: replace the original handler with the thread buffer handler, so logs from
# one task will be buffered and flushed together.
if self.use_thread_worker:
handlers = logging.getLogger().handlers
handlers.remove(self.thread_buffer_handler.target)
handlers.append(self.thread_buffer_handler)
try:
return list(self._map_helper(functools.partial(_buffer_logs_helper, func), iterable))
finally:
if self.use_thread_worker:
handlers.remove(self.thread_buffer_handler)
handlers.append(self.thread_buffer_handler.target)

def shutdown(self):
if self._shutdown_helper:
"""Stop the worker threads immediately without completing outstanding work."""
if self.use_thread_worker:
self._shutdown_helper()
self.thread_buffer_handler.flush()

def __del__(self):
self.shutdown()
Expand Down Expand Up @@ -2041,8 +2096,10 @@ def main():
default=NUM_FP_VLANS_PER_FP),
netns_mgmt_ip_addr=dict(required=False, type='str', default=None),
is_dpu=(dict(required=False, type='bool', default=False)),
use_thread_worker=dict(required=False, type='bool', default=False),
thread_worker_count=dict(required=False, type='int', default=DEFAULT_THREAD_WORKER_COUNT)
use_thread_worker=dict(required=False, type='bool', default=True),
thread_worker_count=dict(required=False, type='int',
default=max(MIN_THREAD_WORKER_COUNT,
multiprocessing.cpu_count() // 8))
),
supports_check_mode=False)

Expand Down

0 comments on commit fa6b408

Please sign in to comment.