Skip to content

Commit

Permalink
refactor: Replace aiocache with AntaCache (#1006)
Browse files Browse the repository at this point in the history
* Refactor: Replace aiocache with AntaCache
* doc: Cleanup aiocache references
* refactor: Address PR comments
* refactor: Address PR comments
---------
Co-authored-by: Carl Baillargeon <carl.baillargeon@arista.com>
  • Loading branch information
gmuloc authored Jan 15, 2025
1 parent 1be75b8 commit e82c1a5
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 62 deletions.
95 changes: 77 additions & 18 deletions anta/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@
import asyncio
import logging
from abc import ABC, abstractmethod
from collections import defaultdict
from collections import OrderedDict, defaultdict
from time import monotonic
from typing import TYPE_CHECKING, Any, Literal

import asyncssh
import httpcore
from aiocache import Cache
from aiocache.plugins import HitMissRatioPlugin
from asyncssh import SSHClientConnection, SSHClientConnectionOptions
from httpx import ConnectError, HTTPError, TimeoutException

Expand All @@ -34,6 +33,67 @@
CLIENT_KEYS = asyncssh.public_key.load_default_keypairs()


class AntaCache:
"""Class to be used as cache.
Example
-------
```python
# Create cache
cache = AntaCache("device1")
with cache.locks[key]:
command_output = cache.get(key)
```
"""

def __init__(self, device: str, max_size: int = 128, ttl: int = 60) -> None:
"""Initialize the cache."""
self.device = device
self.cache: OrderedDict[str, Any] = OrderedDict()
self.locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
self.max_size = max_size
self.ttl = ttl

# Stats
self.stats: dict[str, int] = {}
self._init_stats()

def _init_stats(self) -> None:
"""Initialize the stats."""
self.stats["hits"] = 0
self.stats["total"] = 0

async def get(self, key: str) -> Any: # noqa: ANN401
"""Return the cached entry for key."""
self.stats["total"] += 1
if key in self.cache:
timestamp, value = self.cache[key]
if monotonic() - timestamp < self.ttl:
# checking the value is still valid
self.cache.move_to_end(key)
self.stats["hits"] += 1
return value
# Time expired
del self.cache[key]
del self.locks[key]
return None

async def set(self, key: str, value: Any) -> bool: # noqa: ANN401
"""Set the cached entry for key to value."""
timestamp = monotonic()
if len(self.cache) > self.max_size:
self.cache.popitem(last=False)
self.cache[key] = timestamp, value
return True

def clear(self) -> None:
"""Empty the cache."""
logger.debug("Clearing cache for device %s", self.device)
self.cache = OrderedDict()
self._init_stats()


class AntaDevice(ABC):
"""Abstract class representing a device in ANTA.
Expand All @@ -52,10 +112,11 @@ class AntaDevice(ABC):
Hardware model of the device.
tags : set[str]
Tags for this device.
cache : Cache | None
In-memory cache from aiocache library for this device (None if cache is disabled).
cache : AntaCache | None
In-memory cache for this device (None if cache is disabled).
cache_locks : dict
Dictionary mapping keys to asyncio locks to guarantee exclusive access to the cache if not disabled.
Deprecated, will be removed in ANTA v2.0.0, use self.cache.locks instead.
"""

Expand All @@ -79,7 +140,8 @@ def __init__(self, name: str, tags: set[str] | None = None, *, disable_cache: bo
self.tags.add(self.name)
self.is_online: bool = False
self.established: bool = False
self.cache: Cache | None = None
self.cache: AntaCache | None = None
# Keeping cache_locks for backward compatibility.
self.cache_locks: defaultdict[str, asyncio.Lock] | None = None

# Initialize cache if not disabled
Expand All @@ -101,17 +163,16 @@ def __hash__(self) -> int:

def _init_cache(self) -> None:
"""Initialize cache for the device, can be overridden by subclasses to manipulate how it works."""
self.cache = Cache(cache_class=Cache.MEMORY, ttl=60, namespace=self.name, plugins=[HitMissRatioPlugin()])
self.cache_locks = defaultdict(asyncio.Lock)
self.cache = AntaCache(device=self.name, ttl=60)
self.cache_locks = self.cache.locks

@property
def cache_statistics(self) -> dict[str, Any] | None:
"""Return the device cache statistics for logging purposes."""
# Need to ignore pylint no-member as Cache is a proxy class and pylint is not smart enough
# https://github.com/pylint-dev/pylint/issues/7258
if self.cache is not None:
stats = getattr(self.cache, "hit_miss_ratio", {"total": 0, "hits": 0, "hit_ratio": 0})
return {"total_commands_sent": stats["total"], "cache_hits": stats["hits"], "cache_hit_ratio": f"{stats['hit_ratio'] * 100:.2f}%"}
stats = self.cache.stats
ratio = stats["hits"] / stats["total"] if stats["total"] > 0 else 0
return {"total_commands_sent": stats["total"], "cache_hits": stats["hits"], "cache_hit_ratio": f"{ratio * 100:.2f}%"}
return None

def __rich_repr__(self) -> Iterator[tuple[str, Any]]:
Expand Down Expand Up @@ -177,18 +238,16 @@ async def collect(self, command: AntaCommand, *, collection_id: str | None = Non
collection_id
An identifier used to build the eAPI request ID.
"""
# Need to ignore pylint no-member as Cache is a proxy class and pylint is not smart enough
# https://github.com/pylint-dev/pylint/issues/7258
if self.cache is not None and self.cache_locks is not None and command.use_cache:
async with self.cache_locks[command.uid]:
cached_output = await self.cache.get(command.uid) # pylint: disable=no-member
if self.cache is not None and command.use_cache:
async with self.cache.locks[command.uid]:
cached_output = await self.cache.get(command.uid)

if cached_output is not None:
logger.debug("Cache hit for %s on %s", command.command, self.name)
command.output = cached_output
else:
await self._collect(command=command, collection_id=collection_id)
await self.cache.set(command.uid, command.output) # pylint: disable=no-member
await self.cache.set(command.uid, command.output)
else:
await self._collect(command=command, collection_id=collection_id)

Expand Down
67 changes: 27 additions & 40 deletions docs/advanced_usages/caching.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,8 @@ ANTA is a streamlined Python framework designed for efficient interaction with n

## Configuration

By default, ANTA utilizes [aiocache](https://github.com/aio-libs/aiocache)'s memory cache backend, also called [`SimpleMemoryCache`](https://aiocache.aio-libs.org/en/v0.12.2/caches.html#simplememorycache). This library aims for simplicity and supports asynchronous operations to go along with Python `asyncio` used in ANTA.

The `_init_cache()` method of the [AntaDevice](../api/device.md#anta.device.AntaDevice) abstract class initializes the cache. Child classes can override this method to tweak the cache configuration:

```python
def _init_cache(self) -> None:
"""
Initialize cache for the device, can be overridden by subclasses to manipulate how it works
"""
self.cache = Cache(cache_class=Cache.MEMORY, ttl=60, namespace=self.name, plugins=[HitMissRatioPlugin()])
self.cache_locks = defaultdict(asyncio.Lock)
```

The cache is also configured with `aiocache`'s [`HitMissRatioPlugin`](https://aiocache.aio-libs.org/en/v0.12.2/plugins.html#hitmissratioplugin) plugin to calculate the ratio of hits the cache has and give useful statistics for logging purposes in ANTA.

## Cache key design

The cache is initialized per `AntaDevice` and uses the following cache key design:
Expand All @@ -31,7 +18,7 @@ The cache is initialized per `AntaDevice` and uses the following cache key desig

The `uid` is an attribute of [AntaCommand](../api/models.md#anta.models.AntaCommand), which is a unique identifier generated from the command, version, revision and output format.

Each UID has its own asyncio lock. This design allows coroutines that need to access the cache for different UIDs to do so concurrently. The locks are managed by the `self.cache_locks` dictionary.
Each UID has its own asyncio lock. This design allows coroutines that need to access the cache for different UIDs to do so concurrently. The locks are managed by the `AntaCache.locks` dictionary.

## Mechanisms

Expand All @@ -45,35 +32,35 @@ There might be scenarios where caching is not wanted. You can disable caching in

1. Caching can be disabled globally, for **ALL** commands on **ALL** devices, using the `--disable-cache` global flag when invoking anta at the [CLI](../cli/overview.md#invoking-anta-cli):

```bash
anta --disable-cache --username arista --password arista nrfu table
```
```bash
anta --disable-cache --username arista --password arista nrfu table
```

2. Caching can be disabled per device, network or range by setting the `disable_cache` key to `True` when defining the ANTA [Inventory](../usage-inventory-catalog.md#device-inventory) file:

```yaml
anta_inventory:
hosts:
- host: 172.20.20.101
name: DC1-SPINE1
tags: ["SPINE", "DC1"]
disable_cache: True # Set this key to True
- host: 172.20.20.102
name: DC1-SPINE2
tags: ["SPINE", "DC1"]
disable_cache: False # Optional since it's the default
networks:
- network: "172.21.21.0/24"
disable_cache: True
ranges:
- start: 172.22.22.10
end: 172.22.22.19
disable_cache: True
```

This approach effectively disables caching for **ALL** commands sent to devices targeted by the `disable_cache` key.
```yaml
anta_inventory:
hosts:
- host: 172.20.20.101
name: DC1-SPINE1
tags: ["SPINE", "DC1"]
disable_cache: True # Set this key to True
- host: 172.20.20.102
name: DC1-SPINE2
tags: ["SPINE", "DC1"]
disable_cache: False # Optional since it's the default

networks:
- network: "172.21.21.0/24"
disable_cache: True

ranges:
- start: 172.22.22.10
end: 172.22.22.19
disable_cache: True
```
This approach effectively disables caching for **ALL** commands sent to devices targeted by the `disable_cache` key.

3. For tests developers, caching can be disabled for a specific [`AntaCommand`](../api/models.md#anta.models.AntaCommand) or [`AntaTemplate`](../api/models.md#anta.models.AntaTemplate) by setting the `use_cache` attribute to `False`. That means the command output will always be collected on the device and therefore, never use caching.

Expand Down
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ maintainers = [
description = "Arista Network Test Automation (ANTA) Framework"
license = { file = "LICENSE" }
dependencies = [
"aiocache>=0.12.2",
"asyncssh>=2.16",
"cvprac>=1.3.1",
"eval-type-backport>=0.1.3", # Support newer typing features in older Python versions (required until Python 3.9 support is removed)
Expand Down Expand Up @@ -143,7 +142,7 @@ plugins = [
]
# Comment below for better type checking
#follow_imports = "skip"
# Make it false if we implement stubs using stubgen from mypy for aio-eapi, aiocache and cvprac
# Make it false if we implement stubs using stubgen from mypy for asynceapi, cvprac
# and configure mypy_path to generated stubs e.g.: mypy_path = "./out"
ignore_missing_imports = true
warn_redundant_casts = true
Expand Down
4 changes: 2 additions & 2 deletions tests/units/test_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,11 +589,11 @@ async def test_collect(self, device: AntaDevice, command: dict[str, Any], expect
if expected["cache_hit"] is True:
assert cmd.output == cached_output
assert current_cached_data == cached_output
assert device.cache.hit_miss_ratio["hits"] == 2
assert device.cache.stats["hits"] == 2
else:
assert cmd.output == COMMAND_OUTPUT
assert current_cached_data == COMMAND_OUTPUT
assert device.cache.hit_miss_ratio["hits"] == 1
assert device.cache.stats["hits"] == 1
else: # command is not allowed to use cache
device._collect.assert_called_once_with(command=cmd, collection_id=None) # type: ignore[attr-defined]
assert cmd.output == COMMAND_OUTPUT
Expand Down

0 comments on commit e82c1a5

Please sign in to comment.