diff --git a/anta/constants.py b/anta/constants.py index ae131dd1a..502dfe4ab 100644 --- a/anta/constants.py +++ b/anta/constants.py @@ -45,3 +45,11 @@ r"No source interface .*", ] """List of known EOS errors that should set a test status to 'failure' with the error message.""" + +UNSUPPORTED_PLATFORM_ERRORS = [ + "not supported on this hardware platform", + "Invalid input (at token 2: 'trident')", +] +"""Error messages indicating platform or hardware unsupported commands. +Will set the test status to 'skipped'. Includes both general hardware +platform errors and specific ASIC family limitations.""" diff --git a/anta/device.py b/anta/device.py index ba363330e..3624fdb2e 100644 --- a/anta/device.py +++ b/anta/device.py @@ -8,13 +8,12 @@ import asyncio import logging from abc import ABC, abstractmethod -from collections import defaultdict +from collections import OrderedDict, defaultdict +from time import monotonic from typing import TYPE_CHECKING, Any, Literal import asyncssh import httpcore -from aiocache import Cache -from aiocache.plugins import HitMissRatioPlugin from asyncssh import SSHClientConnection, SSHClientConnectionOptions from httpx import ConnectError, HTTPError, TimeoutException @@ -34,6 +33,67 @@ CLIENT_KEYS = asyncssh.public_key.load_default_keypairs() +class AntaCache: + """Class to be used as cache. + + Example + ------- + + ```python + # Create cache + cache = AntaCache("device1") + with cache.locks[key]: + command_output = cache.get(key) + ``` + """ + + def __init__(self, device: str, max_size: int = 128, ttl: int = 60) -> None: + """Initialize the cache.""" + self.device = device + self.cache: OrderedDict[str, Any] = OrderedDict() + self.locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock) + self.max_size = max_size + self.ttl = ttl + + # Stats + self.stats: dict[str, int] = {} + self._init_stats() + + def _init_stats(self) -> None: + """Initialize the stats.""" + self.stats["hits"] = 0 + self.stats["total"] = 0 + + async def get(self, key: str) -> Any: # noqa: ANN401 + """Return the cached entry for key.""" + self.stats["total"] += 1 + if key in self.cache: + timestamp, value = self.cache[key] + if monotonic() - timestamp < self.ttl: + # checking the value is still valid + self.cache.move_to_end(key) + self.stats["hits"] += 1 + return value + # Time expired + del self.cache[key] + del self.locks[key] + return None + + async def set(self, key: str, value: Any) -> bool: # noqa: ANN401 + """Set the cached entry for key to value.""" + timestamp = monotonic() + if len(self.cache) > self.max_size: + self.cache.popitem(last=False) + self.cache[key] = timestamp, value + return True + + def clear(self) -> None: + """Empty the cache.""" + logger.debug("Clearing cache for device %s", self.device) + self.cache = OrderedDict() + self._init_stats() + + class AntaDevice(ABC): """Abstract class representing a device in ANTA. @@ -52,10 +112,11 @@ class AntaDevice(ABC): Hardware model of the device. tags : set[str] Tags for this device. - cache : Cache | None - In-memory cache from aiocache library for this device (None if cache is disabled). + cache : AntaCache | None + In-memory cache for this device (None if cache is disabled). cache_locks : dict Dictionary mapping keys to asyncio locks to guarantee exclusive access to the cache if not disabled. + Deprecated, will be removed in ANTA v2.0.0, use self.cache.locks instead. """ @@ -79,7 +140,8 @@ def __init__(self, name: str, tags: set[str] | None = None, *, disable_cache: bo self.tags.add(self.name) self.is_online: bool = False self.established: bool = False - self.cache: Cache | None = None + self.cache: AntaCache | None = None + # Keeping cache_locks for backward compatibility. self.cache_locks: defaultdict[str, asyncio.Lock] | None = None # Initialize cache if not disabled @@ -101,17 +163,16 @@ def __hash__(self) -> int: def _init_cache(self) -> None: """Initialize cache for the device, can be overridden by subclasses to manipulate how it works.""" - self.cache = Cache(cache_class=Cache.MEMORY, ttl=60, namespace=self.name, plugins=[HitMissRatioPlugin()]) - self.cache_locks = defaultdict(asyncio.Lock) + self.cache = AntaCache(device=self.name, ttl=60) + self.cache_locks = self.cache.locks @property def cache_statistics(self) -> dict[str, Any] | None: """Return the device cache statistics for logging purposes.""" - # Need to ignore pylint no-member as Cache is a proxy class and pylint is not smart enough - # https://github.com/pylint-dev/pylint/issues/7258 if self.cache is not None: - stats = getattr(self.cache, "hit_miss_ratio", {"total": 0, "hits": 0, "hit_ratio": 0}) - return {"total_commands_sent": stats["total"], "cache_hits": stats["hits"], "cache_hit_ratio": f"{stats['hit_ratio'] * 100:.2f}%"} + stats = self.cache.stats + ratio = stats["hits"] / stats["total"] if stats["total"] > 0 else 0 + return {"total_commands_sent": stats["total"], "cache_hits": stats["hits"], "cache_hit_ratio": f"{ratio * 100:.2f}%"} return None def __rich_repr__(self) -> Iterator[tuple[str, Any]]: @@ -177,18 +238,16 @@ async def collect(self, command: AntaCommand, *, collection_id: str | None = Non collection_id An identifier used to build the eAPI request ID. """ - # Need to ignore pylint no-member as Cache is a proxy class and pylint is not smart enough - # https://github.com/pylint-dev/pylint/issues/7258 - if self.cache is not None and self.cache_locks is not None and command.use_cache: - async with self.cache_locks[command.uid]: - cached_output = await self.cache.get(command.uid) # pylint: disable=no-member + if self.cache is not None and command.use_cache: + async with self.cache.locks[command.uid]: + cached_output = await self.cache.get(command.uid) if cached_output is not None: logger.debug("Cache hit for %s on %s", command.command, self.name) command.output = cached_output else: await self._collect(command=command, collection_id=collection_id) - await self.cache.set(command.uid, command.output) # pylint: disable=no-member + await self.cache.set(command.uid, command.output) else: await self._collect(command=command, collection_id=collection_id) diff --git a/anta/input_models/routing/bgp.py b/anta/input_models/routing/bgp.py index e51e0e11a..a34227a1b 100644 --- a/anta/input_models/routing/bgp.py +++ b/anta/input_models/routing/bgp.py @@ -6,7 +6,7 @@ from __future__ import annotations from ipaddress import IPv4Address, IPv4Network, IPv6Address -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Literal from warnings import warn from pydantic import BaseModel, ConfigDict, Field, PositiveInt, model_validator @@ -149,7 +149,7 @@ class BgpPeer(BaseModel): received_routes: list[IPv4Network] | None = None """List of received routes in CIDR format. Required field in the `VerifyBGPExchangedRoutes` test.""" capabilities: list[MultiProtocolCaps] | None = None - """List of BGP multiprotocol capabilities. Required field in the `VerifyBGPPeerMPCaps` test.""" + """List of BGP multiprotocol capabilities. Required field in the `VerifyBGPPeerMPCaps`, `VerifyBGPNlriAcceptance` tests.""" strict: bool = False """If True, requires exact match of the provided BGP multiprotocol capabilities. @@ -211,3 +211,46 @@ class VxlanEndpoint(BaseModel): def __str__(self) -> str: """Return a human-readable string representation of the VxlanEndpoint for reporting.""" return f"Address: {self.address} VNI: {self.vni}" + + +class BgpRoute(BaseModel): + """Model representing BGP routes. + + Only IPv4 prefixes are supported for now. + """ + + model_config = ConfigDict(extra="forbid") + prefix: IPv4Network + """The IPv4 network address.""" + vrf: str = "default" + """Optional VRF for the BGP peer. Defaults to `default`.""" + paths: list[BgpRoutePath] | None = None + """A list of paths for the BGP route. Required field in the `VerifyBGPRouteOrigin` test.""" + + def __str__(self) -> str: + """Return a human-readable string representation of the BgpRoute for reporting. + + Examples + -------- + - Prefix: 192.168.66.100/24 VRF: default + """ + return f"Prefix: {self.prefix} VRF: {self.vrf}" + + +class BgpRoutePath(BaseModel): + """Model representing a BGP route path.""" + + model_config = ConfigDict(extra="forbid") + nexthop: IPv4Address + """The next-hop IPv4 address for the path.""" + origin: Literal["Igp", "Egp", "Incomplete"] + """The BGP origin attribute of the route.""" + + def __str__(self) -> str: + """Return a human-readable string representation of the RoutePath for reporting. + + Examples + -------- + - Next-hop: 192.168.66.101 Origin: Igp + """ + return f"Next-hop: {self.nexthop} Origin: {self.origin}" diff --git a/anta/input_models/security.py b/anta/input_models/security.py index 5a517c575..79bdc17da 100644 --- a/anta/input_models/security.py +++ b/anta/input_models/security.py @@ -6,10 +6,20 @@ from __future__ import annotations from ipaddress import IPv4Address -from typing import Any +from typing import TYPE_CHECKING, Any, ClassVar, get_args from warnings import warn -from pydantic import BaseModel, ConfigDict +from pydantic import BaseModel, ConfigDict, Field, model_validator + +from anta.custom_types import EcdsaKeySize, EncryptionAlgorithm, RsaKeySize + +if TYPE_CHECKING: + import sys + + if sys.version_info >= (3, 11): + from typing import Self + else: + from typing_extensions import Self class IPSecPeer(BaseModel): @@ -43,6 +53,107 @@ class IPSecConn(BaseModel): """The IPv4 address of the destination in the security connection.""" +class APISSLCertificate(BaseModel): + """Model for an API SSL certificate.""" + + model_config = ConfigDict(extra="forbid") + certificate_name: str + """The name of the certificate to be verified.""" + expiry_threshold: int + """The expiry threshold of the certificate in days.""" + common_name: str + """The Common Name of the certificate.""" + encryption_algorithm: EncryptionAlgorithm + """The encryption algorithm used by the certificate.""" + key_size: RsaKeySize | EcdsaKeySize + """The key size (in bits) of the encryption algorithm.""" + + def __str__(self) -> str: + """Return a human-readable string representation of the APISSLCertificate for reporting. + + Examples + -------- + - Certificate: SIGNING_CA.crt + """ + return f"Certificate: {self.certificate_name}" + + @model_validator(mode="after") + def validate_inputs(self) -> Self: + """Validate the key size provided to the APISSLCertificates class. + + If encryption_algorithm is RSA then key_size should be in {2048, 3072, 4096}. + + If encryption_algorithm is ECDSA then key_size should be in {256, 384, 521}. + """ + if self.encryption_algorithm == "RSA" and self.key_size not in get_args(RsaKeySize): + msg = f"`{self.certificate_name}` key size {self.key_size} is invalid for RSA encryption. Allowed sizes are {get_args(RsaKeySize)}." + raise ValueError(msg) + + if self.encryption_algorithm == "ECDSA" and self.key_size not in get_args(EcdsaKeySize): + msg = f"`{self.certificate_name}` key size {self.key_size} is invalid for ECDSA encryption. Allowed sizes are {get_args(EcdsaKeySize)}." + raise ValueError(msg) + + return self + + +class ACLEntry(BaseModel): + """Model for an Access Control List (ACL) entry.""" + + model_config = ConfigDict(extra="forbid") + sequence: int = Field(ge=1, le=4294967295) + """Sequence number of the ACL entry, used to define the order of processing. Must be between 1 and 4294967295.""" + action: str + """Action of the ACL entry. Example: `deny ip any any`.""" + + def __str__(self) -> str: + """Return a human-readable string representation of the ACLEntry for reporting. + + Examples + -------- + - Sequence: 10 + """ + return f"Sequence: {self.sequence}" + + +class ACL(BaseModel): + """Model for an Access Control List (ACL).""" + + model_config = ConfigDict(extra="forbid") + name: str + """Name of the ACL.""" + entries: list[ACLEntry] + """List of the ACL entries.""" + IPv4ACLEntry: ClassVar[type[ACLEntry]] = ACLEntry + """To maintain backward compatibility.""" + + def __str__(self) -> str: + """Return a human-readable string representation of the ACL for reporting. + + Examples + -------- + - ACL name: Test + """ + return f"ACL name: {self.name}" + + +class IPv4ACL(ACL): # pragma: no cover + """Alias for the ACL model to maintain backward compatibility. + + When initialized, it will emit a deprecation warning and call the ACL model. + + TODO: Remove this class in ANTA v2.0.0. + """ + + def __init__(self, **data: Any) -> None: # noqa: ANN401 + """Initialize the IPv4ACL class, emitting a deprecation warning.""" + warn( + message="IPv4ACL model is deprecated and will be removed in ANTA v2.0.0. Use the ACL model instead.", + category=DeprecationWarning, + stacklevel=2, + ) + super().__init__(**data) + + class IPSecPeers(IPSecPeer): # pragma: no cover """Alias for the IPSecPeers model to maintain backward compatibility. @@ -52,7 +163,7 @@ class IPSecPeers(IPSecPeer): # pragma: no cover """ def __init__(self, **data: Any) -> None: # noqa: ANN401 - """Initialize the IPSecPeer class, emitting a deprecation warning.""" + """Initialize the IPSecPeers class, emitting a deprecation warning.""" warn( message="IPSecPeers model is deprecated and will be removed in ANTA v2.0.0. Use the IPSecPeer model instead.", category=DeprecationWarning, diff --git a/anta/models.py b/anta/models.py index 71f33ebd9..ce2482f0a 100644 --- a/anta/models.py +++ b/anta/models.py @@ -15,8 +15,7 @@ from pydantic import BaseModel, ConfigDict, ValidationError, create_model -from anta import GITHUB_SUGGESTION -from anta.constants import KNOWN_EOS_ERRORS +from anta.constants import KNOWN_EOS_ERRORS, UNSUPPORTED_PLATFORM_ERRORS from anta.custom_types import REGEXP_EOS_BLACKLIST_CMDS, Revision from anta.logger import anta_log_exception, exc_to_str from anta.result_manager.models import AntaTestStatus, TestResult @@ -258,7 +257,8 @@ def supported(self) -> bool: msg = f"Command '{self.command}' has not been collected and has not returned an error. Call AntaDevice.collect()." raise RuntimeError(msg) - return all("not supported on this hardware platform" not in e for e in self.errors) + + return not any(any(error in e for error in UNSUPPORTED_PLATFORM_ERRORS) for e in self.errors) @property def returned_known_eos_error(self) -> bool: @@ -683,8 +683,6 @@ def _handle_failed_commands(self) -> None: cmds = self.failed_commands unsupported_commands = [f"'{c.command}' is not supported on {self.device.hw_model}" for c in cmds if not c.supported] if unsupported_commands: - msg = f"Test {self.name} has been skipped because it is not supported on {self.device.hw_model}: {GITHUB_SUGGESTION}" - self.logger.warning(msg) self.result.is_skipped("\n".join(unsupported_commands)) return returned_known_eos_error = [f"'{c.command}' failed on {self.device.name}: {', '.join(c.errors)}" for c in cmds if c.returned_known_eos_error] diff --git a/anta/tests/profiles.py b/anta/tests/profiles.py index f9c3c2622..9f78a1702 100644 --- a/anta/tests/profiles.py +++ b/anta/tests/profiles.py @@ -51,7 +51,7 @@ def test(self) -> None: if command_output["uftMode"] == str(self.inputs.mode): self.result.is_success() else: - self.result.is_failure(f"Device is not running correct UFT mode (expected: {self.inputs.mode} / running: {command_output['uftMode']})") + self.result.is_failure(f"Not running the correct UFT mode - Expected: {self.inputs.mode}, Actual: {command_output['uftMode']}") class VerifyTcamProfile(AntaTest): diff --git a/anta/tests/routing/bgp.py b/anta/tests/routing/bgp.py index 89ad386ac..1b85b5893 100644 --- a/anta/tests/routing/bgp.py +++ b/anta/tests/routing/bgp.py @@ -11,7 +11,7 @@ from pydantic import field_validator -from anta.input_models.routing.bgp import BgpAddressFamily, BgpAfi, BgpNeighbor, BgpPeer, VxlanEndpoint +from anta.input_models.routing.bgp import BgpAddressFamily, BgpAfi, BgpNeighbor, BgpPeer, BgpRoute, VxlanEndpoint from anta.models import AntaCommand, AntaTemplate, AntaTest from anta.tools import format_data, get_item, get_value @@ -1544,3 +1544,144 @@ def test(self) -> None: outq = peer["peerTcpInfo"]["outputQueueLength"] if self.inputs.check_tcp_queues and (inq != 0 or outq != 0): self.result.is_failure(f"Peer: {peer['peerAddress']} VRF: {vrf} - Session has non-empty message queues - InQ: {inq}, OutQ: {outq}") + + +class VerifyBGPNlriAcceptance(AntaTest): + """Verifies that all received NLRI are accepted for all AFI/SAFI configured for BGP IPv4 peer(s). + + This test performs the following checks for each specified peer: + + 1. Verifies that the peer is found in its VRF in the BGP configuration. + 2. Verifies that all received NLRI were accepted by comparing `nlrisReceived` with `nlrisAccepted`. + + Expected Results + ---------------- + * Success: If `nlrisReceived` equals `nlrisAccepted`, indicating all NLRI were accepted. + * Failure: If any of the following occur: + - The specified VRF is not configured. + - `nlrisReceived` does not equal `nlrisAccepted`, indicating some NLRI were rejected or filtered. + + Examples + -------- + ```yaml + anta.tests.routing: + bgp: + - VerifyBGPNlriAcceptance: + bgp_peers: + - peer_address: 10.100.0.128 + vrf: default + capabilities: + - ipv4Unicast + ``` + """ + + categories: ClassVar[list[str]] = ["bgp"] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bgp summary vrf all", revision=1)] + + class Input(AntaTest.Input): + """Input model for the VerifyBGPNlriAcceptance test.""" + + bgp_peers: list[BgpPeer] + """List of BGP IPv4 peers.""" + + @field_validator("bgp_peers") + @classmethod + def validate_bgp_peers(cls, bgp_peers: list[T]) -> list[T]: + """Validate that 'capabilities' field is provided in each BGP peer.""" + for peer in bgp_peers: + if peer.capabilities is None: + msg = f"{peer} 'capabilities' field missing in the input" + raise ValueError(msg) + return bgp_peers + + @AntaTest.anta_test + def test(self) -> None: + """Main test function for VerifyBGPNlriAcceptance.""" + self.result.is_success() + + output = self.instance_commands[0].json_output + + for peer in self.inputs.bgp_peers: + # Check if the peer is found + if not (peer_data := get_value(output, f"vrfs..{peer.vrf}..peers..{peer.peer_address}", separator="..")): + self.result.is_failure(f"{peer} - Not found") + continue + + # Fetching the multiprotocol capabilities + for capability in peer.capabilities: + # Check if the capability is found + if (capability_status := get_value(peer_data, capability)) is None: + self.result.is_failure(f"{peer} - {capability} not found") + continue + + if capability_status["afiSafiState"] != "negotiated": + self.result.is_failure(f"{peer} - {capability} not negotiated") + + if (received := capability_status.get("nlrisReceived")) != (accepted := capability_status.get("nlrisAccepted")): + self.result.is_failure(f"{peer} AFI/SAFI: {capability} - some NLRI were filtered or rejected - Accepted: {accepted} Received: {received}") + + +class VerifyBGPRoutePaths(AntaTest): + """Verifies BGP IPv4 route paths. + + This test performs the following checks for each specified BGP route entry: + 1. Verifies the specified BGP route exists in the routing table. + 2. For each expected paths: + - Verifies a path with matching next-hop exists. + - Verifies the path's origin attribute matches the expected value. + + Expected Results + ---------------- + * Success: The test will pass if all specified routes exist with paths matching the expected next-hops and origin attributes. + * Failure: The test will fail if: + - A specified BGP route is not found. + - A path with specified next-hop is not found. + - A path's origin attribute doesn't match the expected value. + + Examples + -------- + ```yaml + anta.tests.routing: + bgp: + - VerifyBGPRoutePaths: + route_entries: + - prefix: 10.100.0.128/31 + vrf: default + paths: + - nexthop: 10.100.0.10 + origin: Igp + - nexthop: 10.100.4.5 + origin: Incomplete + ``` + """ + + categories: ClassVar[list[str]] = ["bgp"] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show ip bgp vrf all", revision=3)] + + class Input(AntaTest.Input): + """Input model for the VerifyBGPRoutePaths test.""" + + route_entries: list[BgpRoute] + """List of BGP IPv4 route(s).""" + + @AntaTest.anta_test + def test(self) -> None: + """Main test function for VerifyBGPRoutePaths.""" + self.result.is_success() + + for route in self.inputs.route_entries: + # Verify if the prefix exists in BGP table + if not (bgp_routes := get_value(self.instance_commands[0].json_output, f"vrfs..{route.vrf}..bgpRouteEntries..{route.prefix}", separator="..")): + self.result.is_failure(f"{route} - prefix not found") + continue + + # Iterating over each path. + for path in route.paths: + nexthop = str(path.nexthop) + origin = path.origin + if not (route_path := get_item(bgp_routes["bgpRoutePaths"], "nextHop", nexthop)): + self.result.is_failure(f"{route} {path} - path not found") + continue + + if (actual_origin := get_value(route_path, "routeType.origin")) != origin: + self.result.is_failure(f"{route} {path} - Origin mismatch - Actual: {actual_origin}") diff --git a/anta/tests/security.py b/anta/tests/security.py index de78ea18c..78908050b 100644 --- a/anta/tests/security.py +++ b/anta/tests/security.py @@ -8,22 +8,12 @@ # Mypy does not understand AntaTest.Input typing # mypy: disable-error-code=attr-defined from datetime import datetime, timezone -from typing import TYPE_CHECKING, ClassVar, get_args +from typing import ClassVar -from pydantic import BaseModel, Field, model_validator - -from anta.custom_types import EcdsaKeySize, EncryptionAlgorithm, PositiveInteger, RsaKeySize -from anta.input_models.security import IPSecPeer, IPSecPeers +from anta.custom_types import PositiveInteger +from anta.input_models.security import ACL, APISSLCertificate, IPSecPeer, IPSecPeers from anta.models import AntaCommand, AntaTemplate, AntaTest -from anta.tools import get_failed_logs, get_item, get_value - -if TYPE_CHECKING: - import sys - - if sys.version_info >= (3, 11): - from typing import Self - else: - from typing_extensions import Self +from anta.tools import get_item, get_value class VerifySSHStatus(AntaTest): @@ -354,14 +344,27 @@ def test(self) -> None: class VerifyAPISSLCertificate(AntaTest): - """Verifies the eAPI SSL certificate expiry, common subject name, encryption algorithm and key size. + """Verifies the eAPI SSL certificate. + + This test performs the following checks for each certificate: + + 1. Validates that the certificate is not expired and meets the configured expiry threshold. + 2. Validates that the certificate Common Name matches the expected one. + 3. Ensures the certificate uses the specified encryption algorithm. + 4. Verifies the certificate key matches the expected key size. Expected Results ---------------- - * Success: The test will pass if the certificate's expiry date is greater than the threshold, - and the certificate has the correct name, encryption algorithm, and key size. - * Failure: The test will fail if the certificate is expired or is going to expire, - or if the certificate has an incorrect name, encryption algorithm, or key size. + * Success: If all of the following occur: + - The certificate's expiry date exceeds the configured threshold. + - The certificate's Common Name matches the input configuration. + - The encryption algorithm used by the certificate is as expected. + - The key size of the certificate matches the input configuration. + * Failure: If any of the following occur: + - The certificate is expired or set to expire within the defined threshold. + - The certificate's common name does not match the expected input. + - The encryption algorithm is incorrect. + - The key size does not match the expected input. Examples -------- @@ -393,38 +396,7 @@ class Input(AntaTest.Input): certificates: list[APISSLCertificate] """List of API SSL certificates.""" - - class APISSLCertificate(BaseModel): - """Model for an API SSL certificate.""" - - certificate_name: str - """The name of the certificate to be verified.""" - expiry_threshold: int - """The expiry threshold of the certificate in days.""" - common_name: str - """The common subject name of the certificate.""" - encryption_algorithm: EncryptionAlgorithm - """The encryption algorithm of the certificate.""" - key_size: RsaKeySize | EcdsaKeySize - """The encryption algorithm key size of the certificate.""" - - @model_validator(mode="after") - def validate_inputs(self) -> Self: - """Validate the key size provided to the APISSLCertificates class. - - If encryption_algorithm is RSA then key_size should be in {2048, 3072, 4096}. - - If encryption_algorithm is ECDSA then key_size should be in {256, 384, 521}. - """ - if self.encryption_algorithm == "RSA" and self.key_size not in get_args(RsaKeySize): - msg = f"`{self.certificate_name}` key size {self.key_size} is invalid for RSA encryption. Allowed sizes are {get_args(RsaKeySize)}." - raise ValueError(msg) - - if self.encryption_algorithm == "ECDSA" and self.key_size not in get_args(EcdsaKeySize): - msg = f"`{self.certificate_name}` key size {self.key_size} is invalid for ECDSA encryption. Allowed sizes are {get_args(EcdsaKeySize)}." - raise ValueError(msg) - - return self + APISSLCertificate: ClassVar[type[APISSLCertificate]] = APISSLCertificate @AntaTest.anta_test def test(self) -> None: @@ -442,7 +414,7 @@ def test(self) -> None: # Collecting certificate expiry time and current EOS time. # These times are used to calculate the number of days until the certificate expires. if not (certificate_data := get_value(certificate_output, f"certificates..{certificate.certificate_name}", separator="..")): - self.result.is_failure(f"SSL certificate '{certificate.certificate_name}', is not configured.\n") + self.result.is_failure(f"{certificate} - Not found") continue expiry_time = certificate_data["notAfter"] @@ -450,24 +422,25 @@ def test(self) -> None: # Verify certificate expiry if 0 < day_difference < certificate.expiry_threshold: - self.result.is_failure(f"SSL certificate `{certificate.certificate_name}` is about to expire in {day_difference} days.\n") + self.result.is_failure( + f"{certificate} - set to expire within the threshold - Threshold: {certificate.expiry_threshold} days Actual: {day_difference} days" + ) elif day_difference < 0: - self.result.is_failure(f"SSL certificate `{certificate.certificate_name}` is expired.\n") + self.result.is_failure(f"{certificate} - certificate expired") # Verify certificate common subject name, encryption algorithm and key size - keys_to_verify = ["subject.commonName", "publicKey.encryptionAlgorithm", "publicKey.size"] - actual_certificate_details = {key: get_value(certificate_data, key) for key in keys_to_verify} + common_name = get_value(certificate_data, "subject.commonName", default="Not found") + encryp_algo = get_value(certificate_data, "publicKey.encryptionAlgorithm", default="Not found") + key_size = get_value(certificate_data, "publicKey.size", default="Not found") - expected_certificate_details = { - "subject.commonName": certificate.common_name, - "publicKey.encryptionAlgorithm": certificate.encryption_algorithm, - "publicKey.size": certificate.key_size, - } + if common_name != certificate.common_name: + self.result.is_failure(f"{certificate} - incorrect common name - Expected: {certificate.common_name} Actual: {common_name}") + + if encryp_algo != certificate.encryption_algorithm: + self.result.is_failure(f"{certificate} - incorrect encryption algorithm - Expected: {certificate.encryption_algorithm} Actual: {encryp_algo}") - if actual_certificate_details != expected_certificate_details: - failed_log = f"SSL certificate `{certificate.certificate_name}` is not configured properly:" - failed_log += get_failed_logs(expected_certificate_details, actual_certificate_details) - self.result.is_failure(f"{failed_log}\n") + if key_size != certificate.key_size: + self.result.is_failure(f"{certificate} - incorrect public key - Expected: {certificate.key_size} Actual: {key_size}") class VerifyBannerLogin(AntaTest): @@ -555,12 +528,22 @@ def test(self) -> None: class VerifyIPv4ACL(AntaTest): - """Verifies the configuration of IPv4 ACLs. + """Verifies the IPv4 ACLs. + + This test performs the following checks for each IPv4 ACL: + + 1. Validates that the IPv4 ACL is properly configured. + 2. Validates that the sequence entries in the ACL are correctly ordered. Expected Results ---------------- - * Success: The test will pass if an IPv4 ACL is configured with the correct sequence entries. - * Failure: The test will fail if an IPv4 ACL is not configured or entries are not in sequence. + * Success: If all of the following occur: + - Any IPv4 ACL entry is not configured. + - The sequency entries are correctly configured. + * Failure: If any of the following occur: + - The IPv4 ACL is not configured. + - The any IPv4 ACL entry is not configured. + - The action for any entry does not match the expected input. Examples -------- @@ -586,65 +569,37 @@ class VerifyIPv4ACL(AntaTest): """ categories: ClassVar[list[str]] = ["security"] - commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaTemplate(template="show ip access-lists {acl}", revision=1)] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show ip access-lists", revision=1)] class Input(AntaTest.Input): """Input model for the VerifyIPv4ACL test.""" - ipv4_access_lists: list[IPv4ACL] + ipv4_access_lists: list[ACL] """List of IPv4 ACLs to verify.""" - - class IPv4ACL(BaseModel): - """Model for an IPv4 ACL.""" - - name: str - """Name of IPv4 ACL.""" - - entries: list[IPv4ACLEntry] - """List of IPv4 ACL entries.""" - - class IPv4ACLEntry(BaseModel): - """Model for an IPv4 ACL entry.""" - - sequence: int = Field(ge=1, le=4294967295) - """Sequence number of an ACL entry.""" - action: str - """Action of an ACL entry.""" - - def render(self, template: AntaTemplate) -> list[AntaCommand]: - """Render the template for each input ACL.""" - return [template.render(acl=acl.name) for acl in self.inputs.ipv4_access_lists] + IPv4ACL: ClassVar[type[ACL]] = ACL + """To maintain backward compatibility.""" @AntaTest.anta_test def test(self) -> None: """Main test function for VerifyIPv4ACL.""" self.result.is_success() - for command_output, acl in zip(self.instance_commands, self.inputs.ipv4_access_lists): - # Collecting input ACL details - acl_name = command_output.params.acl - # Retrieve the expected entries from the inputs - acl_entries = acl.entries - - # Check if ACL is configured - ipv4_acl_list = command_output.json_output["aclList"] - if not ipv4_acl_list: - self.result.is_failure(f"{acl_name}: Not found") + + if not (command_output := self.instance_commands[0].json_output["aclList"]): + self.result.is_failure("No Access Control List (ACL) configured") + return + + for access_list in self.inputs.ipv4_access_lists: + if not (access_list_output := get_item(command_output, "name", access_list.name)): + self.result.is_failure(f"{access_list} - Not configured") continue - # Check if the sequence number is configured and has the correct action applied - failed_log = f"{acl_name}:\n" - for acl_entry in acl_entries: - acl_seq = acl_entry.sequence - acl_action = acl_entry.action - if (actual_entry := get_item(ipv4_acl_list[0]["sequence"], "sequenceNumber", acl_seq)) is None: - failed_log += f"Sequence number `{acl_seq}` is not found.\n" + for entry in access_list.entries: + if not (actual_entry := get_item(access_list_output["sequence"], "sequenceNumber", entry.sequence)): + self.result.is_failure(f"{access_list} {entry} - Not configured") continue - if actual_entry["text"] != acl_action: - failed_log += f"Expected `{acl_action}` as sequence number {acl_seq} action but found `{actual_entry['text']}` instead.\n" - - if failed_log != f"{acl_name}:\n": - self.result.is_failure(f"{failed_log}") + if (act_action := actual_entry["text"]) != entry.action: + self.result.is_failure(f"{access_list} {entry} - action mismatch - Expected: {entry.action} Actual: {act_action}") class VerifyIPSecConnHealth(AntaTest): diff --git a/docs/advanced_usages/caching.md b/docs/advanced_usages/caching.md index a9c18182c..1e794ea56 100644 --- a/docs/advanced_usages/caching.md +++ b/docs/advanced_usages/caching.md @@ -8,21 +8,8 @@ ANTA is a streamlined Python framework designed for efficient interaction with n ## Configuration -By default, ANTA utilizes [aiocache](https://github.com/aio-libs/aiocache)'s memory cache backend, also called [`SimpleMemoryCache`](https://aiocache.aio-libs.org/en/v0.12.2/caches.html#simplememorycache). This library aims for simplicity and supports asynchronous operations to go along with Python `asyncio` used in ANTA. - The `_init_cache()` method of the [AntaDevice](../api/device.md#anta.device.AntaDevice) abstract class initializes the cache. Child classes can override this method to tweak the cache configuration: -```python -def _init_cache(self) -> None: - """ - Initialize cache for the device, can be overridden by subclasses to manipulate how it works - """ - self.cache = Cache(cache_class=Cache.MEMORY, ttl=60, namespace=self.name, plugins=[HitMissRatioPlugin()]) - self.cache_locks = defaultdict(asyncio.Lock) -``` - -The cache is also configured with `aiocache`'s [`HitMissRatioPlugin`](https://aiocache.aio-libs.org/en/v0.12.2/plugins.html#hitmissratioplugin) plugin to calculate the ratio of hits the cache has and give useful statistics for logging purposes in ANTA. - ## Cache key design The cache is initialized per `AntaDevice` and uses the following cache key design: @@ -31,7 +18,7 @@ The cache is initialized per `AntaDevice` and uses the following cache key desig The `uid` is an attribute of [AntaCommand](../api/models.md#anta.models.AntaCommand), which is a unique identifier generated from the command, version, revision and output format. -Each UID has its own asyncio lock. This design allows coroutines that need to access the cache for different UIDs to do so concurrently. The locks are managed by the `self.cache_locks` dictionary. +Each UID has its own asyncio lock. This design allows coroutines that need to access the cache for different UIDs to do so concurrently. The locks are managed by the `AntaCache.locks` dictionary. ## Mechanisms @@ -45,35 +32,35 @@ There might be scenarios where caching is not wanted. You can disable caching in 1. Caching can be disabled globally, for **ALL** commands on **ALL** devices, using the `--disable-cache` global flag when invoking anta at the [CLI](../cli/overview.md#invoking-anta-cli): - ```bash - anta --disable-cache --username arista --password arista nrfu table - ``` + ```bash + anta --disable-cache --username arista --password arista nrfu table + ``` 2. Caching can be disabled per device, network or range by setting the `disable_cache` key to `True` when defining the ANTA [Inventory](../usage-inventory-catalog.md#device-inventory) file: - ```yaml - anta_inventory: - hosts: - - host: 172.20.20.101 - name: DC1-SPINE1 - tags: ["SPINE", "DC1"] - disable_cache: True # Set this key to True - - host: 172.20.20.102 - name: DC1-SPINE2 - tags: ["SPINE", "DC1"] - disable_cache: False # Optional since it's the default - - networks: - - network: "172.21.21.0/24" - disable_cache: True - - ranges: - - start: 172.22.22.10 - end: 172.22.22.19 - disable_cache: True - ``` - - This approach effectively disables caching for **ALL** commands sent to devices targeted by the `disable_cache` key. + ```yaml + anta_inventory: + hosts: + - host: 172.20.20.101 + name: DC1-SPINE1 + tags: ["SPINE", "DC1"] + disable_cache: True # Set this key to True + - host: 172.20.20.102 + name: DC1-SPINE2 + tags: ["SPINE", "DC1"] + disable_cache: False # Optional since it's the default + + networks: + - network: "172.21.21.0/24" + disable_cache: True + + ranges: + - start: 172.22.22.10 + end: 172.22.22.19 + disable_cache: True + ``` + + This approach effectively disables caching for **ALL** commands sent to devices targeted by the `disable_cache` key. 3. For tests developers, caching can be disabled for a specific [`AntaCommand`](../api/models.md#anta.models.AntaCommand) or [`AntaTemplate`](../api/models.md#anta.models.AntaTemplate) by setting the `use_cache` attribute to `False`. That means the command output will always be collected on the device and therefore, never use caching. diff --git a/examples/tests.yaml b/examples/tests.yaml index 973a1e010..cdbb3ad12 100644 --- a/examples/tests.yaml +++ b/examples/tests.yaml @@ -391,6 +391,13 @@ anta.tests.routing.bgp: - 192.0.254.5/32 received_routes: - 192.0.254.3/32 + - VerifyBGPNlriAcceptance: + # Verifies that all received NLRI are accepted for all AFI/SAFI configured for BGP IPv4 peer(s). + bgp_peers: + - peer_address: 10.100.0.128 + vrf: default + capabilities: + - ipv4Unicast - VerifyBGPPeerASNCap: # Verifies the four octet ASN capability of BGP IPv4 peer(s). bgp_peers: @@ -499,6 +506,16 @@ anta.tests.routing.bgp: - VerifyBGPPeersHealthRibd: # Verifies the health of all the BGP IPv4 peer(s). check_tcp_queues: True + - VerifyBGPRoutePaths: + # Verifies BGP IPv4 route paths. + route_entries: + - prefix: 10.100.0.128/31 + vrf: default + paths: + - nexthop: 10.100.0.10 + origin: Igp + - nexthop: 10.100.4.5 + origin: Incomplete - VerifyBGPSpecificPeers: # Verifies the health of specific BGP peer(s) for given address families. address_families: @@ -646,7 +663,7 @@ anta.tests.security: number: 3 vrf: default - VerifyAPISSLCertificate: - # Verifies the eAPI SSL certificate expiry, common subject name, encryption algorithm and key size. + # Verifies the eAPI SSL certificate. certificates: - certificate_name: ARISTA_SIGNING_CA.crt expiry_threshold: 30 @@ -675,7 +692,7 @@ anta.tests.security: - VerifyIPSecConnHealth: # Verifies all IPv4 security connections. - VerifyIPv4ACL: - # Verifies the configuration of IPv4 ACLs. + # Verifies the IPv4 ACLs. ipv4_access_lists: - name: default-control-plane-acl entries: diff --git a/pyproject.toml b/pyproject.toml index 09fee8c3e..6d47f6897 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,6 @@ maintainers = [ description = "Arista Network Test Automation (ANTA) Framework" license = { file = "LICENSE" } dependencies = [ - "aiocache>=0.12.2", "asyncssh>=2.16", "cvprac>=1.3.1", "eval-type-backport>=0.1.3", # Support newer typing features in older Python versions (required until Python 3.9 support is removed) @@ -143,7 +142,7 @@ plugins = [ ] # Comment below for better type checking #follow_imports = "skip" -# Make it false if we implement stubs using stubgen from mypy for aio-eapi, aiocache and cvprac +# Make it false if we implement stubs using stubgen from mypy for asynceapi, cvprac # and configure mypy_path to generated stubs e.g.: mypy_path = "./out" ignore_missing_imports = true warn_redundant_casts = true diff --git a/tests/units/anta_tests/routing/test_bgp.py b/tests/units/anta_tests/routing/test_bgp.py index c0de32932..21758ea78 100644 --- a/tests/units/anta_tests/routing/test_bgp.py +++ b/tests/units/anta_tests/routing/test_bgp.py @@ -14,6 +14,7 @@ from anta.tests.routing.bgp import ( VerifyBGPAdvCommunities, VerifyBGPExchangedRoutes, + VerifyBGPNlriAcceptance, VerifyBGPPeerASNCap, VerifyBGPPeerCount, VerifyBGPPeerDropStats, @@ -28,6 +29,7 @@ VerifyBGPPeersHealthRibd, VerifyBGPPeerUpdateErrors, VerifyBgpRouteMaps, + VerifyBGPRoutePaths, VerifyBGPSpecificPeers, VerifyBGPTimers, VerifyEVPNType2Route, @@ -4609,4 +4611,493 @@ def test_check_bgp_neighbor_capability(input_dict: dict[str, bool], expected: bo ], }, }, + { + "name": "success", + "test": VerifyBGPNlriAcceptance, + "eos_data": [ + { + "vrfs": { + "default": { + "vrf": "default", + "routerId": "10.100.1.5", + "asn": "65102", + "peers": { + "10.100.0.8": { + "peerState": "Established", + "peerAsn": "65100", + "ipv4Unicast": {"afiSafiState": "negotiated", "nlrisReceived": 17, "nlrisAccepted": 17}, + "l2VpnEvpn": {"afiSafiState": "negotiated", "nlrisReceived": 56, "nlrisAccepted": 56}, + }, + }, + }, + "MGMT": { + "vrf": "MGMT", + "routerId": "10.100.1.5", + "asn": "65102", + "peers": { + "10.100.4.5": { + "peerState": "Established", + "peerAsn": "65102", + "ipv4Unicast": {"afiSafiState": "negotiated", "nlrisReceived": 14, "nlrisAccepted": 14}, + "l2VpnEvpn": {"afiSafiState": "negotiated", "nlrisReceived": 56, "nlrisAccepted": 56}, + } + }, + }, + } + } + ], + "inputs": { + "bgp_peers": [ + { + "peer_address": "10.100.0.8", + "vrf": "default", + "capabilities": ["Ipv4 Unicast", "L2vpnEVPN"], + }, + { + "peer_address": "10.100.4.5", + "vrf": "MGMT", + "capabilities": ["ipv4 Unicast", "L2vpnEVPN"], + }, + ] + }, + "expected": {"result": "success"}, + }, + { + "name": "failure-vrf-not-configured", + "test": VerifyBGPNlriAcceptance, + "eos_data": [ + { + "vrfs": { + "default": { + "vrf": "default", + "routerId": "10.100.1.5", + "asn": "65102", + "peers": {}, + }, + "MGMT": { + "vrf": "MGMT", + "routerId": "10.100.1.5", + "asn": "65102", + "peers": {}, + }, + } + } + ], + "inputs": { + "bgp_peers": [ + { + "peer_address": "10.100.0.8", + "vrf": "default", + "capabilities": ["Ipv4 Unicast", "L2vpnEVPN"], + }, + { + "peer_address": "10.100.4.5", + "vrf": "MGMT", + "capabilities": ["ipv4 Unicast", "L2vpnEVPN"], + }, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "Peer: 10.100.0.8 VRF: default - Not found", + "Peer: 10.100.4.5 VRF: MGMT - Not found", + ], + }, + }, + { + "name": "failure-capability-not-found", + "test": VerifyBGPNlriAcceptance, + "eos_data": [ + { + "vrfs": { + "default": { + "vrf": "default", + "routerId": "10.100.1.5", + "asn": "65102", + "peers": { + "10.100.0.8": { + "peerState": "Established", + "peerAsn": "65100", + "ipv4Unicast": {"afiSafiState": "negotiated", "nlrisReceived": 17, "nlrisAccepted": 17}, + }, + }, + }, + "MGMT": { + "vrf": "MGMT", + "routerId": "10.100.1.5", + "asn": "65102", + "peers": { + "10.100.4.5": { + "peerState": "Established", + "peerAsn": "65102", + "l2VpnEvpn": {"afiSafiState": "negotiated", "nlrisReceived": 56, "nlrisAccepted": 56}, + } + }, + }, + } + } + ], + "inputs": { + "bgp_peers": [ + { + "peer_address": "10.100.0.8", + "vrf": "default", + "capabilities": ["Ipv4 Unicast", "L2vpnEVPN"], + }, + { + "peer_address": "10.100.4.5", + "vrf": "MGMT", + "capabilities": ["ipv4 Unicast", "L2vpnEVPN"], + }, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "Peer: 10.100.0.8 VRF: default - l2VpnEvpn not found", + "Peer: 10.100.4.5 VRF: MGMT - ipv4Unicast not found", + ], + }, + }, + { + "name": "failure-capability-not-negotiated", + "test": VerifyBGPNlriAcceptance, + "eos_data": [ + { + "vrfs": { + "default": { + "vrf": "default", + "routerId": "10.100.1.5", + "asn": "65102", + "peers": { + "10.100.0.8": { + "peerState": "Established", + "peerAsn": "65100", + "ipv4Unicast": {"afiSafiState": "configured", "nlrisReceived": 17, "nlrisAccepted": 17}, + }, + }, + }, + "MGMT": { + "vrf": "MGMT", + "routerId": "10.100.1.5", + "asn": "65102", + "peers": { + "10.100.4.5": { + "peerState": "Established", + "peerAsn": "65102", + "l2VpnEvpn": {"afiSafiState": "configured", "nlrisReceived": 56, "nlrisAccepted": 56}, + } + }, + }, + } + } + ], + "inputs": { + "bgp_peers": [ + { + "peer_address": "10.100.0.8", + "vrf": "default", + "capabilities": ["Ipv4 Unicast", "L2vpnEVPN"], + }, + { + "peer_address": "10.100.4.5", + "vrf": "MGMT", + "capabilities": ["ipv4 Unicast", "L2vpnEVPN"], + }, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "Peer: 10.100.0.8 VRF: default - ipv4Unicast not negotiated", + "Peer: 10.100.0.8 VRF: default - l2VpnEvpn not found", + "Peer: 10.100.4.5 VRF: MGMT - ipv4Unicast not found", + "Peer: 10.100.4.5 VRF: MGMT - l2VpnEvpn not negotiated", + ], + }, + }, + { + "name": "failure-nlris-not-accepted", + "test": VerifyBGPNlriAcceptance, + "eos_data": [ + { + "vrfs": { + "default": { + "vrf": "default", + "routerId": "10.100.1.5", + "asn": "65102", + "peers": { + "10.100.0.8": { + "peerState": "Established", + "peerAsn": "65100", + "ipv4Unicast": {"afiSafiState": "negotiated", "nlrisReceived": 17, "nlrisAccepted": 16}, + "l2VpnEvpn": {"afiSafiState": "negotiated", "nlrisReceived": 58, "nlrisAccepted": 56}, + }, + }, + }, + "MGMT": { + "vrf": "MGMT", + "routerId": "10.100.1.5", + "asn": "65102", + "peers": { + "10.100.4.5": { + "peerState": "Established", + "peerAsn": "65102", + "ipv4Unicast": {"afiSafiState": "negotiated", "nlrisReceived": 15, "nlrisAccepted": 14}, + "l2VpnEvpn": {"afiSafiState": "negotiated", "nlrisReceived": 59, "nlrisAccepted": 56}, + } + }, + }, + } + } + ], + "inputs": { + "bgp_peers": [ + { + "peer_address": "10.100.0.8", + "vrf": "default", + "capabilities": ["Ipv4 Unicast", "L2vpnEVPN"], + }, + { + "peer_address": "10.100.4.5", + "vrf": "MGMT", + "capabilities": ["ipv4 Unicast", "L2vpnEVPN"], + }, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "Peer: 10.100.0.8 VRF: default AFI/SAFI: ipv4Unicast - some NLRI were filtered or rejected - Accepted: 16 Received: 17", + "Peer: 10.100.0.8 VRF: default AFI/SAFI: l2VpnEvpn - some NLRI were filtered or rejected - Accepted: 56 Received: 58", + "Peer: 10.100.4.5 VRF: MGMT AFI/SAFI: ipv4Unicast - some NLRI were filtered or rejected - Accepted: 14 Received: 15", + "Peer: 10.100.4.5 VRF: MGMT AFI/SAFI: l2VpnEvpn - some NLRI were filtered or rejected - Accepted: 56 Received: 59", + ], + }, + }, + { + "name": "success", + "test": VerifyBGPRoutePaths, + "eos_data": [ + { + "vrfs": { + "default": { + "bgpRouteEntries": { + "10.100.0.128/31": { + "bgpRoutePaths": [ + { + "nextHop": "10.100.0.10", + "routeType": { + "origin": "Igp", + }, + }, + { + "nextHop": "10.100.4.5", + "routeType": { + "origin": "Incomplete", + }, + }, + ], + } + } + }, + "MGMT": { + "bgpRouteEntries": { + "10.100.0.130/31": { + "bgpRoutePaths": [ + { + "nextHop": "10.100.0.8", + "routeType": { + "origin": "Igp", + }, + }, + { + "nextHop": "10.100.0.10", + "routeType": { + "origin": "Igp", + }, + }, + ], + } + } + }, + } + }, + ], + "inputs": { + "route_entries": [ + { + "prefix": "10.100.0.128/31", + "vrf": "default", + "paths": [{"nexthop": "10.100.0.10", "origin": "Igp"}, {"nexthop": "10.100.4.5", "origin": "Incomplete"}], + }, + { + "prefix": "10.100.0.130/31", + "vrf": "MGMT", + "paths": [{"nexthop": "10.100.0.8", "origin": "Igp"}, {"nexthop": "10.100.0.10", "origin": "Igp"}], + }, + ] + }, + "expected": {"result": "success"}, + }, + { + "name": "failure-origin-not-correct", + "test": VerifyBGPRoutePaths, + "eos_data": [ + { + "vrfs": { + "default": { + "bgpRouteEntries": { + "10.100.0.128/31": { + "bgpRoutePaths": [ + { + "nextHop": "10.100.0.10", + "routeType": { + "origin": "Igp", + }, + }, + { + "nextHop": "10.100.4.5", + "routeType": { + "origin": "Incomplete", + }, + }, + ], + } + } + }, + "MGMT": { + "bgpRouteEntries": { + "10.100.0.130/31": { + "bgpRoutePaths": [ + { + "nextHop": "10.100.0.8", + "routeType": { + "origin": "Igp", + }, + }, + { + "nextHop": "10.100.0.10", + "routeType": { + "origin": "Igp", + }, + }, + ], + } + } + }, + } + }, + ], + "inputs": { + "route_entries": [ + { + "prefix": "10.100.0.128/31", + "vrf": "default", + "paths": [{"nexthop": "10.100.0.10", "origin": "Incomplete"}, {"nexthop": "10.100.4.5", "origin": "Igp"}], + }, + { + "prefix": "10.100.0.130/31", + "vrf": "MGMT", + "paths": [{"nexthop": "10.100.0.8", "origin": "Incomplete"}, {"nexthop": "10.100.0.10", "origin": "Incomplete"}], + }, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "Prefix: 10.100.0.128/31 VRF: default Next-hop: 10.100.0.10 Origin: Incomplete - Origin mismatch - Actual: Igp", + "Prefix: 10.100.0.128/31 VRF: default Next-hop: 10.100.4.5 Origin: Igp - Origin mismatch - Actual: Incomplete", + "Prefix: 10.100.0.130/31 VRF: MGMT Next-hop: 10.100.0.8 Origin: Incomplete - Origin mismatch - Actual: Igp", + "Prefix: 10.100.0.130/31 VRF: MGMT Next-hop: 10.100.0.10 Origin: Incomplete - Origin mismatch - Actual: Igp", + ], + }, + }, + { + "name": "failure-path-not-found", + "test": VerifyBGPRoutePaths, + "eos_data": [ + { + "vrfs": { + "default": { + "bgpRouteEntries": { + "10.100.0.128/31": { + "bgpRoutePaths": [ + { + "nextHop": "10.100.0.15", + "routeType": { + "origin": "Igp", + }, + }, + ], + } + } + }, + "MGMT": { + "bgpRouteEntries": { + "10.100.0.130/31": { + "bgpRoutePaths": [ + { + "nextHop": "10.100.0.15", + "routeType": { + "origin": "Igp", + }, + }, + ], + } + } + }, + } + }, + ], + "inputs": { + "route_entries": [ + { + "prefix": "10.100.0.128/31", + "vrf": "default", + "paths": [{"nexthop": "10.100.0.10", "origin": "Incomplete"}, {"nexthop": "10.100.4.5", "origin": "Igp"}], + }, + { + "prefix": "10.100.0.130/31", + "vrf": "MGMT", + "paths": [{"nexthop": "10.100.0.8", "origin": "Incomplete"}, {"nexthop": "10.100.0.10", "origin": "Incomplete"}], + }, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "Prefix: 10.100.0.128/31 VRF: default Next-hop: 10.100.0.10 Origin: Incomplete - path not found", + "Prefix: 10.100.0.128/31 VRF: default Next-hop: 10.100.4.5 Origin: Igp - path not found", + "Prefix: 10.100.0.130/31 VRF: MGMT Next-hop: 10.100.0.8 Origin: Incomplete - path not found", + "Prefix: 10.100.0.130/31 VRF: MGMT Next-hop: 10.100.0.10 Origin: Incomplete - path not found", + ], + }, + }, + { + "name": "failure-prefix-not-found", + "test": VerifyBGPRoutePaths, + "eos_data": [ + {"vrfs": {"default": {"bgpRouteEntries": {}}, "MGMT": {"bgpRouteEntries": {}}}}, + ], + "inputs": { + "route_entries": [ + { + "prefix": "10.100.0.128/31", + "vrf": "default", + "paths": [{"nexthop": "10.100.0.10", "origin": "Incomplete"}, {"nexthop": "10.100.4.5", "origin": "Igp"}], + }, + { + "prefix": "10.100.0.130/31", + "vrf": "MGMT", + "paths": [{"nexthop": "10.100.0.8", "origin": "Incomplete"}, {"nexthop": "10.100.0.10", "origin": "Incomplete"}], + }, + ] + }, + "expected": { + "result": "failure", + "messages": ["Prefix: 10.100.0.128/31 VRF: default - prefix not found", "Prefix: 10.100.0.130/31 VRF: MGMT - prefix not found"], + }, + }, ] diff --git a/tests/units/anta_tests/test_profiles.py b/tests/units/anta_tests/test_profiles.py index 81ef4f9f5..63c75c5d7 100644 --- a/tests/units/anta_tests/test_profiles.py +++ b/tests/units/anta_tests/test_profiles.py @@ -23,7 +23,7 @@ "test": VerifyUnifiedForwardingTableMode, "eos_data": [{"uftMode": "2", "urpfEnabled": False, "chipModel": "bcm56870", "l2TableSize": 163840, "l3TableSize": 147456, "lpmTableSize": 32768}], "inputs": {"mode": 3}, - "expected": {"result": "failure", "messages": ["Device is not running correct UFT mode (expected: 3 / running: 2)"]}, + "expected": {"result": "failure", "messages": ["Not running the correct UFT mode - Expected: 3, Actual: 2"]}, }, { "name": "success", diff --git a/tests/units/anta_tests/test_security.py b/tests/units/anta_tests/test_security.py index 4d51c96fc..a3e710138 100644 --- a/tests/units/anta_tests/test_security.py +++ b/tests/units/anta_tests/test_security.py @@ -341,7 +341,7 @@ }, "expected": { "result": "failure", - "messages": ["SSL certificate 'ARISTA_ROOT_CA.crt', is not configured.\n"], + "messages": ["Certificate: ARISTA_ROOT_CA.crt - Not found"], }, }, { @@ -366,13 +366,6 @@ ], "inputs": { "certificates": [ - { - "certificate_name": "ARISTA_SIGNING_CA.crt", - "expiry_threshold": 30, - "common_name": "AristaIT-ICA ECDSA Issuing Cert Authority", - "encryption_algorithm": "ECDSA", - "key_size": 256, - }, { "certificate_name": "ARISTA_ROOT_CA.crt", "expiry_threshold": 30, @@ -384,7 +377,7 @@ }, "expected": { "result": "failure", - "messages": ["SSL certificate 'ARISTA_SIGNING_CA.crt', is not configured.\n", "SSL certificate `ARISTA_ROOT_CA.crt` is expired.\n"], + "messages": ["Certificate: ARISTA_ROOT_CA.crt - certificate expired"], }, }, { @@ -403,7 +396,7 @@ }, "ARISTA_SIGNING_CA.crt": { "subject": {"commonName": "AristaIT-ICA ECDSA Issuing Cert Authority"}, - "notAfter": 1702533518, + "notAfter": 1705992709, "publicKey": { "encryptionAlgorithm": "ECDSA", "size": 256, @@ -435,7 +428,9 @@ }, "expected": { "result": "failure", - "messages": ["SSL certificate `ARISTA_SIGNING_CA.crt` is expired.\n", "SSL certificate `ARISTA_ROOT_CA.crt` is about to expire in 25 days."], + "messages": [ + "Certificate: ARISTA_ROOT_CA.crt - set to expire within the threshold - Threshold: 30 days Actual: 25 days", + ], }, }, { @@ -487,12 +482,10 @@ "expected": { "result": "failure", "messages": [ - "SSL certificate `ARISTA_SIGNING_CA.crt` is not configured properly:\n" - "Expected `AristaIT-ICA ECDSA Issuing Cert Authority` as the subject.commonName, but found " - "`Arista ECDSA Issuing Cert Authority` instead.\n", - "SSL certificate `ARISTA_ROOT_CA.crt` is not configured properly:\n" - "Expected `Arista Networks Internal IT Root Cert Authority` as the subject.commonName, " - "but found `AristaIT-ICA Networks Internal IT Root Cert Authority` instead.\n", + "Certificate: ARISTA_SIGNING_CA.crt - incorrect common name - Expected: AristaIT-ICA ECDSA Issuing Cert Authority " + "Actual: Arista ECDSA Issuing Cert Authority", + "Certificate: ARISTA_ROOT_CA.crt - incorrect common name - Expected: Arista Networks Internal IT Root Cert Authority " + "Actual: AristaIT-ICA Networks Internal IT Root Cert Authority", ], }, }, @@ -545,17 +538,15 @@ "expected": { "result": "failure", "messages": [ - "SSL certificate `ARISTA_SIGNING_CA.crt` is not configured properly:\n" - "Expected `ECDSA` as the publicKey.encryptionAlgorithm, but found `RSA` instead.\n" - "Expected `256` as the publicKey.size, but found `4096` instead.\n", - "SSL certificate `ARISTA_ROOT_CA.crt` is not configured properly:\n" - "Expected `RSA` as the publicKey.encryptionAlgorithm, but found `ECDSA` instead.\n" - "Expected `4096` as the publicKey.size, but found `256` instead.\n", + "Certificate: ARISTA_SIGNING_CA.crt - incorrect encryption algorithm - Expected: ECDSA Actual: RSA", + "Certificate: ARISTA_SIGNING_CA.crt - incorrect public key - Expected: 256 Actual: 4096", + "Certificate: ARISTA_ROOT_CA.crt - incorrect encryption algorithm - Expected: RSA Actual: ECDSA", + "Certificate: ARISTA_ROOT_CA.crt - incorrect public key - Expected: 4096 Actual: 256", ], }, }, { - "name": "failure-missing-actual-output", + "name": "failure-missing-algorithm-details", "test": VerifyAPISSLCertificate, "eos_data": [ { @@ -595,12 +586,10 @@ "expected": { "result": "failure", "messages": [ - "SSL certificate `ARISTA_SIGNING_CA.crt` is not configured properly:\n" - "Expected `ECDSA` as the publicKey.encryptionAlgorithm, but it was not found in the actual output.\n" - "Expected `256` as the publicKey.size, but it was not found in the actual output.\n", - "SSL certificate `ARISTA_ROOT_CA.crt` is not configured properly:\n" - "Expected `RSA` as the publicKey.encryptionAlgorithm, but it was not found in the actual output.\n" - "Expected `4096` as the publicKey.size, but it was not found in the actual output.\n", + "Certificate: ARISTA_SIGNING_CA.crt - incorrect encryption algorithm - Expected: ECDSA Actual: Not found", + "Certificate: ARISTA_SIGNING_CA.crt - incorrect public key - Expected: 256 Actual: Not found", + "Certificate: ARISTA_ROOT_CA.crt - incorrect encryption algorithm - Expected: RSA Actual: Not found", + "Certificate: ARISTA_ROOT_CA.crt - incorrect public key - Expected: 4096 Actual: Not found", ], }, }, @@ -717,22 +706,20 @@ { "aclList": [ { + "name": "default-control-plane-acl", "sequence": [ {"text": "permit icmp any any", "sequenceNumber": 10}, {"text": "permit ip any any tracked", "sequenceNumber": 20}, {"text": "permit udp any any eq bfd ttl eq 255", "sequenceNumber": 30}, ], - } - ] - }, - { - "aclList": [ + }, { + "name": "LabTest", "sequence": [ {"text": "permit icmp any any", "sequenceNumber": 10}, {"text": "permit tcp any any range 5900 5910", "sequenceNumber": 20}, ], - } + }, ] }, ], @@ -754,6 +741,24 @@ }, "expected": {"result": "success"}, }, + { + "name": "failure-no-acl-list", + "test": VerifyIPv4ACL, + "eos_data": [ + {"aclList": []}, + ], + "inputs": { + "ipv4_access_lists": [ + { + "name": "default-control-plane-acl", + "entries": [ + {"sequence": 10, "action": "permit icmp any any"}, + ], + }, + ] + }, + "expected": {"result": "failure", "messages": ["No Access Control List (ACL) configured"]}, + }, { "name": "failure-acl-not-found", "test": VerifyIPv4ACL, @@ -761,6 +766,7 @@ { "aclList": [ { + "name": "default-control-plane-acl", "sequence": [ {"text": "permit icmp any any", "sequenceNumber": 10}, {"text": "permit ip any any tracked", "sequenceNumber": 20}, @@ -769,7 +775,6 @@ } ] }, - {"aclList": []}, ], "inputs": { "ipv4_access_lists": [ @@ -787,7 +792,7 @@ }, ] }, - "expected": {"result": "failure", "messages": ["LabTest: Not found"]}, + "expected": {"result": "failure", "messages": ["ACL name: LabTest - Not configured"]}, }, { "name": "failure-sequence-not-found", @@ -796,22 +801,20 @@ { "aclList": [ { + "name": "default-control-plane-acl", "sequence": [ {"text": "permit icmp any any", "sequenceNumber": 10}, {"text": "permit ip any any tracked", "sequenceNumber": 20}, {"text": "permit udp any any eq bfd ttl eq 255", "sequenceNumber": 40}, ], - } - ] - }, - { - "aclList": [ + }, { + "name": "LabTest", "sequence": [ {"text": "permit icmp any any", "sequenceNumber": 10}, {"text": "permit tcp any any range 5900 5910", "sequenceNumber": 30}, ], - } + }, ] }, ], @@ -833,7 +836,7 @@ }, "expected": { "result": "failure", - "messages": ["default-control-plane-acl:\nSequence number `30` is not found.\n", "LabTest:\nSequence number `20` is not found.\n"], + "messages": ["ACL name: default-control-plane-acl Sequence: 30 - Not configured", "ACL name: LabTest Sequence: 20 - Not configured"], }, }, { @@ -843,22 +846,20 @@ { "aclList": [ { + "name": "default-control-plane-acl", "sequence": [ {"text": "permit icmp any any", "sequenceNumber": 10}, {"text": "permit ip any any tracked", "sequenceNumber": 20}, {"text": "permit tcp any any range 5900 5910", "sequenceNumber": 30}, ], - } - ] - }, - { - "aclList": [ + }, { + "name": "LabTest", "sequence": [ {"text": "permit icmp any any", "sequenceNumber": 10}, {"text": "permit udp any any eq bfd ttl eq 255", "sequenceNumber": 20}, ], - } + }, ] }, ], @@ -881,9 +882,9 @@ "expected": { "result": "failure", "messages": [ - "default-control-plane-acl:\n" - "Expected `permit udp any any eq bfd ttl eq 255` as sequence number 30 action but found `permit tcp any any range 5900 5910` instead.\n", - "LabTest:\nExpected `permit tcp any any range 5900 5910` as sequence number 20 action but found `permit udp any any eq bfd ttl eq 255` instead.\n", + "ACL name: default-control-plane-acl Sequence: 30 - action mismatch - Expected: permit udp any any eq bfd ttl eq 255 " + "Actual: permit tcp any any range 5900 5910", + "ACL name: LabTest Sequence: 20 - action mismatch - Expected: permit tcp any any range 5900 5910 Actual: permit udp any any eq bfd ttl eq 255", ], }, }, @@ -894,6 +895,7 @@ { "aclList": [ { + "name": "default-control-plane-acl", "sequence": [ {"text": "permit icmp any any", "sequenceNumber": 10}, {"text": "permit ip any any tracked", "sequenceNumber": 40}, @@ -902,7 +904,6 @@ } ] }, - {"aclList": []}, ], "inputs": { "ipv4_access_lists": [ @@ -923,9 +924,10 @@ "expected": { "result": "failure", "messages": [ - "default-control-plane-acl:\nSequence number `20` is not found.\n" - "Expected `permit udp any any eq bfd ttl eq 255` as sequence number 30 action but found `permit tcp any any range 5900 5910` instead.\n", - "LabTest: Not found", + "ACL name: default-control-plane-acl Sequence: 20 - Not configured", + "ACL name: default-control-plane-acl Sequence: 30 - action mismatch - Expected: permit udp any any eq bfd ttl eq 255 " + "Actual: permit tcp any any range 5900 5910", + "ACL name: LabTest - Not configured", ], }, }, diff --git a/tests/units/input_models/routing/test_bgp.py b/tests/units/input_models/routing/test_bgp.py index 61f2b4397..7ff047ce4 100644 --- a/tests/units/input_models/routing/test_bgp.py +++ b/tests/units/input_models/routing/test_bgp.py @@ -14,6 +14,7 @@ from anta.input_models.routing.bgp import BgpAddressFamily, BgpPeer from anta.tests.routing.bgp import ( VerifyBGPExchangedRoutes, + VerifyBGPNlriAcceptance, VerifyBGPPeerCount, VerifyBGPPeerGroup, VerifyBGPPeerMPCaps, @@ -262,3 +263,28 @@ def test_invalid(self, bgp_peers: list[BgpPeer]) -> None: """Test VerifyBGPPeerGroup.Input invalid inputs.""" with pytest.raises(ValidationError): VerifyBGPPeerGroup.Input(bgp_peers=bgp_peers) + + +class TestVerifyBGPNlriAcceptanceInput: + """Test anta.tests.routing.bgp.VerifyBGPNlriAcceptance.Input.""" + + @pytest.mark.parametrize( + ("bgp_peers"), + [ + pytest.param([{"peer_address": "172.30.255.5", "vrf": "default", "capabilities": ["ipv4Unicast"]}], id="valid"), + ], + ) + def test_valid(self, bgp_peers: list[BgpPeer]) -> None: + """Test VerifyBGPNlriAcceptance.Input valid inputs.""" + VerifyBGPNlriAcceptance.Input(bgp_peers=bgp_peers) + + @pytest.mark.parametrize( + ("bgp_peers"), + [ + pytest.param([{"peer_address": "172.30.255.5", "vrf": "default"}], id="invalid"), + ], + ) + def test_invalid(self, bgp_peers: list[BgpPeer]) -> None: + """Test VerifyBGPNlriAcceptance.Input invalid inputs.""" + with pytest.raises(ValidationError): + VerifyBGPNlriAcceptance.Input(bgp_peers=bgp_peers) diff --git a/tests/units/test_device.py b/tests/units/test_device.py index d7b25d4eb..e65eeb2a3 100644 --- a/tests/units/test_device.py +++ b/tests/units/test_device.py @@ -589,11 +589,11 @@ async def test_collect(self, device: AntaDevice, command: dict[str, Any], expect if expected["cache_hit"] is True: assert cmd.output == cached_output assert current_cached_data == cached_output - assert device.cache.hit_miss_ratio["hits"] == 2 + assert device.cache.stats["hits"] == 2 else: assert cmd.output == COMMAND_OUTPUT assert current_cached_data == COMMAND_OUTPUT - assert device.cache.hit_miss_ratio["hits"] == 1 + assert device.cache.stats["hits"] == 1 else: # command is not allowed to use cache device._collect.assert_called_once_with(command=cmd, collection_id=None) # type: ignore[attr-defined] assert cmd.output == COMMAND_OUTPUT