Skip to content

Commit

Permalink
Resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
carl-baillargeon committed Jan 14, 2025
2 parents fcdc00b + 8e5de9a commit 71004b2
Show file tree
Hide file tree
Showing 24 changed files with 686 additions and 34 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ repos:
- '<!--| ~| -->'

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.8.4
rev: v0.9.1
hooks:
- id: ruff
name: Run Ruff linter
Expand Down Expand Up @@ -85,7 +85,7 @@ repos:
types: [text]

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.14.0
rev: v1.14.1
hooks:
- id: mypy
name: Check typing with mypy
Expand Down
8 changes: 3 additions & 5 deletions anta/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def flatten_modules(data: dict[str, Any], package: str | None = None) -> dict[Mo
except Exception as e:
# A test module is potentially user-defined code.
# We need to catch everything if we want to have meaningful logs
module_str = f"{module_name[1:] if module_name.startswith('.') else module_name}{f' from package {package}' if package else ''}"
module_str = f"{module_name.removeprefix('.')}{f' from package {package}' if package else ''}"
message = f"Module named {module_str} cannot be imported. Verify that the module exists and there is no Python syntax issues."
anta_log_exception(e, message, logger)
raise ValueError(message) from e
Expand Down Expand Up @@ -223,16 +223,14 @@ def check_tests(cls: type[AntaCatalogFile], data: Any) -> Any: # noqa: ANN401
raise ValueError(msg) # noqa: TRY004 pydantic catches ValueError or AssertionError, no TypeError
if len(test_definition) != 1:
msg = (
f"Syntax error when parsing: {test_definition}\n"
"It must be a dictionary with a single entry. Check the indentation in the test catalog."
f"Syntax error when parsing: {test_definition}\nIt must be a dictionary with a single entry. Check the indentation in the test catalog."
)
raise ValueError(msg)
for test_name, test_inputs in test_definition.copy().items():
test: type[AntaTest] | None = getattr(module, test_name, None)
if test is None:
msg = (
f"{test_name} is not defined in Python module {module.__name__}"
f"{f' (from {module.__file__})' if module.__file__ is not None else ''}"
f"{test_name} is not defined in Python module {module.__name__}{f' (from {module.__file__})' if module.__file__ is not None else ''}"
)
raise ValueError(msg)
test_definitions.append(AntaTestDefinition(test=test, inputs=test_inputs))
Expand Down
7 changes: 5 additions & 2 deletions anta/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from anta.catalog import AntaCatalog
from anta.inventory import AntaInventory
from anta.inventory.exceptions import InventoryIncorrectSchemaError, InventoryRootKeyError
from anta.logger import anta_log_exception

if TYPE_CHECKING:
from click import Option
Expand Down Expand Up @@ -242,7 +243,8 @@ def wrapper(
insecure=insecure,
disable_cache=disable_cache,
)
except (TypeError, ValueError, YAMLError, OSError, InventoryIncorrectSchemaError, InventoryRootKeyError):
except (TypeError, ValueError, YAMLError, OSError, InventoryIncorrectSchemaError, InventoryRootKeyError) as e:
anta_log_exception(e, f"Failed to parse the inventory: {inventory}", logger)
ctx.exit(ExitCode.USAGE_ERROR)
return f(*args, inventory=i, **kwargs)

Expand Down Expand Up @@ -319,7 +321,8 @@ def wrapper(
try:
file_format = catalog_format.lower()
c = AntaCatalog.parse(catalog, file_format=file_format) # type: ignore[arg-type]
except (TypeError, ValueError, YAMLError, OSError):
except (TypeError, ValueError, YAMLError, OSError) as e:
anta_log_exception(e, f"Failed to parse the catalog: {catalog}", logger)
ctx.exit(ExitCode.USAGE_ERROR)
return f(*args, catalog=c, **kwargs)

Expand Down
4 changes: 3 additions & 1 deletion anta/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ def validate_regex(value: str) -> str:
SnmpErrorCounter = Literal[
"inVersionErrs", "inBadCommunityNames", "inBadCommunityUses", "inParseErrs", "outTooBigErrs", "outNoSuchNameErrs", "outBadValueErrs", "outGeneralErrs"
]

IPv4RouteType = Literal[
"connected",
"static",
Expand Down Expand Up @@ -238,4 +237,7 @@ def validate_regex(value: str) -> str:
"Route Cache Route",
"CBF Leaked Route",
]
SnmpVersion = Literal["v1", "v2c", "v3"]
SnmpHashingAlgorithm = Literal["MD5", "SHA", "SHA-224", "SHA-256", "SHA-384", "SHA-512"]
SnmpEncryptionAlgorithm = Literal["AES-128", "AES-192", "AES-256", "DES"]
DynamicVlanSource = Literal["dmf", "dot1x", "dynvtep", "evpn", "mlag", "mlagsync", "mvpn", "swfwd", "vccbfd"]
4 changes: 4 additions & 0 deletions anta/input_models/bfd.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class BFDPeer(BaseModel):
"""Multiplier of BFD peer. Required field in the `VerifyBFDPeersIntervals` test."""
protocols: list[BfdProtocol] | None = None
"""List of protocols to be verified. Required field in the `VerifyBFDPeersRegProtocols` test."""
detection_time: int | None = None
"""Detection time of BFD peer in milliseconds. Defines how long to wait without receiving BFD packets before declaring the peer session as down.
Optional field in the `VerifyBFDPeersIntervals` test."""

def __str__(self) -> str:
"""Return a human-readable string representation of the BFDPeer for reporting."""
Expand Down
35 changes: 35 additions & 0 deletions anta/input_models/snmp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright (c) 2023-2025 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
"""Module containing input models for SNMP tests."""

from __future__ import annotations

from pydantic import BaseModel, ConfigDict

from anta.custom_types import SnmpEncryptionAlgorithm, SnmpHashingAlgorithm, SnmpVersion


class SnmpUser(BaseModel):
"""Model for a SNMP User."""

model_config = ConfigDict(extra="forbid")
username: str
"""SNMP user name."""
group_name: str
"""SNMP group for the user."""
version: SnmpVersion
"""SNMP protocol version."""
auth_type: SnmpHashingAlgorithm | None = None
"""User authentication algorithm. Can be provided in the `VerifySnmpUser` test."""
priv_type: SnmpEncryptionAlgorithm | None = None
"""User privacy algorithm. Can be provided in the `VerifySnmpUser` test."""

def __str__(self) -> str:
"""Return a human-readable string representation of the SnmpUser for reporting.
Examples
--------
- User: Test Group: Test_Group Version: v2c
"""
return f"User: {self.username} Group: {self.group_name} Version: {self.version}"
58 changes: 44 additions & 14 deletions anta/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@
import traceback
from datetime import timedelta
from enum import Enum
from typing import TYPE_CHECKING, Literal
from pathlib import Path
from typing import Literal

from rich.logging import RichHandler

from anta import __DEBUG__

if TYPE_CHECKING:
from pathlib import Path

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -69,27 +67,59 @@ def setup_logging(level: LogLevel = Log.INFO, file: Path | None = None) -> None:
# httpx as well
logging.getLogger("httpx").setLevel(logging.WARNING)

# Add RichHandler for stdout
rich_handler = RichHandler(markup=True, rich_tracebacks=True, tracebacks_show_locals=False)
# Show Python module in stdout at DEBUG level
fmt_string = "[grey58]\\[%(name)s][/grey58] %(message)s" if loglevel == logging.DEBUG else "%(message)s"
formatter = logging.Formatter(fmt=fmt_string, datefmt="[%X]")
rich_handler.setFormatter(formatter)
root.addHandler(rich_handler)
# Add FileHandler if file is provided
if file:
# Add RichHandler for stdout if not already present
_maybe_add_rich_handler(loglevel, root)

# Add FileHandler if file is provided and same File Handler is not already present
if file and not _get_file_handler(root, file):
file_handler = logging.FileHandler(file)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
file_handler.setFormatter(formatter)
root.addHandler(file_handler)
# If level is DEBUG and file is provided, do not send DEBUG level to stdout
if loglevel == logging.DEBUG:
if loglevel == logging.DEBUG and (rich_handler := _get_rich_handler(root)) is not None:
rich_handler.setLevel(logging.INFO)

if __DEBUG__:
logger.debug("ANTA Debug Mode enabled")


def _get_file_handler(logger_instance: logging.Logger, file: Path) -> logging.FileHandler | None:
"""Return the FileHandler if present."""
return (
next(
(
handler
for handler in logger_instance.handlers
if isinstance(handler, logging.FileHandler) and str(Path(handler.baseFilename).resolve()) == str(file.resolve())
),
None,
)
if logger_instance.hasHandlers()
else None
)


def _get_rich_handler(logger_instance: logging.Logger) -> logging.Handler | None:
"""Return the ANTA Rich Handler."""
return next((handler for handler in logger_instance.handlers if handler.get_name() == "ANTA_RICH_HANDLER"), None) if logger_instance.hasHandlers() else None


def _maybe_add_rich_handler(loglevel: int, logger_instance: logging.Logger) -> None:
"""Add RichHandler for stdout if not already present."""
if _get_rich_handler(logger_instance) is not None:
# Nothing to do.
return

anta_rich_handler = RichHandler(markup=True, rich_tracebacks=True, tracebacks_show_locals=False)
anta_rich_handler.set_name("ANTA_RICH_HANDLER")
# Show Python module in stdout at DEBUG level
fmt_string = "[grey58]\\[%(name)s][/grey58] %(message)s" if loglevel == logging.DEBUG else "%(message)s"
formatter = logging.Formatter(fmt=fmt_string, datefmt="[%X]")
anta_rich_handler.setFormatter(formatter)
logger_instance.addHandler(anta_rich_handler)


def format_td(seconds: float, digits: int = 3) -> str:
"""Return a formatted string from a float number representing seconds and a number of digits."""
isec, fsec = divmod(round(seconds * 10**digits), 10**digits)
Expand Down
5 changes: 2 additions & 3 deletions anta/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ async def setup_inventory(inventory: AntaInventory, tags: set[str] | None, devic

# If there are no devices in the inventory after filtering, exit
if not selected_inventory.devices:
msg = f'No reachable device {f"matching the tags {tags} " if tags else ""}was found.{f" Selected devices: {devices} " if devices is not None else ""}'
msg = f"No reachable device {f'matching the tags {tags} ' if tags else ''}was found.{f' Selected devices: {devices} ' if devices is not None else ''}"
logger.warning(msg)
return None

Expand Down Expand Up @@ -170,8 +170,7 @@ def prepare_tests(

if total_test_count == 0:
msg = (
f"There are no tests{f' matching the tags {tags} ' if tags else ' '}to run in the current "
"test catalog and device inventory, please verify your inputs."
f"There are no tests{f' matching the tags {tags} ' if tags else ' '}to run in the current test catalog and device inventory, please verify your inputs."
)
logger.warning(msg)
return None
Expand Down
9 changes: 9 additions & 0 deletions anta/tests/bfd.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,18 @@ class VerifyBFDPeersIntervals(AntaTest):
1. Confirms that the specified VRF is configured.
2. Verifies that the peer exists in the BFD configuration.
3. Confirms that BFD peer is correctly configured with the `Transmit interval, Receive interval and Multiplier`.
4. Verifies that BFD peer is correctly configured with the `Detection time`, if provided.
Expected Results
----------------
* Success: If all of the following conditions are met:
- All specified peers are found in the BFD configuration within the specified VRF.
- All BFD peers are correctly configured with the `Transmit interval, Receive interval and Multiplier`.
- If provided, the `Detection time` is correctly configured.
* Failure: If any of the following occur:
- A specified peer is not found in the BFD configuration within the specified VRF.
- Any BFD peer not correctly configured with the `Transmit interval, Receive interval and Multiplier`.
- Any BFD peer is not correctly configured with `Detection time`, if provided.
Examples
--------
Expand All @@ -125,6 +128,7 @@ class VerifyBFDPeersIntervals(AntaTest):
tx_interval: 1200
rx_interval: 1200
multiplier: 3
detection_time: 3600
```
"""

Expand All @@ -151,6 +155,7 @@ def test(self) -> None:
tx_interval = bfd_peer.tx_interval
rx_interval = bfd_peer.rx_interval
multiplier = bfd_peer.multiplier
detect_time = bfd_peer.detection_time

# Check if BFD peer configured
bfd_output = get_value(
Expand All @@ -166,6 +171,7 @@ def test(self) -> None:
bfd_details = bfd_output.get("peerStatsDetail", {})
op_tx_interval = bfd_details.get("operTxInterval") // 1000
op_rx_interval = bfd_details.get("operRxInterval") // 1000
op_detection_time = bfd_details.get("detectTime") // 1000
detect_multiplier = bfd_details.get("detectMult")

if op_tx_interval != tx_interval:
Expand All @@ -177,6 +183,9 @@ def test(self) -> None:
if detect_multiplier != multiplier:
self.result.is_failure(f"{bfd_peer} - Incorrect Multiplier - Expected: {multiplier} Actual: {detect_multiplier}")

if detect_time and op_detection_time != detect_time:
self.result.is_failure(f"{bfd_peer} - Incorrect Detection Time - Expected: {detect_time} Actual: {op_detection_time}")


class VerifyBFDPeersHealth(AntaTest):
"""Verifies the health of IPv4 BFD peers across all VRFs.
Expand Down
29 changes: 29 additions & 0 deletions anta/tests/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,35 @@ def _get_logging_states(logger: logging.Logger, command_output: str) -> str:
return log_states


class VerifySyslogLogging(AntaTest):
"""Verifies if syslog logging is enabled.
Expected Results
----------------
* Success: The test will pass if syslog logging is enabled.
* Failure: The test will fail if syslog logging is disabled.
Examples
--------
```yaml
anta.tests.logging:
- VerifySyslogLogging:
```
"""

categories: ClassVar[list[str]] = ["logging"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show logging", ofmt="text")]

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifySyslogLogging."""
self.result.is_success()
log_output = self.instance_commands[0].text_output

if "Syslog logging: enabled" not in _get_logging_states(self.logger, log_output):
self.result.is_failure("Syslog logging is disabled.")


class VerifyLoggingPersistent(AntaTest):
"""Verifies if logging persistent is enabled and logs are saved in flash.
Expand Down
Loading

0 comments on commit 71004b2

Please sign in to comment.