Skip to content

Commit

Permalink
Resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
carl-baillargeon committed Jan 14, 2025
2 parents 5e09b6f + 56acfbc commit 2d40e18
Show file tree
Hide file tree
Showing 23 changed files with 1,354 additions and 20 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ repos:
- '<!--| ~| -->'

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.8.6
rev: v0.9.1
hooks:
- id: ruff
name: Run Ruff linter
Expand Down
8 changes: 3 additions & 5 deletions anta/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def flatten_modules(data: dict[str, Any], package: str | None = None) -> dict[Mo
except Exception as e:
# A test module is potentially user-defined code.
# We need to catch everything if we want to have meaningful logs
module_str = f"{module_name[1:] if module_name.startswith('.') else module_name}{f' from package {package}' if package else ''}"
module_str = f"{module_name.removeprefix('.')}{f' from package {package}' if package else ''}"
message = f"Module named {module_str} cannot be imported. Verify that the module exists and there is no Python syntax issues."
anta_log_exception(e, message, logger)
raise ValueError(message) from e
Expand Down Expand Up @@ -223,16 +223,14 @@ def check_tests(cls: type[AntaCatalogFile], data: Any) -> Any: # noqa: ANN401
raise ValueError(msg) # noqa: TRY004 pydantic catches ValueError or AssertionError, no TypeError
if len(test_definition) != 1:
msg = (
f"Syntax error when parsing: {test_definition}\n"
"It must be a dictionary with a single entry. Check the indentation in the test catalog."
f"Syntax error when parsing: {test_definition}\nIt must be a dictionary with a single entry. Check the indentation in the test catalog."
)
raise ValueError(msg)
for test_name, test_inputs in test_definition.copy().items():
test: type[AntaTest] | None = getattr(module, test_name, None)
if test is None:
msg = (
f"{test_name} is not defined in Python module {module.__name__}"
f"{f' (from {module.__file__})' if module.__file__ is not None else ''}"
f"{test_name} is not defined in Python module {module.__name__}{f' (from {module.__file__})' if module.__file__ is not None else ''}"
)
raise ValueError(msg)
test_definitions.append(AntaTestDefinition(test=test, inputs=test_inputs))
Expand Down
5 changes: 4 additions & 1 deletion anta/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ def validate_regex(value: str) -> str:
SnmpErrorCounter = Literal[
"inVersionErrs", "inBadCommunityNames", "inBadCommunityUses", "inParseErrs", "outTooBigErrs", "outNoSuchNameErrs", "outBadValueErrs", "outGeneralErrs"
]

IPv4RouteType = Literal[
"connected",
"static",
Expand Down Expand Up @@ -238,3 +237,7 @@ def validate_regex(value: str) -> str:
"Route Cache Route",
"CBF Leaked Route",
]
SnmpVersion = Literal["v1", "v2c", "v3"]
SnmpHashingAlgorithm = Literal["MD5", "SHA", "SHA-224", "SHA-256", "SHA-384", "SHA-512"]
SnmpEncryptionAlgorithm = Literal["AES-128", "AES-192", "AES-256", "DES"]
DynamicVlanSource = Literal["dmf", "dot1x", "dynvtep", "evpn", "mlag", "mlagsync", "mvpn", "swfwd", "vccbfd"]
56 changes: 56 additions & 0 deletions anta/input_models/snmp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright (c) 2023-2025 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
"""Module containing input models for SNMP tests."""

from __future__ import annotations

from ipaddress import IPv4Address

from pydantic import BaseModel, ConfigDict

from anta.custom_types import Hostname, SnmpEncryptionAlgorithm, SnmpHashingAlgorithm, SnmpVersion


class SnmpHost(BaseModel):
"""Model for a SNMP host."""

model_config = ConfigDict(extra="forbid")
hostname: IPv4Address | Hostname
"""IPv4 address or hostname of the SNMP notification host."""
vrf: str = "default"
"""Optional VRF for SNMP hosts. If not provided, it defaults to `default`."""

def __str__(self) -> str:
"""Return a human-readable string representation of the SnmpHost for reporting.
Examples
--------
- Host: 192.168.1.100 VRF: default
"""
return f"Host: {self.hostname} VRF: {self.vrf}"


class SnmpUser(BaseModel):
"""Model for a SNMP User."""

model_config = ConfigDict(extra="forbid")
username: str
"""SNMP user name."""
group_name: str
"""SNMP group for the user."""
version: SnmpVersion
"""SNMP protocol version."""
auth_type: SnmpHashingAlgorithm | None = None
"""User authentication algorithm. Can be provided in the `VerifySnmpUser` test."""
priv_type: SnmpEncryptionAlgorithm | None = None
"""User privacy algorithm. Can be provided in the `VerifySnmpUser` test."""

def __str__(self) -> str:
"""Return a human-readable string representation of the SnmpUser for reporting.
Examples
--------
- User: Test Group: Test_Group Version: v2c
"""
return f"User: {self.username} Group: {self.group_name} Version: {self.version}"
5 changes: 2 additions & 3 deletions anta/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ async def setup_inventory(inventory: AntaInventory, tags: set[str] | None, devic

# If there are no devices in the inventory after filtering, exit
if not selected_inventory.devices:
msg = f'No reachable device {f"matching the tags {tags} " if tags else ""}was found.{f" Selected devices: {devices} " if devices is not None else ""}'
msg = f"No reachable device {f'matching the tags {tags} ' if tags else ''}was found.{f' Selected devices: {devices} ' if devices is not None else ''}"
logger.warning(msg)
return None

Expand Down Expand Up @@ -170,8 +170,7 @@ def prepare_tests(

if total_test_count == 0:
msg = (
f"There are no tests{f' matching the tags {tags} ' if tags else ' '}to run in the current "
"test catalog and device inventory, please verify your inputs."
f"There are no tests{f' matching the tags {tags} ' if tags else ' '}to run in the current test catalog and device inventory, please verify your inputs."
)
logger.warning(msg)
return None
Expand Down
34 changes: 32 additions & 2 deletions anta/tests/bfd.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
from __future__ import annotations

from datetime import datetime, timezone
from typing import TYPE_CHECKING, ClassVar
from typing import TYPE_CHECKING, ClassVar, TypeVar

from pydantic import Field
from pydantic import Field, field_validator

from anta.input_models.bfd import BFDPeer
from anta.models import AntaCommand, AntaTest
Expand All @@ -19,6 +19,9 @@
if TYPE_CHECKING:
from anta.models import AntaTemplate

# Using a TypeVar for the BFDPeer model since mypy thinks it's a ClassVar and not a valid type when used in field validators
T = TypeVar("T", bound=BFDPeer)


class VerifyBFDSpecificPeers(AntaTest):
"""Verifies the state of IPv4 BFD peer sessions.
Expand Down Expand Up @@ -143,6 +146,23 @@ class Input(AntaTest.Input):
BFDPeer: ClassVar[type[BFDPeer]] = BFDPeer
"""To maintain backward compatibility"""

@field_validator("bfd_peers")
@classmethod
def validate_bfd_peers(cls, bfd_peers: list[T]) -> list[T]:
"""Validate that 'tx_interval', 'rx_interval' and 'multiplier' fields are provided in each BFD peer."""
for peer in bfd_peers:
missing_fileds = []
if peer.tx_interval is None:
missing_fileds.append("tx_interval")
if peer.rx_interval is None:
missing_fileds.append("rx_interval")
if peer.multiplier is None:
missing_fileds.append("multiplier")
if missing_fileds:
msg = f"{peer} {', '.join(missing_fileds)} field(s) are missing in the input."
raise ValueError(msg)
return bfd_peers

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyBFDPeersIntervals."""
Expand Down Expand Up @@ -308,6 +328,16 @@ class Input(AntaTest.Input):
BFDPeer: ClassVar[type[BFDPeer]] = BFDPeer
"""To maintain backward compatibility"""

@field_validator("bfd_peers")
@classmethod
def validate_bfd_peers(cls, bfd_peers: list[T]) -> list[T]:
"""Validate that 'protocols' field is provided in each BFD peer."""
for peer in bfd_peers:
if peer.protocols is None:
msg = f"{peer} 'protocols' field missing in the input."
raise ValueError(msg)
return bfd_peers

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyBFDPeersRegProtocols."""
Expand Down
29 changes: 29 additions & 0 deletions anta/tests/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,35 @@ def _get_logging_states(logger: logging.Logger, command_output: str) -> str:
return log_states


class VerifySyslogLogging(AntaTest):
"""Verifies if syslog logging is enabled.
Expected Results
----------------
* Success: The test will pass if syslog logging is enabled.
* Failure: The test will fail if syslog logging is disabled.
Examples
--------
```yaml
anta.tests.logging:
- VerifySyslogLogging:
```
"""

categories: ClassVar[list[str]] = ["logging"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show logging", ofmt="text")]

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifySyslogLogging."""
self.result.is_success()
log_output = self.instance_commands[0].text_output

if "Syslog logging: enabled" not in _get_logging_states(self.logger, log_output):
self.result.is_failure("Syslog logging is disabled.")


class VerifyLoggingPersistent(AntaTest):
"""Verifies if logging persistent is enabled and logs are saved in flash.
Expand Down
142 changes: 142 additions & 0 deletions anta/tests/routing/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -1403,3 +1403,145 @@ def test(self) -> None:

if (actual_peer_group := peer_data.get("peerGroupName", "Not Found")) != peer.peer_group:
self.result.is_failure(f"{peer} - Incorrect peer group configured - Expected: {peer.peer_group} Actual: {actual_peer_group}")


class VerifyBGPPeerSessionRibd(AntaTest):
"""Verifies the session state of BGP IPv4 peer(s).
Compatible with EOS operating in `ribd` routing protocol model.
This test performs the following checks for each specified peer:
1. Verifies that the peer is found in its VRF in the BGP configuration.
2. Checks that the BGP session is in the `Established` state.
3. Ensures that both input and output TCP message queues are empty.
Can be disabled by setting `check_tcp_queues` global flag to `False`.
Expected Results
----------------
* Success: If all of the following conditions are met:
- All specified peers are found in the BGP configuration.
- All peers sessions state are `Established`.
- All peers have empty TCP message queues if `check_tcp_queues` is `True` (default).
* Failure: If any of the following occur:
- A specified peer is not found in the BGP configuration.
- A peer's session state is not `Established`.
- A peer has non-empty TCP message queues (input or output) when `check_tcp_queues` is `True`.
Examples
--------
```yaml
anta.tests.routing:
bgp:
- VerifyBGPPeerSessionRibd:
check_tcp_queues: false
bgp_peers:
- peer_address: 10.1.0.1
vrf: default
- peer_address: 10.1.0.2
vrf: default
- peer_address: 10.1.255.2
vrf: DEV
- peer_address: 10.1.255.4
vrf: DEV
```
"""

categories: ClassVar[list[str]] = ["bgp"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show ip bgp neighbors vrf all", revision=2)]

class Input(AntaTest.Input):
"""Input model for the VerifyBGPPeerSessionRibd test."""

check_tcp_queues: bool = True
"""Flag to check if the TCP session queues are empty for all BGP peers. Defaults to `True`."""
bgp_peers: list[BgpPeer]
"""List of BGP IPv4 peers."""

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyBGPPeerSessionRibd."""
self.result.is_success()

output = self.instance_commands[0].json_output

for peer in self.inputs.bgp_peers:
peer_address = str(peer.peer_address)
peers = get_value(output, f"vrfs.{peer.vrf}.peerList", default=[])

# Check if the peer is found
if (peer_data := get_item(peers, "peerAddress", peer_address)) is None:
self.result.is_failure(f"{peer} - Not found")
continue

# Check if the BGP session is established
if peer_data["state"] != "Established":
self.result.is_failure(f"{peer} - Session state is not established - State: {peer_data['state']}")
continue

# Check the TCP session message queues
if self.inputs.check_tcp_queues:
inq_stat = peer_data["peerTcpInfo"]["inputQueueLength"]
outq_stat = peer_data["peerTcpInfo"]["outputQueueLength"]
if inq_stat != 0 or outq_stat != 0:
self.result.is_failure(f"{peer} - Session has non-empty message queues - InQ: {inq_stat}, OutQ: {outq_stat}")


class VerifyBGPPeersHealthRibd(AntaTest):
"""Verifies the health of all the BGP IPv4 peer(s).
Compatible with EOS operating in `ribd` routing protocol model.
This test performs the following checks for all BGP IPv4 peers:
1. Verifies that the BGP session is in the `Established` state.
2. Checks that both input and output TCP message queues are empty.
Can be disabled by setting `check_tcp_queues` global flag to `False`.
Expected Results
----------------
* Success: If all checks pass for all BGP IPv4 peers.
* Failure: If any of the following occur:
- Any BGP session is not in the `Established` state.
- Any TCP message queue (input or output) is not empty when `check_tcp_queues` is `True` (default).
Examples
--------
```yaml
anta.tests.routing:
bgp:
- VerifyBGPPeersHealthRibd:
check_tcp_queues: True
```
"""

categories: ClassVar[list[str]] = ["bgp"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show ip bgp neighbors vrf all", revision=2)]

class Input(AntaTest.Input):
"""Input model for the VerifyBGPPeersHealthRibd test."""

check_tcp_queues: bool = True
"""Flag to check if the TCP session queues are empty for all BGP peers. Defaults to `True`."""

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyBGPPeersHealthRibd."""
self.result.is_success()

output = self.instance_commands[0].json_output

for vrf, vrf_data in output["vrfs"].items():
peer_list = vrf_data.get("peerList", [])

for peer in peer_list:
# Check if the BGP session is established
if peer["state"] != "Established":
self.result.is_failure(f"Peer: {peer['peerAddress']} VRF: {vrf} - Session state is not established - State: {peer['state']}")
continue

# Check the TCP session message queues
inq = peer["peerTcpInfo"]["inputQueueLength"]
outq = peer["peerTcpInfo"]["outputQueueLength"]
if self.inputs.check_tcp_queues and (inq != 0 or outq != 0):
self.result.is_failure(f"Peer: {peer['peerAddress']} VRF: {vrf} - Session has non-empty message queues - InQ: {inq}, OutQ: {outq}")
Loading

0 comments on commit 2d40e18

Please sign in to comment.