diff --git a/anta/device.py b/anta/device.py index ba363330e..3624fdb2e 100644 --- a/anta/device.py +++ b/anta/device.py @@ -8,13 +8,12 @@ import asyncio import logging from abc import ABC, abstractmethod -from collections import defaultdict +from collections import OrderedDict, defaultdict +from time import monotonic from typing import TYPE_CHECKING, Any, Literal import asyncssh import httpcore -from aiocache import Cache -from aiocache.plugins import HitMissRatioPlugin from asyncssh import SSHClientConnection, SSHClientConnectionOptions from httpx import ConnectError, HTTPError, TimeoutException @@ -34,6 +33,67 @@ CLIENT_KEYS = asyncssh.public_key.load_default_keypairs() +class AntaCache: + """Class to be used as cache. + + Example + ------- + + ```python + # Create cache + cache = AntaCache("device1") + with cache.locks[key]: + command_output = cache.get(key) + ``` + """ + + def __init__(self, device: str, max_size: int = 128, ttl: int = 60) -> None: + """Initialize the cache.""" + self.device = device + self.cache: OrderedDict[str, Any] = OrderedDict() + self.locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock) + self.max_size = max_size + self.ttl = ttl + + # Stats + self.stats: dict[str, int] = {} + self._init_stats() + + def _init_stats(self) -> None: + """Initialize the stats.""" + self.stats["hits"] = 0 + self.stats["total"] = 0 + + async def get(self, key: str) -> Any: # noqa: ANN401 + """Return the cached entry for key.""" + self.stats["total"] += 1 + if key in self.cache: + timestamp, value = self.cache[key] + if monotonic() - timestamp < self.ttl: + # checking the value is still valid + self.cache.move_to_end(key) + self.stats["hits"] += 1 + return value + # Time expired + del self.cache[key] + del self.locks[key] + return None + + async def set(self, key: str, value: Any) -> bool: # noqa: ANN401 + """Set the cached entry for key to value.""" + timestamp = monotonic() + if len(self.cache) > self.max_size: + self.cache.popitem(last=False) + self.cache[key] = timestamp, value + return True + + def clear(self) -> None: + """Empty the cache.""" + logger.debug("Clearing cache for device %s", self.device) + self.cache = OrderedDict() + self._init_stats() + + class AntaDevice(ABC): """Abstract class representing a device in ANTA. @@ -52,10 +112,11 @@ class AntaDevice(ABC): Hardware model of the device. tags : set[str] Tags for this device. - cache : Cache | None - In-memory cache from aiocache library for this device (None if cache is disabled). + cache : AntaCache | None + In-memory cache for this device (None if cache is disabled). cache_locks : dict Dictionary mapping keys to asyncio locks to guarantee exclusive access to the cache if not disabled. + Deprecated, will be removed in ANTA v2.0.0, use self.cache.locks instead. """ @@ -79,7 +140,8 @@ def __init__(self, name: str, tags: set[str] | None = None, *, disable_cache: bo self.tags.add(self.name) self.is_online: bool = False self.established: bool = False - self.cache: Cache | None = None + self.cache: AntaCache | None = None + # Keeping cache_locks for backward compatibility. self.cache_locks: defaultdict[str, asyncio.Lock] | None = None # Initialize cache if not disabled @@ -101,17 +163,16 @@ def __hash__(self) -> int: def _init_cache(self) -> None: """Initialize cache for the device, can be overridden by subclasses to manipulate how it works.""" - self.cache = Cache(cache_class=Cache.MEMORY, ttl=60, namespace=self.name, plugins=[HitMissRatioPlugin()]) - self.cache_locks = defaultdict(asyncio.Lock) + self.cache = AntaCache(device=self.name, ttl=60) + self.cache_locks = self.cache.locks @property def cache_statistics(self) -> dict[str, Any] | None: """Return the device cache statistics for logging purposes.""" - # Need to ignore pylint no-member as Cache is a proxy class and pylint is not smart enough - # https://github.com/pylint-dev/pylint/issues/7258 if self.cache is not None: - stats = getattr(self.cache, "hit_miss_ratio", {"total": 0, "hits": 0, "hit_ratio": 0}) - return {"total_commands_sent": stats["total"], "cache_hits": stats["hits"], "cache_hit_ratio": f"{stats['hit_ratio'] * 100:.2f}%"} + stats = self.cache.stats + ratio = stats["hits"] / stats["total"] if stats["total"] > 0 else 0 + return {"total_commands_sent": stats["total"], "cache_hits": stats["hits"], "cache_hit_ratio": f"{ratio * 100:.2f}%"} return None def __rich_repr__(self) -> Iterator[tuple[str, Any]]: @@ -177,18 +238,16 @@ async def collect(self, command: AntaCommand, *, collection_id: str | None = Non collection_id An identifier used to build the eAPI request ID. """ - # Need to ignore pylint no-member as Cache is a proxy class and pylint is not smart enough - # https://github.com/pylint-dev/pylint/issues/7258 - if self.cache is not None and self.cache_locks is not None and command.use_cache: - async with self.cache_locks[command.uid]: - cached_output = await self.cache.get(command.uid) # pylint: disable=no-member + if self.cache is not None and command.use_cache: + async with self.cache.locks[command.uid]: + cached_output = await self.cache.get(command.uid) if cached_output is not None: logger.debug("Cache hit for %s on %s", command.command, self.name) command.output = cached_output else: await self._collect(command=command, collection_id=collection_id) - await self.cache.set(command.uid, command.output) # pylint: disable=no-member + await self.cache.set(command.uid, command.output) else: await self._collect(command=command, collection_id=collection_id) diff --git a/docs/advanced_usages/caching.md b/docs/advanced_usages/caching.md index a9c18182c..1e794ea56 100644 --- a/docs/advanced_usages/caching.md +++ b/docs/advanced_usages/caching.md @@ -8,21 +8,8 @@ ANTA is a streamlined Python framework designed for efficient interaction with n ## Configuration -By default, ANTA utilizes [aiocache](https://github.com/aio-libs/aiocache)'s memory cache backend, also called [`SimpleMemoryCache`](https://aiocache.aio-libs.org/en/v0.12.2/caches.html#simplememorycache). This library aims for simplicity and supports asynchronous operations to go along with Python `asyncio` used in ANTA. - The `_init_cache()` method of the [AntaDevice](../api/device.md#anta.device.AntaDevice) abstract class initializes the cache. Child classes can override this method to tweak the cache configuration: -```python -def _init_cache(self) -> None: - """ - Initialize cache for the device, can be overridden by subclasses to manipulate how it works - """ - self.cache = Cache(cache_class=Cache.MEMORY, ttl=60, namespace=self.name, plugins=[HitMissRatioPlugin()]) - self.cache_locks = defaultdict(asyncio.Lock) -``` - -The cache is also configured with `aiocache`'s [`HitMissRatioPlugin`](https://aiocache.aio-libs.org/en/v0.12.2/plugins.html#hitmissratioplugin) plugin to calculate the ratio of hits the cache has and give useful statistics for logging purposes in ANTA. - ## Cache key design The cache is initialized per `AntaDevice` and uses the following cache key design: @@ -31,7 +18,7 @@ The cache is initialized per `AntaDevice` and uses the following cache key desig The `uid` is an attribute of [AntaCommand](../api/models.md#anta.models.AntaCommand), which is a unique identifier generated from the command, version, revision and output format. -Each UID has its own asyncio lock. This design allows coroutines that need to access the cache for different UIDs to do so concurrently. The locks are managed by the `self.cache_locks` dictionary. +Each UID has its own asyncio lock. This design allows coroutines that need to access the cache for different UIDs to do so concurrently. The locks are managed by the `AntaCache.locks` dictionary. ## Mechanisms @@ -45,35 +32,35 @@ There might be scenarios where caching is not wanted. You can disable caching in 1. Caching can be disabled globally, for **ALL** commands on **ALL** devices, using the `--disable-cache` global flag when invoking anta at the [CLI](../cli/overview.md#invoking-anta-cli): - ```bash - anta --disable-cache --username arista --password arista nrfu table - ``` + ```bash + anta --disable-cache --username arista --password arista nrfu table + ``` 2. Caching can be disabled per device, network or range by setting the `disable_cache` key to `True` when defining the ANTA [Inventory](../usage-inventory-catalog.md#device-inventory) file: - ```yaml - anta_inventory: - hosts: - - host: 172.20.20.101 - name: DC1-SPINE1 - tags: ["SPINE", "DC1"] - disable_cache: True # Set this key to True - - host: 172.20.20.102 - name: DC1-SPINE2 - tags: ["SPINE", "DC1"] - disable_cache: False # Optional since it's the default - - networks: - - network: "172.21.21.0/24" - disable_cache: True - - ranges: - - start: 172.22.22.10 - end: 172.22.22.19 - disable_cache: True - ``` - - This approach effectively disables caching for **ALL** commands sent to devices targeted by the `disable_cache` key. + ```yaml + anta_inventory: + hosts: + - host: 172.20.20.101 + name: DC1-SPINE1 + tags: ["SPINE", "DC1"] + disable_cache: True # Set this key to True + - host: 172.20.20.102 + name: DC1-SPINE2 + tags: ["SPINE", "DC1"] + disable_cache: False # Optional since it's the default + + networks: + - network: "172.21.21.0/24" + disable_cache: True + + ranges: + - start: 172.22.22.10 + end: 172.22.22.19 + disable_cache: True + ``` + + This approach effectively disables caching for **ALL** commands sent to devices targeted by the `disable_cache` key. 3. For tests developers, caching can be disabled for a specific [`AntaCommand`](../api/models.md#anta.models.AntaCommand) or [`AntaTemplate`](../api/models.md#anta.models.AntaTemplate) by setting the `use_cache` attribute to `False`. That means the command output will always be collected on the device and therefore, never use caching. diff --git a/pyproject.toml b/pyproject.toml index 09fee8c3e..6d47f6897 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,6 @@ maintainers = [ description = "Arista Network Test Automation (ANTA) Framework" license = { file = "LICENSE" } dependencies = [ - "aiocache>=0.12.2", "asyncssh>=2.16", "cvprac>=1.3.1", "eval-type-backport>=0.1.3", # Support newer typing features in older Python versions (required until Python 3.9 support is removed) @@ -143,7 +142,7 @@ plugins = [ ] # Comment below for better type checking #follow_imports = "skip" -# Make it false if we implement stubs using stubgen from mypy for aio-eapi, aiocache and cvprac +# Make it false if we implement stubs using stubgen from mypy for asynceapi, cvprac # and configure mypy_path to generated stubs e.g.: mypy_path = "./out" ignore_missing_imports = true warn_redundant_casts = true diff --git a/tests/units/test_device.py b/tests/units/test_device.py index d7b25d4eb..e65eeb2a3 100644 --- a/tests/units/test_device.py +++ b/tests/units/test_device.py @@ -589,11 +589,11 @@ async def test_collect(self, device: AntaDevice, command: dict[str, Any], expect if expected["cache_hit"] is True: assert cmd.output == cached_output assert current_cached_data == cached_output - assert device.cache.hit_miss_ratio["hits"] == 2 + assert device.cache.stats["hits"] == 2 else: assert cmd.output == COMMAND_OUTPUT assert current_cached_data == COMMAND_OUTPUT - assert device.cache.hit_miss_ratio["hits"] == 1 + assert device.cache.stats["hits"] == 1 else: # command is not allowed to use cache device._collect.assert_called_once_with(command=cmd, collection_id=None) # type: ignore[attr-defined] assert cmd.output == COMMAND_OUTPUT