From 44d34f37240c3e9a297557098bfcbf4eb491e09a Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 10 Jan 2025 09:39:19 +0100 Subject: [PATCH 1/8] chore: update ruff requirement from <0.9.0,>=0.5.4 to >=0.5.4,<0.10.0 (#1002) * chore: update ruff requirement from <0.9.0,>=0.5.4 to >=0.5.4,<0.10.0 Updates the requirements on [ruff](https://github.com/astral-sh/ruff) to permit the latest version. - [Release notes](https://github.com/astral-sh/ruff/releases) - [Changelog](https://github.com/astral-sh/ruff/blob/main/CHANGELOG.md) - [Commits](https://github.com/astral-sh/ruff/compare/0.5.4...0.9.0) --- updated-dependencies: - dependency-name: ruff dependency-type: direct:production ... Signed-off-by: dependabot[bot] * Update pre-commit as well * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * ci: Ignore shadowing logging module with anta.tests.logging --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Guillaume Mulocher Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- .pre-commit-config.yaml | 2 +- anta/catalog.py | 8 +++----- anta/runner.py | 5 ++--- anta/tests/system.py | 2 +- pyproject.toml | 5 ++++- tests/benchmark/test_anta.py | 2 +- tests/units/test_runner.py | 2 +- 7 files changed, 13 insertions(+), 13 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f12dda006..1961ec0c4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -46,7 +46,7 @@ repos: - '' - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.8.6 + rev: v0.9.0 hooks: - id: ruff name: Run Ruff linter diff --git a/anta/catalog.py b/anta/catalog.py index 65031cedc..ca0eeb176 100644 --- a/anta/catalog.py +++ b/anta/catalog.py @@ -182,7 +182,7 @@ def flatten_modules(data: dict[str, Any], package: str | None = None) -> dict[Mo except Exception as e: # A test module is potentially user-defined code. # We need to catch everything if we want to have meaningful logs - module_str = f"{module_name[1:] if module_name.startswith('.') else module_name}{f' from package {package}' if package else ''}" + module_str = f"{module_name.removeprefix('.')}{f' from package {package}' if package else ''}" message = f"Module named {module_str} cannot be imported. Verify that the module exists and there is no Python syntax issues." anta_log_exception(e, message, logger) raise ValueError(message) from e @@ -223,16 +223,14 @@ def check_tests(cls: type[AntaCatalogFile], data: Any) -> Any: # noqa: ANN401 raise ValueError(msg) # noqa: TRY004 pydantic catches ValueError or AssertionError, no TypeError if len(test_definition) != 1: msg = ( - f"Syntax error when parsing: {test_definition}\n" - "It must be a dictionary with a single entry. Check the indentation in the test catalog." + f"Syntax error when parsing: {test_definition}\nIt must be a dictionary with a single entry. Check the indentation in the test catalog." ) raise ValueError(msg) for test_name, test_inputs in test_definition.copy().items(): test: type[AntaTest] | None = getattr(module, test_name, None) if test is None: msg = ( - f"{test_name} is not defined in Python module {module.__name__}" - f"{f' (from {module.__file__})' if module.__file__ is not None else ''}" + f"{test_name} is not defined in Python module {module.__name__}{f' (from {module.__file__})' if module.__file__ is not None else ''}" ) raise ValueError(msg) test_definitions.append(AntaTestDefinition(test=test, inputs=test_inputs)) diff --git a/anta/runner.py b/anta/runner.py index 93c433722..2b3c8b70e 100644 --- a/anta/runner.py +++ b/anta/runner.py @@ -115,7 +115,7 @@ async def setup_inventory(inventory: AntaInventory, tags: set[str] | None, devic # If there are no devices in the inventory after filtering, exit if not selected_inventory.devices: - msg = f'No reachable device {f"matching the tags {tags} " if tags else ""}was found.{f" Selected devices: {devices} " if devices is not None else ""}' + msg = f"No reachable device {f'matching the tags {tags} ' if tags else ''}was found.{f' Selected devices: {devices} ' if devices is not None else ''}" logger.warning(msg) return None @@ -170,8 +170,7 @@ def prepare_tests( if total_test_count == 0: msg = ( - f"There are no tests{f' matching the tags {tags} ' if tags else ' '}to run in the current " - "test catalog and device inventory, please verify your inputs." + f"There are no tests{f' matching the tags {tags} ' if tags else ' '}to run in the current test catalog and device inventory, please verify your inputs." ) logger.warning(msg) return None diff --git a/anta/tests/system.py b/anta/tests/system.py index 85235f2fc..11cf8398c 100644 --- a/anta/tests/system.py +++ b/anta/tests/system.py @@ -224,7 +224,7 @@ def test(self) -> None: if memory_usage > MEMORY_THRESHOLD: self.result.is_success() else: - self.result.is_failure(f"Device has reported a high memory usage: {(1 - memory_usage)*100:.2f}%") + self.result.is_failure(f"Device has reported a high memory usage: {(1 - memory_usage) * 100:.2f}%") class VerifyFileSystemUtilization(AntaTest): diff --git a/pyproject.toml b/pyproject.toml index 1e85b01f0..09fee8c3e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -75,7 +75,7 @@ dev = [ "pytest-metadata>=3.0.0", "pytest>=7.4.0", "respx>=0.22.0", - "ruff>=0.5.4,<0.9.0", + "ruff>=0.5.4,<0.10.0", "tox>=4.10.0,<5.0.0", "types-PyYAML", "types-pyOpenSSL", @@ -428,6 +428,9 @@ runtime-evaluated-base-classes = ["pydantic.BaseModel", "anta.models.AntaTest.In "C901", # TODO: test function is too complex, needs a refactor "PLR0912" # Too many branches (15/12) (too-many-branches), needs a refactor ] +"anta/tests/logging.py" = [ + "A005", # TODO: Module `logging` shadows a Python standard-library module +] "anta/decorators.py" = [ "ANN401", # Ok to use Any type hint in our decorators ] diff --git a/tests/benchmark/test_anta.py b/tests/benchmark/test_anta.py index 91d3b3ff2..1daf7f369 100644 --- a/tests/benchmark/test_anta.py +++ b/tests/benchmark/test_anta.py @@ -47,7 +47,7 @@ def _() -> None: if len(results.results) != len(inventory) * len(catalog.tests): pytest.fail(f"Expected {len(inventory) * len(catalog.tests)} tests but got {len(results.results)}", pytrace=False) - bench_info = "\n--- ANTA NRFU Dry-Run Benchmark Information ---\n" f"Test count: {len(results.results)}\n" "-----------------------------------------------" + bench_info = f"\n--- ANTA NRFU Dry-Run Benchmark Information ---\nTest count: {len(results.results)}\n-----------------------------------------------" logger.info(bench_info) diff --git a/tests/units/test_runner.py b/tests/units/test_runner.py index b3ac1b56a..1b9c40c88 100644 --- a/tests/units/test_runner.py +++ b/tests/units/test_runner.py @@ -67,7 +67,7 @@ async def test_no_selected_device(caplog: pytest.LogCaptureFixture, inventory: A caplog.set_level(logging.WARNING) manager = ResultManager() await main(manager, inventory, FAKE_CATALOG, tags=tags, devices=devices) - msg = f'No reachable device {f"matching the tags {tags} " if tags else ""}was found.{f" Selected devices: {devices} " if devices is not None else ""}' + msg = f"No reachable device {f'matching the tags {tags} ' if tags else ''}was found.{f' Selected devices: {devices} ' if devices is not None else ''}" assert msg in caplog.messages From 5389bb01d206f42381ec1043e43e12ac6bb883c5 Mon Sep 17 00:00:00 2001 From: vitthalmagadum <122079046+vitthalmagadum@users.noreply.github.com> Date: Tue, 14 Jan 2025 09:58:22 +0530 Subject: [PATCH 2/8] feat(anta): Added the test case to verify syslog logging is enabled (#859) --- anta/tests/logging.py | 29 ++++++++++++++++++ examples/tests.yaml | 2 ++ tests/units/anta_tests/test_logging.py | 42 ++++++++++++++++++++++++++ 3 files changed, 73 insertions(+) diff --git a/anta/tests/logging.py b/anta/tests/logging.py index d32701836..4a1c594e8 100644 --- a/anta/tests/logging.py +++ b/anta/tests/logging.py @@ -43,6 +43,35 @@ def _get_logging_states(logger: logging.Logger, command_output: str) -> str: return log_states +class VerifySyslogLogging(AntaTest): + """Verifies if syslog logging is enabled. + + Expected Results + ---------------- + * Success: The test will pass if syslog logging is enabled. + * Failure: The test will fail if syslog logging is disabled. + + Examples + -------- + ```yaml + anta.tests.logging: + - VerifySyslogLogging: + ``` + """ + + categories: ClassVar[list[str]] = ["logging"] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show logging", ofmt="text")] + + @AntaTest.anta_test + def test(self) -> None: + """Main test function for VerifySyslogLogging.""" + self.result.is_success() + log_output = self.instance_commands[0].text_output + + if "Syslog logging: enabled" not in _get_logging_states(self.logger, log_output): + self.result.is_failure("Syslog logging is disabled.") + + class VerifyLoggingPersistent(AntaTest): """Verifies if logging persistent is enabled and logs are saved in flash. diff --git a/examples/tests.yaml b/examples/tests.yaml index 77b534a74..0b573616c 100644 --- a/examples/tests.yaml +++ b/examples/tests.yaml @@ -307,6 +307,8 @@ anta.tests.logging: vrf: default - VerifyLoggingTimestamp: # Verifies if logs are generated with the appropriate timestamp. + - VerifySyslogLogging: + # Verifies if syslog logging is enabled. anta.tests.mlag: - VerifyMlagConfigSanity: # Verifies there are no MLAG config-sanity inconsistencies. diff --git a/tests/units/anta_tests/test_logging.py b/tests/units/anta_tests/test_logging.py index 6aeac4a21..0c1408817 100644 --- a/tests/units/anta_tests/test_logging.py +++ b/tests/units/anta_tests/test_logging.py @@ -16,6 +16,7 @@ VerifyLoggingPersistent, VerifyLoggingSourceIntf, VerifyLoggingTimestamp, + VerifySyslogLogging, ) from tests.units.anta_tests import test @@ -277,4 +278,45 @@ "inputs": None, "expected": {"result": "failure", "messages": ["Device has reported syslog messages with a severity of ERRORS or higher"]}, }, + { + "name": "success", + "test": VerifySyslogLogging, + "eos_data": [ + """Syslog logging: enabled + Buffer logging: level debugging + + External configuration: + active: + inactive: + + Facility Severity Effective Severity + -------------------- ------------- ------------------ + aaa debugging debugging + accounting debugging debugging""", + ], + "inputs": None, + "expected": {"result": "success"}, + }, + { + "name": "failure", + "test": VerifySyslogLogging, + "eos_data": [ + """Syslog logging: disabled + Buffer logging: level debugging + Console logging: level errors + Persistent logging: disabled + Monitor logging: level errors + + External configuration: + active: + inactive: + + Facility Severity Effective Severity + -------------------- ------------- ------------------ + aaa debugging debugging + accounting debugging debugging""", + ], + "inputs": None, + "expected": {"result": "failure", "messages": ["Syslog logging is disabled."]}, + }, ] From 5fda9651edc700df1ab7ef194de4df7e9b808424 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 14 Jan 2025 13:58:09 +0100 Subject: [PATCH 3/8] ci: pre-commit autoupdate (#1005) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/astral-sh/ruff-pre-commit: v0.9.0 → v0.9.1](https://github.com/astral-sh/ruff-pre-commit/compare/v0.9.0...v0.9.1) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Guillaume Mulocher --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1961ec0c4..42b0ce1cb 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -46,7 +46,7 @@ repos: - '' - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.9.0 + rev: v0.9.1 hooks: - id: ruff name: Run Ruff linter From 8e5de9a6cceb6e56812518d7b33add341b50b5a8 Mon Sep 17 00:00:00 2001 From: vitthalmagadum <122079046+vitthalmagadum@users.noreply.github.com> Date: Tue, 14 Jan 2025 18:59:16 +0530 Subject: [PATCH 4/8] feat(anta): Added the test case to verify SNMP user (#877) * Added TC for SNMP user * Updated input model refactoring changes * updated documentation apis * added unit tests for the input models * addressed review comments: updated docstrings, input model * updated field validator * Addressed review comments: updated input model docstrings * Remove unnecessary TypeVar --------- Co-authored-by: VitthalMagadum Co-authored-by: Carl Baillargeon --- anta/custom_types.py | 4 +- anta/input_models/snmp.py | 35 ++++++ anta/tests/snmp.py | 79 ++++++++++++ docs/api/tests.snmp.md | 16 +++ examples/tests.yaml | 8 ++ tests/units/anta_tests/test_snmp.py | 165 ++++++++++++++++++++++++++ tests/units/input_models/test_snmp.py | 44 +++++++ 7 files changed, 350 insertions(+), 1 deletion(-) create mode 100644 anta/input_models/snmp.py create mode 100644 tests/units/input_models/test_snmp.py diff --git a/anta/custom_types.py b/anta/custom_types.py index bd6a7b8d2..4763be495 100644 --- a/anta/custom_types.py +++ b/anta/custom_types.py @@ -208,7 +208,6 @@ def validate_regex(value: str) -> str: SnmpErrorCounter = Literal[ "inVersionErrs", "inBadCommunityNames", "inBadCommunityUses", "inParseErrs", "outTooBigErrs", "outNoSuchNameErrs", "outBadValueErrs", "outGeneralErrs" ] - IPv4RouteType = Literal[ "connected", "static", @@ -238,3 +237,6 @@ def validate_regex(value: str) -> str: "Route Cache Route", "CBF Leaked Route", ] +SnmpVersion = Literal["v1", "v2c", "v3"] +SnmpHashingAlgorithm = Literal["MD5", "SHA", "SHA-224", "SHA-256", "SHA-384", "SHA-512"] +SnmpEncryptionAlgorithm = Literal["AES-128", "AES-192", "AES-256", "DES"] diff --git a/anta/input_models/snmp.py b/anta/input_models/snmp.py new file mode 100644 index 000000000..680d617c0 --- /dev/null +++ b/anta/input_models/snmp.py @@ -0,0 +1,35 @@ +# Copyright (c) 2023-2025 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. +"""Module containing input models for SNMP tests.""" + +from __future__ import annotations + +from pydantic import BaseModel, ConfigDict + +from anta.custom_types import SnmpEncryptionAlgorithm, SnmpHashingAlgorithm, SnmpVersion + + +class SnmpUser(BaseModel): + """Model for a SNMP User.""" + + model_config = ConfigDict(extra="forbid") + username: str + """SNMP user name.""" + group_name: str + """SNMP group for the user.""" + version: SnmpVersion + """SNMP protocol version.""" + auth_type: SnmpHashingAlgorithm | None = None + """User authentication algorithm. Can be provided in the `VerifySnmpUser` test.""" + priv_type: SnmpEncryptionAlgorithm | None = None + """User privacy algorithm. Can be provided in the `VerifySnmpUser` test.""" + + def __str__(self) -> str: + """Return a human-readable string representation of the SnmpUser for reporting. + + Examples + -------- + - User: Test Group: Test_Group Version: v2c + """ + return f"User: {self.username} Group: {self.group_name} Version: {self.version}" diff --git a/anta/tests/snmp.py b/anta/tests/snmp.py index 36b97ba2f..ef7410430 100644 --- a/anta/tests/snmp.py +++ b/anta/tests/snmp.py @@ -9,7 +9,10 @@ from typing import TYPE_CHECKING, ClassVar, get_args +from pydantic import field_validator + from anta.custom_types import PositiveInteger, SnmpErrorCounter, SnmpPdu +from anta.input_models.snmp import SnmpUser from anta.models import AntaCommand, AntaTest from anta.tools import get_value @@ -339,3 +342,79 @@ def test(self) -> None: self.result.is_success() else: self.result.is_failure(f"The following SNMP error counters are not found or have non-zero error counters:\n{error_counters_not_ok}") + + +class VerifySnmpUser(AntaTest): + """Verifies the SNMP user configurations. + + This test performs the following checks for each specified user: + + 1. User exists in SNMP configuration. + 2. Group assignment is correct. + 3. For SNMPv3 users only: + - Authentication type matches (if specified) + - Privacy type matches (if specified) + + Expected Results + ---------------- + * Success: If all of the following conditions are met: + - All users exist with correct group assignments. + - SNMPv3 authentication and privacy types match specified values. + * Failure: If any of the following occur: + - User not found in SNMP configuration. + - Incorrect group assignment. + - For SNMPv3: Mismatched authentication or privacy types. + + Examples + -------- + ```yaml + anta.tests.snmp: + - VerifySnmpUser: + snmp_users: + - username: test + group_name: test_group + version: v3 + auth_type: MD5 + priv_type: AES-128 + ``` + """ + + categories: ClassVar[list[str]] = ["snmp"] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show snmp user", revision=1)] + + class Input(AntaTest.Input): + """Input model for the VerifySnmpUser test.""" + + snmp_users: list[SnmpUser] + """List of SNMP users.""" + + @field_validator("snmp_users") + @classmethod + def validate_snmp_users(cls, snmp_users: list[SnmpUser]) -> list[SnmpUser]: + """Validate that 'auth_type' or 'priv_type' field is provided in each SNMPv3 user.""" + for user in snmp_users: + if user.version == "v3" and not (user.auth_type or user.priv_type): + msg = f"{user} 'auth_type' or 'priv_type' field is required with 'version: v3'" + raise ValueError(msg) + return snmp_users + + @AntaTest.anta_test + def test(self) -> None: + """Main test function for VerifySnmpUser.""" + self.result.is_success() + + for user in self.inputs.snmp_users: + # Verify SNMP user details. + if not (user_details := get_value(self.instance_commands[0].json_output, f"usersByVersion.{user.version}.users.{user.username}")): + self.result.is_failure(f"{user} - Not found") + continue + + if user.group_name != (act_group := user_details.get("groupName", "Not Found")): + self.result.is_failure(f"{user} - Incorrect user group - Actual: {act_group}") + + if user.version == "v3": + if user.auth_type and (act_auth_type := get_value(user_details, "v3Params.authType", "Not Found")) != user.auth_type: + self.result.is_failure(f"{user} - Incorrect authentication type - Expected: {user.auth_type} Actual: {act_auth_type}") + + if user.priv_type and (act_encryption := get_value(user_details, "v3Params.privType", "Not Found")) != user.priv_type: + self.result.is_failure(f"{user} - Incorrect privacy type - Expected: {user.priv_type} Actual: {act_encryption}") diff --git a/docs/api/tests.snmp.md b/docs/api/tests.snmp.md index 85c147e2a..eec97202f 100644 --- a/docs/api/tests.snmp.md +++ b/docs/api/tests.snmp.md @@ -7,7 +7,10 @@ anta_title: ANTA catalog for SNMP tests ~ that can be found in the LICENSE file. --> +# Tests + ::: anta.tests.snmp + options: show_root_heading: false show_root_toc_entry: false @@ -18,3 +21,16 @@ anta_title: ANTA catalog for SNMP tests filters: - "!test" - "!render" + +# Input models + +::: anta.input_models.snmp + + options: + show_root_heading: false + show_root_toc_entry: false + show_bases: false + merge_init_into_class: false + anta_hide_test_module_description: true + show_labels: true + filters: ["!^__str__"] diff --git a/examples/tests.yaml b/examples/tests.yaml index 0b573616c..c2e00f009 100644 --- a/examples/tests.yaml +++ b/examples/tests.yaml @@ -746,6 +746,14 @@ anta.tests.snmp: - VerifySnmpStatus: # Verifies if the SNMP agent is enabled. vrf: default + - VerifySnmpUser: + # Verifies the SNMP user configurations. + snmp_users: + - username: test + group_name: test_group + version: v3 + auth_type: MD5 + priv_type: AES-128 anta.tests.software: - VerifyEOSExtensions: # Verifies that all EOS extensions installed on the device are enabled for boot persistence. diff --git a/tests/units/anta_tests/test_snmp.py b/tests/units/anta_tests/test_snmp.py index 195ef298e..d2eb6b87f 100644 --- a/tests/units/anta_tests/test_snmp.py +++ b/tests/units/anta_tests/test_snmp.py @@ -15,6 +15,7 @@ VerifySnmpLocation, VerifySnmpPDUCounters, VerifySnmpStatus, + VerifySnmpUser, ) from tests.units.anta_tests import test @@ -319,4 +320,168 @@ ], }, }, + { + "name": "success", + "test": VerifySnmpUser, + "eos_data": [ + { + "usersByVersion": { + "v1": { + "users": { + "Test1": { + "groupName": "TestGroup1", + }, + } + }, + "v2c": { + "users": { + "Test2": { + "groupName": "TestGroup2", + }, + } + }, + "v3": { + "users": { + "Test3": { + "groupName": "TestGroup3", + "v3Params": {"authType": "SHA-384", "privType": "AES-128"}, + }, + "Test4": {"groupName": "TestGroup3", "v3Params": {"authType": "SHA-512", "privType": "AES-192"}}, + } + }, + } + } + ], + "inputs": { + "snmp_users": [ + {"username": "Test1", "group_name": "TestGroup1", "version": "v1"}, + {"username": "Test2", "group_name": "TestGroup2", "version": "v2c"}, + {"username": "Test3", "group_name": "TestGroup3", "version": "v3", "auth_type": "SHA-384", "priv_type": "AES-128"}, + {"username": "Test4", "group_name": "TestGroup3", "version": "v3", "auth_type": "SHA-512", "priv_type": "AES-192"}, + ] + }, + "expected": {"result": "success"}, + }, + { + "name": "failure-not-configured", + "test": VerifySnmpUser, + "eos_data": [ + { + "usersByVersion": { + "v3": { + "users": { + "Test3": { + "groupName": "TestGroup3", + "v3Params": {"authType": "SHA-384", "privType": "AES-128"}, + }, + } + }, + } + } + ], + "inputs": { + "snmp_users": [ + {"username": "Test1", "group_name": "TestGroup1", "version": "v1"}, + {"username": "Test2", "group_name": "TestGroup2", "version": "v2c"}, + {"username": "Test3", "group_name": "TestGroup3", "version": "v3", "auth_type": "SHA-384", "priv_type": "AES-128"}, + {"username": "Test4", "group_name": "TestGroup3", "version": "v3", "auth_type": "SHA-512", "priv_type": "AES-192"}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "User: Test1 Group: TestGroup1 Version: v1 - Not found", + "User: Test2 Group: TestGroup2 Version: v2c - Not found", + "User: Test4 Group: TestGroup3 Version: v3 - Not found", + ], + }, + }, + { + "name": "failure-incorrect-group", + "test": VerifySnmpUser, + "eos_data": [ + { + "usersByVersion": { + "v1": { + "users": { + "Test1": { + "groupName": "TestGroup2", + }, + } + }, + "v2c": { + "users": { + "Test2": { + "groupName": "TestGroup1", + }, + } + }, + "v3": {}, + } + } + ], + "inputs": { + "snmp_users": [ + {"username": "Test1", "group_name": "TestGroup1", "version": "v1"}, + {"username": "Test2", "group_name": "TestGroup2", "version": "v2c"}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "User: Test1 Group: TestGroup1 Version: v1 - Incorrect user group - Actual: TestGroup2", + "User: Test2 Group: TestGroup2 Version: v2c - Incorrect user group - Actual: TestGroup1", + ], + }, + }, + { + "name": "failure-incorrect-auth-encryption", + "test": VerifySnmpUser, + "eos_data": [ + { + "usersByVersion": { + "v1": { + "users": { + "Test1": { + "groupName": "TestGroup1", + }, + } + }, + "v2c": { + "users": { + "Test2": { + "groupName": "TestGroup2", + }, + } + }, + "v3": { + "users": { + "Test3": { + "groupName": "TestGroup3", + "v3Params": {"authType": "SHA-512", "privType": "AES-192"}, + }, + "Test4": {"groupName": "TestGroup4", "v3Params": {"authType": "SHA-384", "privType": "AES-128"}}, + } + }, + } + } + ], + "inputs": { + "snmp_users": [ + {"username": "Test1", "group_name": "TestGroup1", "version": "v1"}, + {"username": "Test2", "group_name": "TestGroup2", "version": "v2c"}, + {"username": "Test3", "group_name": "TestGroup3", "version": "v3", "auth_type": "SHA-384", "priv_type": "AES-128"}, + {"username": "Test4", "group_name": "TestGroup4", "version": "v3", "auth_type": "SHA-512", "priv_type": "AES-192"}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "User: Test3 Group: TestGroup3 Version: v3 - Incorrect authentication type - Expected: SHA-384 Actual: SHA-512", + "User: Test3 Group: TestGroup3 Version: v3 - Incorrect privacy type - Expected: AES-128 Actual: AES-192", + "User: Test4 Group: TestGroup4 Version: v3 - Incorrect authentication type - Expected: SHA-512 Actual: SHA-384", + "User: Test4 Group: TestGroup4 Version: v3 - Incorrect privacy type - Expected: AES-192 Actual: AES-128", + ], + }, + }, ] diff --git a/tests/units/input_models/test_snmp.py b/tests/units/input_models/test_snmp.py new file mode 100644 index 000000000..94551ca76 --- /dev/null +++ b/tests/units/input_models/test_snmp.py @@ -0,0 +1,44 @@ +# Copyright (c) 2023-2025 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. +"""Tests for anta.input_models.snmp.py.""" + +# pylint: disable=C0302 +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest +from pydantic import ValidationError + +from anta.tests.snmp import VerifySnmpUser + +if TYPE_CHECKING: + from anta.input_models.snmp import SnmpUser + + +class TestVerifySnmpUserInput: + """Test anta.tests.snmp.VerifySnmpUser.Input.""" + + @pytest.mark.parametrize( + ("snmp_users"), + [ + pytest.param([{"username": "test", "group_name": "abc", "version": "v1", "auth_type": None, "priv_type": None}], id="valid-v1"), + pytest.param([{"username": "test", "group_name": "abc", "version": "v2c", "auth_type": None, "priv_type": None}], id="valid-v2c"), + pytest.param([{"username": "test", "group_name": "abc", "version": "v3", "auth_type": "SHA", "priv_type": "AES-128"}], id="valid-v3"), + ], + ) + def test_valid(self, snmp_users: list[SnmpUser]) -> None: + """Test VerifySnmpUser.Input valid inputs.""" + VerifySnmpUser.Input(snmp_users=snmp_users) + + @pytest.mark.parametrize( + ("snmp_users"), + [ + pytest.param([{"username": "test", "group_name": "abc", "version": "v3", "auth_type": None, "priv_type": None}], id="invalid-v3"), + ], + ) + def test_invalid(self, snmp_users: list[SnmpUser]) -> None: + """Test VerifySnmpUser.Input invalid inputs.""" + with pytest.raises(ValidationError): + VerifySnmpUser.Input(snmp_users=snmp_users) From c91193ed737a7fe1fce98f4fcef52681db6519a7 Mon Sep 17 00:00:00 2001 From: geetanjalimanegslab <96573243+geetanjalimanegslab@users.noreply.github.com> Date: Tue, 14 Jan 2025 19:24:54 +0530 Subject: [PATCH 5/8] feat(anta): Added test case to verify dynamic vlans source (#978) * Issue_864:Added test case to verify dynamic vlans source * Issue_864: Remove duplicate unit testcases * Issue_864:Added check for dynamic vlan source exact match and updated UT * Issue_864: Formatted test description for mkdocs * Issue_864: Updated docstrings and variable name * Issue_864: Updated Test failure messages and TC description * Issue_864: Updated test.yaml file * Issue_864: Removed duplucate testcases * Issue_864: Updated docstring and the testcase logic * Issue_864: Updated test failure message * Updated test failure messages and docstrings * Updated test failures messages and testcases * Update code logic to reflect docstring --------- Co-authored-by: Geetanjali Mane Co-authored-by: Carl Baillargeon --- anta/custom_types.py | 1 + anta/tests/vlan.py | 76 ++++++++++++++++++++++++++++- examples/tests.yaml | 6 +++ tests/units/anta_tests/test_vlan.py | 50 ++++++++++++++++++- 4 files changed, 131 insertions(+), 2 deletions(-) diff --git a/anta/custom_types.py b/anta/custom_types.py index 4763be495..f3877459f 100644 --- a/anta/custom_types.py +++ b/anta/custom_types.py @@ -240,3 +240,4 @@ def validate_regex(value: str) -> str: SnmpVersion = Literal["v1", "v2c", "v3"] SnmpHashingAlgorithm = Literal["MD5", "SHA", "SHA-224", "SHA-256", "SHA-384", "SHA-512"] SnmpEncryptionAlgorithm = Literal["AES-128", "AES-192", "AES-256", "DES"] +DynamicVlanSource = Literal["dmf", "dot1x", "dynvtep", "evpn", "mlag", "mlagsync", "mvpn", "swfwd", "vccbfd"] diff --git a/anta/tests/vlan.py b/anta/tests/vlan.py index f2a1934cc..09b450ae6 100644 --- a/anta/tests/vlan.py +++ b/anta/tests/vlan.py @@ -9,7 +9,7 @@ from typing import TYPE_CHECKING, ClassVar, Literal -from anta.custom_types import Vlan +from anta.custom_types import DynamicVlanSource, Vlan from anta.models import AntaCommand, AntaTest from anta.tools import get_failed_logs, get_value @@ -68,3 +68,77 @@ def test(self) -> None: self.result.is_failure(failed_log) else: self.result.is_success() + + +class VerifyDynamicVlanSource(AntaTest): + """Verifies dynamic VLAN allocation for specified VLAN sources. + + This test performs the following checks for each specified VLAN source: + + 1. Validates source exists in dynamic VLAN table. + 2. Verifies at least one VLAN is allocated to the source. + 3. When strict mode is enabled (`strict: true`), ensures no other sources have VLANs allocated. + + Expected Results + ---------------- + * Success: The test will pass if all of the following conditions are met: + - Each specified source exists in dynamic VLAN table. + - Each specified source has at least one VLAN allocated. + - In strict mode: No other sources have VLANs allocated. + * Failure: The test will fail if any of the following conditions is met: + - Specified source not found in configuration. + - Source exists but has no VLANs allocated. + - In strict mode: Non-specified sources have VLANs allocated. + + Examples + -------- + ```yaml + anta.tests.vlan: + - VerifyDynamicVlanSource: + sources: + - evpn + - mlagsync + strict: False + ``` + """ + + categories: ClassVar[list[str]] = ["vlan"] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show vlan dynamic", revision=1)] + + class Input(AntaTest.Input): + """Input model for the VerifyDynamicVlanSource test.""" + + sources: list[DynamicVlanSource] + """The dynamic VLAN source list.""" + strict: bool = False + """If True, only specified sources are allowed to have VLANs allocated. Default is False.""" + + @AntaTest.anta_test + def test(self) -> None: + """Main test function for VerifyDynamicVlanSource.""" + self.result.is_success() + command_output = self.instance_commands[0].json_output + dynamic_vlans = command_output.get("dynamicVlans", {}) + + # Get all configured sources and sources with VLANs allocated + configured_sources = set(dynamic_vlans.keys()) + sources_with_vlans = {source for source, data in dynamic_vlans.items() if data.get("vlanIds")} + expected_sources = set(self.inputs.sources) + + # Check if all specified sources exist in configuration + missing_sources = expected_sources - configured_sources + if missing_sources: + self.result.is_failure(f"Dynamic VLAN source(s) not found in configuration: {', '.join(sorted(missing_sources))}") + return + + # Check if configured sources have VLANs allocated + sources_without_vlans = expected_sources - sources_with_vlans + if sources_without_vlans: + self.result.is_failure(f"Dynamic VLAN source(s) exist but have no VLANs allocated: {', '.join(sorted(sources_without_vlans))}") + return + + # In strict mode, verify no other sources have VLANs allocated + if self.inputs.strict: + unexpected_sources = sources_with_vlans - expected_sources + if unexpected_sources: + self.result.is_failure(f"Strict mode enabled: Unexpected sources have VLANs allocated: {', '.join(sorted(unexpected_sources))}") diff --git a/examples/tests.yaml b/examples/tests.yaml index c2e00f009..db1d179d6 100644 --- a/examples/tests.yaml +++ b/examples/tests.yaml @@ -847,6 +847,12 @@ anta.tests.system: # Verifies the device uptime. minimum: 86400 anta.tests.vlan: + - VerifyDynamicVlanSource: + # Verifies dynamic VLAN allocation for specified VLAN sources. + sources: + - evpn + - mlagsync + strict: False - VerifyVlanInternalPolicy: # Verifies the VLAN internal allocation policy and the range of VLANs. policy: ascending diff --git a/tests/units/anta_tests/test_vlan.py b/tests/units/anta_tests/test_vlan.py index e68bd06dc..7fc8b8688 100644 --- a/tests/units/anta_tests/test_vlan.py +++ b/tests/units/anta_tests/test_vlan.py @@ -7,7 +7,7 @@ from typing import Any -from anta.tests.vlan import VerifyVlanInternalPolicy +from anta.tests.vlan import VerifyDynamicVlanSource, VerifyVlanInternalPolicy from tests.units.anta_tests import test DATA: list[dict[str, Any]] = [ @@ -33,4 +33,52 @@ ], }, }, + { + "name": "success", + "test": VerifyDynamicVlanSource, + "eos_data": [{"dynamicVlans": {"evpn": {"vlanIds": [1199]}, "mlagsync": {"vlanIds": [1401]}, "vccbfd": {"vlanIds": [1501]}}}], + "inputs": {"sources": ["evpn", "mlagsync"], "strict": False}, + "expected": {"result": "success"}, + }, + { + "name": "failure-no-dynamic-vlan-sources", + "test": VerifyDynamicVlanSource, + "eos_data": [{"dynamicVlans": {}}], + "inputs": {"sources": ["evpn", "mlagsync"], "strict": False}, + "expected": {"result": "failure", "messages": ["Dynamic VLAN source(s) not found in configuration: evpn, mlagsync"]}, + }, + { + "name": "failure-dynamic-vlan-sources-mismatch", + "test": VerifyDynamicVlanSource, + "eos_data": [{"dynamicVlans": {"vccbfd": {"vlanIds": [1500]}, "mlagsync": {"vlanIds": [1501]}}}], + "inputs": {"sources": ["evpn", "mlagsync"], "strict": False}, + "expected": { + "result": "failure", + "messages": ["Dynamic VLAN source(s) not found in configuration: evpn"], + }, + }, + { + "name": "success-strict-mode", + "test": VerifyDynamicVlanSource, + "eos_data": [{"dynamicVlans": {"evpn": {"vlanIds": [1199]}, "mlagsync": {"vlanIds": [1502], "vccbfd": {"vlanIds": []}}}}], + "inputs": {"sources": ["evpn", "mlagsync"], "strict": True}, + "expected": {"result": "success"}, + }, + { + "name": "failure-all-sources-exact-match-additional-source-found", + "test": VerifyDynamicVlanSource, + "eos_data": [{"dynamicVlans": {"evpn": {"vlanIds": [1199]}, "mlagsync": {"vlanIds": [1500]}, "vccbfd": {"vlanIds": [1500]}}}], + "inputs": {"sources": ["evpn", "mlagsync"], "strict": True}, + "expected": { + "result": "failure", + "messages": ["Strict mode enabled: Unexpected sources have VLANs allocated: vccbfd"], + }, + }, + { + "name": "failure-all-sources-exact-match-expected-source-not-found", + "test": VerifyDynamicVlanSource, + "eos_data": [{"dynamicVlans": {"evpn": {"vlanIds": [1199]}, "mlagsync": {"vlanIds": []}}}], + "inputs": {"sources": ["evpn", "mlagsync"], "strict": True}, + "expected": {"result": "failure", "messages": ["Dynamic VLAN source(s) exist but have no VLANs allocated: mlagsync"]}, + }, ] From 509c3cf9ba2c177fb8f693b94f56282697cb4e44 Mon Sep 17 00:00:00 2001 From: vitthalmagadum <122079046+vitthalmagadum@users.noreply.github.com> Date: Tue, 14 Jan 2025 19:58:29 +0530 Subject: [PATCH 6/8] feat(anta): Added the test case to verify SNMP Logging Configuration (#849) * issue_821 Added TC for SNMP logging * Issue_821: Refactored testcase with latest changes of input modules and updated TC, docstrings * Issue_821: Updated docstrings and test class, variable names * Minor updates --------- Co-authored-by: VitthalMagadum Co-authored-by: Geetanjali.mane Co-authored-by: Carl Baillargeon --- anta/input_models/snmp.py | 23 ++++++++- anta/tests/snmp.py | 73 ++++++++++++++++++++++++++++- examples/tests.yaml | 7 +++ tests/units/anta_tests/test_snmp.py | 52 ++++++++++++++++++++ 4 files changed, 153 insertions(+), 2 deletions(-) diff --git a/anta/input_models/snmp.py b/anta/input_models/snmp.py index 680d617c0..bd7350e45 100644 --- a/anta/input_models/snmp.py +++ b/anta/input_models/snmp.py @@ -5,9 +5,30 @@ from __future__ import annotations +from ipaddress import IPv4Address + from pydantic import BaseModel, ConfigDict -from anta.custom_types import SnmpEncryptionAlgorithm, SnmpHashingAlgorithm, SnmpVersion +from anta.custom_types import Hostname, SnmpEncryptionAlgorithm, SnmpHashingAlgorithm, SnmpVersion + + +class SnmpHost(BaseModel): + """Model for a SNMP host.""" + + model_config = ConfigDict(extra="forbid") + hostname: IPv4Address | Hostname + """IPv4 address or hostname of the SNMP notification host.""" + vrf: str = "default" + """Optional VRF for SNMP hosts. If not provided, it defaults to `default`.""" + + def __str__(self) -> str: + """Return a human-readable string representation of the SnmpHost for reporting. + + Examples + -------- + - Host: 192.168.1.100 VRF: default + """ + return f"Host: {self.hostname} VRF: {self.vrf}" class SnmpUser(BaseModel): diff --git a/anta/tests/snmp.py b/anta/tests/snmp.py index ef7410430..910d5929c 100644 --- a/anta/tests/snmp.py +++ b/anta/tests/snmp.py @@ -12,7 +12,7 @@ from pydantic import field_validator from anta.custom_types import PositiveInteger, SnmpErrorCounter, SnmpPdu -from anta.input_models.snmp import SnmpUser +from anta.input_models.snmp import SnmpHost, SnmpUser from anta.models import AntaCommand, AntaTest from anta.tools import get_value @@ -344,6 +344,77 @@ def test(self) -> None: self.result.is_failure(f"The following SNMP error counters are not found or have non-zero error counters:\n{error_counters_not_ok}") +class VerifySnmpHostLogging(AntaTest): + """Verifies SNMP logging configurations. + + This test performs the following checks: + + 1. SNMP logging is enabled globally. + 2. For each specified SNMP host: + - Host exists in configuration. + - Host's VRF assignment matches expected value. + + Expected Results + ---------------- + * Success: The test will pass if all of the following conditions are met: + - SNMP logging is enabled on the device. + - All specified hosts are configured with correct VRF assignments. + * Failure: The test will fail if any of the following conditions is met: + - SNMP logging is disabled on the device. + - SNMP host not found in configuration. + - Host's VRF assignment doesn't match expected value. + + Examples + -------- + ```yaml + anta.tests.snmp: + - VerifySnmpHostLogging: + hosts: + - hostname: 192.168.1.100 + vrf: default + - hostname: 192.168.1.103 + vrf: MGMT + ``` + """ + + categories: ClassVar[list[str]] = ["snmp"] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show snmp", revision=1)] + + class Input(AntaTest.Input): + """Input model for the VerifySnmpHostLogging test.""" + + hosts: list[SnmpHost] + """List of SNMP hosts.""" + + @AntaTest.anta_test + def test(self) -> None: + """Main test function for VerifySnmpHostLogging.""" + self.result.is_success() + + command_output = self.instance_commands[0].json_output.get("logging", {}) + # If SNMP logging is disabled, test fails. + if not command_output.get("loggingEnabled"): + self.result.is_failure("SNMP logging is disabled") + return + + host_details = command_output.get("hosts", {}) + + for host in self.inputs.hosts: + hostname = str(host.hostname) + vrf = host.vrf + actual_snmp_host = host_details.get(hostname, {}) + + # If SNMP host is not configured on the device, test fails. + if not actual_snmp_host: + self.result.is_failure(f"{host} - Not configured") + continue + + # If VRF is not matches the expected value, test fails. + actual_vrf = "default" if (vrf_name := actual_snmp_host.get("vrf")) == "" else vrf_name + if actual_vrf != vrf: + self.result.is_failure(f"{host} - Incorrect VRF - Actual: {actual_vrf}") + + class VerifySnmpUser(AntaTest): """Verifies the SNMP user configurations. diff --git a/examples/tests.yaml b/examples/tests.yaml index db1d179d6..387d3bb0a 100644 --- a/examples/tests.yaml +++ b/examples/tests.yaml @@ -727,6 +727,13 @@ anta.tests.snmp: # Verifies the SNMP error counters. error_counters: - inVersionErrs + - VerifySnmpHostLogging: + # Verifies SNMP logging configurations. + hosts: + - hostname: 192.168.1.100 + vrf: default + - hostname: 192.168.1.103 + vrf: MGMT - VerifySnmpIPv4Acl: # Verifies if the SNMP agent has IPv4 ACL(s) configured. number: 3 diff --git a/tests/units/anta_tests/test_snmp.py b/tests/units/anta_tests/test_snmp.py index d2eb6b87f..ebbf94897 100644 --- a/tests/units/anta_tests/test_snmp.py +++ b/tests/units/anta_tests/test_snmp.py @@ -10,6 +10,7 @@ from anta.tests.snmp import ( VerifySnmpContact, VerifySnmpErrorCounters, + VerifySnmpHostLogging, VerifySnmpIPv4Acl, VerifySnmpIPv6Acl, VerifySnmpLocation, @@ -320,6 +321,57 @@ ], }, }, + { + "name": "success", + "test": VerifySnmpHostLogging, + "eos_data": [ + { + "logging": { + "loggingEnabled": True, + "hosts": { + "192.168.1.100": {"port": 162, "vrf": ""}, + "192.168.1.101": {"port": 162, "vrf": "MGMT"}, + "snmp-server-01": {"port": 162, "vrf": "default"}, + }, + } + } + ], + "inputs": { + "hosts": [ + {"hostname": "192.168.1.100", "vrf": "default"}, + {"hostname": "192.168.1.101", "vrf": "MGMT"}, + {"hostname": "snmp-server-01", "vrf": "default"}, + ] + }, + "expected": {"result": "success"}, + }, + { + "name": "failure-logging-disabled", + "test": VerifySnmpHostLogging, + "eos_data": [{"logging": {"loggingEnabled": False}}], + "inputs": {"hosts": [{"hostname": "192.168.1.100", "vrf": "default"}, {"hostname": "192.168.1.101", "vrf": "MGMT"}]}, + "expected": {"result": "failure", "messages": ["SNMP logging is disabled"]}, + }, + { + "name": "failure-mismatch-vrf", + "test": VerifySnmpHostLogging, + "eos_data": [{"logging": {"loggingEnabled": True, "hosts": {"192.168.1.100": {"port": 162, "vrf": "MGMT"}, "192.168.1.101": {"port": 162, "vrf": "Test"}}}}], + "inputs": {"hosts": [{"hostname": "192.168.1.100", "vrf": "default"}, {"hostname": "192.168.1.101", "vrf": "MGMT"}]}, + "expected": { + "result": "failure", + "messages": ["Host: 192.168.1.100 VRF: default - Incorrect VRF - Actual: MGMT", "Host: 192.168.1.101 VRF: MGMT - Incorrect VRF - Actual: Test"], + }, + }, + { + "name": "failure-host-not-configured", + "test": VerifySnmpHostLogging, + "eos_data": [{"logging": {"loggingEnabled": True, "hosts": {"192.168.1.100": {"port": 162, "vrf": "MGMT"}, "192.168.1.103": {"port": 162, "vrf": "Test"}}}}], + "inputs": {"hosts": [{"hostname": "192.168.1.101", "vrf": "default"}, {"hostname": "192.168.1.102", "vrf": "MGMT"}]}, + "expected": { + "result": "failure", + "messages": ["Host: 192.168.1.101 VRF: default - Not configured", "Host: 192.168.1.102 VRF: MGMT - Not configured"], + }, + }, { "name": "success", "test": VerifySnmpUser, From 1acbdcb7c7d7e230559ceb9932ed7ab99deee31c Mon Sep 17 00:00:00 2001 From: vitthalmagadum <122079046+vitthalmagadum@users.noreply.github.com> Date: Tue, 14 Jan 2025 20:38:16 +0530 Subject: [PATCH 7/8] fix(anta.tests): BFD test module: AntaTest.Input subclasses using common input models should have validators for their required fields. (#999) * issue_997: BFD tests with proper validators * improved the test coverage * updated validator function * addressed review commenrs: updated validator msgs --- anta/tests/bfd.py | 34 +++++++++++++- tests/units/input_models/test_bfd.py | 68 ++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 2 deletions(-) create mode 100644 tests/units/input_models/test_bfd.py diff --git a/anta/tests/bfd.py b/anta/tests/bfd.py index 861a6a2e4..2361a4221 100644 --- a/anta/tests/bfd.py +++ b/anta/tests/bfd.py @@ -8,9 +8,9 @@ from __future__ import annotations from datetime import datetime, timezone -from typing import TYPE_CHECKING, ClassVar +from typing import TYPE_CHECKING, ClassVar, TypeVar -from pydantic import Field +from pydantic import Field, field_validator from anta.input_models.bfd import BFDPeer from anta.models import AntaCommand, AntaTest @@ -19,6 +19,9 @@ if TYPE_CHECKING: from anta.models import AntaTemplate +# Using a TypeVar for the BFDPeer model since mypy thinks it's a ClassVar and not a valid type when used in field validators +T = TypeVar("T", bound=BFDPeer) + class VerifyBFDSpecificPeers(AntaTest): """Verifies the state of IPv4 BFD peer sessions. @@ -143,6 +146,23 @@ class Input(AntaTest.Input): BFDPeer: ClassVar[type[BFDPeer]] = BFDPeer """To maintain backward compatibility""" + @field_validator("bfd_peers") + @classmethod + def validate_bfd_peers(cls, bfd_peers: list[T]) -> list[T]: + """Validate that 'tx_interval', 'rx_interval' and 'multiplier' fields are provided in each BFD peer.""" + for peer in bfd_peers: + missing_fileds = [] + if peer.tx_interval is None: + missing_fileds.append("tx_interval") + if peer.rx_interval is None: + missing_fileds.append("rx_interval") + if peer.multiplier is None: + missing_fileds.append("multiplier") + if missing_fileds: + msg = f"{peer} {', '.join(missing_fileds)} field(s) are missing in the input." + raise ValueError(msg) + return bfd_peers + @AntaTest.anta_test def test(self) -> None: """Main test function for VerifyBFDPeersIntervals.""" @@ -308,6 +328,16 @@ class Input(AntaTest.Input): BFDPeer: ClassVar[type[BFDPeer]] = BFDPeer """To maintain backward compatibility""" + @field_validator("bfd_peers") + @classmethod + def validate_bfd_peers(cls, bfd_peers: list[T]) -> list[T]: + """Validate that 'protocols' field is provided in each BFD peer.""" + for peer in bfd_peers: + if peer.protocols is None: + msg = f"{peer} 'protocols' field missing in the input." + raise ValueError(msg) + return bfd_peers + @AntaTest.anta_test def test(self) -> None: """Main test function for VerifyBFDPeersRegProtocols.""" diff --git a/tests/units/input_models/test_bfd.py b/tests/units/input_models/test_bfd.py new file mode 100644 index 000000000..e179f39fe --- /dev/null +++ b/tests/units/input_models/test_bfd.py @@ -0,0 +1,68 @@ +# Copyright (c) 2023-2025 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. +"""Tests for anta.input_models.bfd.py.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest +from pydantic import ValidationError + +from anta.tests.bfd import VerifyBFDPeersIntervals, VerifyBFDPeersRegProtocols + +if TYPE_CHECKING: + from anta.input_models.bfd import BFDPeer + + +class TestVerifyBFDPeersIntervalsInput: + """Test anta.tests.bfd.VerifyBFDPeersIntervals.Input.""" + + @pytest.mark.parametrize( + ("bfd_peers"), + [ + pytest.param([{"peer_address": "10.0.0.1", "vrf": "default", "tx_interval": 1200, "rx_interval": 1200, "multiplier": 3}], id="valid"), + ], + ) + def test_valid(self, bfd_peers: list[BFDPeer]) -> None: + """Test VerifyBFDPeersIntervals.Input valid inputs.""" + VerifyBFDPeersIntervals.Input(bfd_peers=bfd_peers) + + @pytest.mark.parametrize( + ("bfd_peers"), + [ + pytest.param([{"peer_address": "10.0.0.1", "vrf": "default", "tx_interval": 1200}], id="invalid-tx-interval"), + pytest.param([{"peer_address": "10.0.0.1", "vrf": "default", "rx_interval": 1200}], id="invalid-rx-interval"), + pytest.param([{"peer_address": "10.0.0.1", "vrf": "default", "tx_interval": 1200, "rx_interval": 1200}], id="invalid-multiplier"), + ], + ) + def test_invalid(self, bfd_peers: list[BFDPeer]) -> None: + """Test VerifyBFDPeersIntervals.Input invalid inputs.""" + with pytest.raises(ValidationError): + VerifyBFDPeersIntervals.Input(bfd_peers=bfd_peers) + + +class TestVerifyBFDPeersRegProtocolsInput: + """Test anta.tests.bfd.VerifyBFDPeersRegProtocols.Input.""" + + @pytest.mark.parametrize( + ("bfd_peers"), + [ + pytest.param([{"peer_address": "10.0.0.1", "vrf": "default", "protocols": ["bgp"]}], id="valid"), + ], + ) + def test_valid(self, bfd_peers: list[BFDPeer]) -> None: + """Test VerifyBFDPeersRegProtocols.Input valid inputs.""" + VerifyBFDPeersRegProtocols.Input(bfd_peers=bfd_peers) + + @pytest.mark.parametrize( + ("bfd_peers"), + [ + pytest.param([{"peer_address": "10.0.0.1", "vrf": "default"}], id="invalid"), + ], + ) + def test_invalid(self, bfd_peers: list[BFDPeer]) -> None: + """Test VerifyBFDPeersRegProtocols.Input invalid inputs.""" + with pytest.raises(ValidationError): + VerifyBFDPeersRegProtocols.Input(bfd_peers=bfd_peers) From 56acfbceca41658960edf191aed11efe61134ae8 Mon Sep 17 00:00:00 2001 From: vitthalmagadum <122079046+vitthalmagadum@users.noreply.github.com> Date: Tue, 14 Jan 2025 21:15:45 +0530 Subject: [PATCH 8/8] feat(anta.tests): Added tests parallel of VerifyBGPSpecificPeers or VerifyBGPPeersHealth tailored for RIBD (#1003) * Added TC for VerifyBGPSpecificPeers or VerifyBGPPeersHealth tailored for RIBD * fixing the pipeline failures * Addressed review comments: updated docstring and info message for ribd tests * Update BGP info message --------- Co-authored-by: Carl Baillargeon --- anta/tests/routing/bgp.py | 142 ++++++++ docs/api/tests.routing.bgp.md | 9 +- examples/tests.yaml | 15 + tests/units/anta_tests/routing/test_bgp.py | 372 +++++++++++++++++++++ 4 files changed, 536 insertions(+), 2 deletions(-) diff --git a/anta/tests/routing/bgp.py b/anta/tests/routing/bgp.py index a2863c6e0..e17974226 100644 --- a/anta/tests/routing/bgp.py +++ b/anta/tests/routing/bgp.py @@ -1333,3 +1333,145 @@ def test(self) -> None: # Verify warning limit if provided. By default, EOS does not have a warning limit and `totalRoutesWarnLimit` is not present in the output. if warning_limit is not None and (actual_warning_limit := peer_data.get("totalRoutesWarnLimit", 0)) != warning_limit: self.result.is_failure(f"{peer} - Maximum routes warning limit mismatch - Expected: {warning_limit}, Actual: {actual_warning_limit}") + + +class VerifyBGPPeerSessionRibd(AntaTest): + """Verifies the session state of BGP IPv4 peer(s). + + Compatible with EOS operating in `ribd` routing protocol model. + + This test performs the following checks for each specified peer: + + 1. Verifies that the peer is found in its VRF in the BGP configuration. + 2. Checks that the BGP session is in the `Established` state. + 3. Ensures that both input and output TCP message queues are empty. + Can be disabled by setting `check_tcp_queues` global flag to `False`. + + Expected Results + ---------------- + * Success: If all of the following conditions are met: + - All specified peers are found in the BGP configuration. + - All peers sessions state are `Established`. + - All peers have empty TCP message queues if `check_tcp_queues` is `True` (default). + * Failure: If any of the following occur: + - A specified peer is not found in the BGP configuration. + - A peer's session state is not `Established`. + - A peer has non-empty TCP message queues (input or output) when `check_tcp_queues` is `True`. + + Examples + -------- + ```yaml + anta.tests.routing: + bgp: + - VerifyBGPPeerSessionRibd: + check_tcp_queues: false + bgp_peers: + - peer_address: 10.1.0.1 + vrf: default + - peer_address: 10.1.0.2 + vrf: default + - peer_address: 10.1.255.2 + vrf: DEV + - peer_address: 10.1.255.4 + vrf: DEV + ``` + """ + + categories: ClassVar[list[str]] = ["bgp"] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show ip bgp neighbors vrf all", revision=2)] + + class Input(AntaTest.Input): + """Input model for the VerifyBGPPeerSessionRibd test.""" + + check_tcp_queues: bool = True + """Flag to check if the TCP session queues are empty for all BGP peers. Defaults to `True`.""" + bgp_peers: list[BgpPeer] + """List of BGP IPv4 peers.""" + + @AntaTest.anta_test + def test(self) -> None: + """Main test function for VerifyBGPPeerSessionRibd.""" + self.result.is_success() + + output = self.instance_commands[0].json_output + + for peer in self.inputs.bgp_peers: + peer_address = str(peer.peer_address) + peers = get_value(output, f"vrfs.{peer.vrf}.peerList", default=[]) + + # Check if the peer is found + if (peer_data := get_item(peers, "peerAddress", peer_address)) is None: + self.result.is_failure(f"{peer} - Not found") + continue + + # Check if the BGP session is established + if peer_data["state"] != "Established": + self.result.is_failure(f"{peer} - Session state is not established - State: {peer_data['state']}") + continue + + # Check the TCP session message queues + if self.inputs.check_tcp_queues: + inq_stat = peer_data["peerTcpInfo"]["inputQueueLength"] + outq_stat = peer_data["peerTcpInfo"]["outputQueueLength"] + if inq_stat != 0 or outq_stat != 0: + self.result.is_failure(f"{peer} - Session has non-empty message queues - InQ: {inq_stat}, OutQ: {outq_stat}") + + +class VerifyBGPPeersHealthRibd(AntaTest): + """Verifies the health of all the BGP IPv4 peer(s). + + Compatible with EOS operating in `ribd` routing protocol model. + + This test performs the following checks for all BGP IPv4 peers: + + 1. Verifies that the BGP session is in the `Established` state. + 2. Checks that both input and output TCP message queues are empty. + Can be disabled by setting `check_tcp_queues` global flag to `False`. + + Expected Results + ---------------- + * Success: If all checks pass for all BGP IPv4 peers. + * Failure: If any of the following occur: + - Any BGP session is not in the `Established` state. + - Any TCP message queue (input or output) is not empty when `check_tcp_queues` is `True` (default). + + Examples + -------- + ```yaml + anta.tests.routing: + bgp: + - VerifyBGPPeersHealthRibd: + check_tcp_queues: True + ``` + """ + + categories: ClassVar[list[str]] = ["bgp"] + commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show ip bgp neighbors vrf all", revision=2)] + + class Input(AntaTest.Input): + """Input model for the VerifyBGPPeersHealthRibd test.""" + + check_tcp_queues: bool = True + """Flag to check if the TCP session queues are empty for all BGP peers. Defaults to `True`.""" + + @AntaTest.anta_test + def test(self) -> None: + """Main test function for VerifyBGPPeersHealthRibd.""" + self.result.is_success() + + output = self.instance_commands[0].json_output + + for vrf, vrf_data in output["vrfs"].items(): + peer_list = vrf_data.get("peerList", []) + + for peer in peer_list: + # Check if the BGP session is established + if peer["state"] != "Established": + self.result.is_failure(f"Peer: {peer['peerAddress']} VRF: {vrf} - Session state is not established - State: {peer['state']}") + continue + + # Check the TCP session message queues + inq = peer["peerTcpInfo"]["inputQueueLength"] + outq = peer["peerTcpInfo"]["outputQueueLength"] + if self.inputs.check_tcp_queues and (inq != 0 or outq != 0): + self.result.is_failure(f"Peer: {peer['peerAddress']} VRF: {vrf} - Session has non-empty message queues - InQ: {inq}, OutQ: {outq}") diff --git a/docs/api/tests.routing.bgp.md b/docs/api/tests.routing.bgp.md index f187c163d..1e0cbec8d 100644 --- a/docs/api/tests.routing.bgp.md +++ b/docs/api/tests.routing.bgp.md @@ -7,8 +7,13 @@ anta_title: ANTA catalog for BGP tests ~ that can be found in the LICENSE file. --> -!!! info "`multi-agent` Service Routing Protocols Model Requirements" - The BGP tests in this section are only supported on switches running the `multi-agent` routing protocols model. Starting from EOS version 4.30.1F, `service routing protocols model` is set to `multi-agent` by default. These BGP commands may **not** be compatible with switches running the legacy `ribd` routing protocols model and may fail if attempted. +!!! info "BGP Test Compatibility Note" + ANTA BGP tests are designed for the `multi-agent` routing protocol model. Starting from EOS 4.30.1F, `service routing protocols models` is set to `multi-agent` by default, and from EOS 4.32.0F it becomes the only supported model. + + The following tests are available for devices using the legacy `ribd` model on earlier EOS versions: + + - `VerifyBGPPeerSessionRibd` + - `VerifyBGPPeersHealthRibd` # Tests diff --git a/examples/tests.yaml b/examples/tests.yaml index 387d3bb0a..790e2878f 100644 --- a/examples/tests.yaml +++ b/examples/tests.yaml @@ -460,6 +460,18 @@ anta.tests.routing.bgp: vrf: DEV - peer_address: 10.1.255.4 vrf: DEV + - VerifyBGPPeerSessionRibd: + # Verifies the session state of BGP IPv4 peer(s). + check_tcp_queues: false + bgp_peers: + - peer_address: 10.1.0.1 + vrf: default + - peer_address: 10.1.0.2 + vrf: default + - peer_address: 10.1.255.2 + vrf: DEV + - peer_address: 10.1.255.4 + vrf: DEV - VerifyBGPPeerUpdateErrors: # Verifies BGP update error counters for the provided BGP IPv4 peer(s). bgp_peers: @@ -478,6 +490,9 @@ anta.tests.routing.bgp: safi: "unicast" vrf: "DEV" check_tcp_queues: false + - VerifyBGPPeersHealthRibd: + # Verifies the health of all the BGP IPv4 peer(s). + check_tcp_queues: True - VerifyBGPSpecificPeers: # Verifies the health of specific BGP peer(s) for given address families. address_families: diff --git a/tests/units/anta_tests/routing/test_bgp.py b/tests/units/anta_tests/routing/test_bgp.py index 4d9e3c026..86e199e0e 100644 --- a/tests/units/anta_tests/routing/test_bgp.py +++ b/tests/units/anta_tests/routing/test_bgp.py @@ -22,7 +22,9 @@ VerifyBGPPeerRouteLimit, VerifyBGPPeerRouteRefreshCap, VerifyBGPPeerSession, + VerifyBGPPeerSessionRibd, VerifyBGPPeersHealth, + VerifyBGPPeersHealthRibd, VerifyBGPPeerUpdateErrors, VerifyBgpRouteMaps, VerifyBGPSpecificPeers, @@ -4048,4 +4050,374 @@ def test_check_bgp_neighbor_capability(input_dict: dict[str, bool], expected: bo ], }, }, + { + "name": "success-no-check-tcp-queues", + "test": VerifyBGPPeerSessionRibd, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "state": "Established", + "peerTcpInfo": { + "outputQueueLength": 10, + "inputQueueLength": 5, + }, + } + ] + }, + "MGMT": { + "peerList": [ + { + "peerAddress": "10.100.0.9", + "state": "Established", + "peerTcpInfo": { + "outputQueueLength": 10, + "inputQueueLength": 5, + }, + } + ] + }, + }, + }, + ], + "inputs": { + "check_tcp_queues": False, + "bgp_peers": [ + {"peer_address": "10.100.0.8", "vrf": "default"}, + {"peer_address": "10.100.0.9", "vrf": "MGMT"}, + ], + }, + "expected": {"result": "success"}, + }, + { + "name": "success-check-tcp-queues", + "test": VerifyBGPPeerSessionRibd, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "state": "Established", + "peerTcpInfo": { + "outputQueueLength": 0, + "inputQueueLength": 0, + }, + } + ] + }, + "MGMT": { + "peerList": [ + { + "peerAddress": "10.100.0.9", + "state": "Established", + "peerTcpInfo": { + "outputQueueLength": 0, + "inputQueueLength": 0, + }, + } + ] + }, + }, + }, + ], + "inputs": { + "check_tcp_queues": True, + "bgp_peers": [ + {"peer_address": "10.100.0.8", "vrf": "default"}, + {"peer_address": "10.100.0.9", "vrf": "MGMT"}, + ], + }, + "expected": {"result": "success"}, + }, + { + "name": "failure-peer-not-found", + "test": VerifyBGPPeerSessionRibd, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "state": "Established", + "peerTcpInfo": { + "outputQueueLength": 0, + "inputQueueLength": 0, + }, + } + ] + }, + }, + }, + ], + "inputs": { + "bgp_peers": [ + {"peer_address": "10.100.0.8", "vrf": "default"}, + {"peer_address": "10.100.0.9", "vrf": "MGMT"}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "Peer: 10.100.0.9 VRF: MGMT - Not found", + ], + }, + }, + { + "name": "failure-not-established", + "test": VerifyBGPPeerSessionRibd, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "state": "Active", + "peerTcpInfo": { + "outputQueueLength": 0, + "inputQueueLength": 0, + }, + } + ] + }, + "MGMT": { + "peerList": [ + { + "peerAddress": "10.100.0.9", + "state": "Active", + "peerTcpInfo": { + "outputQueueLength": 0, + "inputQueueLength": 0, + }, + } + ] + }, + }, + }, + ], + "inputs": { + "bgp_peers": [ + {"peer_address": "10.100.0.8", "vrf": "default"}, + {"peer_address": "10.100.0.9", "vrf": "MGMT"}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "Peer: 10.100.0.8 VRF: default - Session state is not established - State: Active", + "Peer: 10.100.0.9 VRF: MGMT - Session state is not established - State: Active", + ], + }, + }, + { + "name": "failure-check-tcp-queues", + "test": VerifyBGPPeerSessionRibd, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "state": "Established", + "peerTcpInfo": { + "outputQueueLength": 10, + "inputQueueLength": 5, + }, + } + ] + }, + "MGMT": { + "peerList": [ + { + "peerAddress": "10.100.0.9", + "state": "Established", + "peerTcpInfo": { + "outputQueueLength": 0, + "inputQueueLength": 0, + }, + } + ] + }, + }, + }, + ], + "inputs": { + "bgp_peers": [ + {"peer_address": "10.100.0.8", "vrf": "default"}, + {"peer_address": "10.100.0.9", "vrf": "MGMT"}, + ] + }, + "expected": { + "result": "failure", + "messages": [ + "Peer: 10.100.0.8 VRF: default - Session has non-empty message queues - InQ: 5, OutQ: 10", + ], + }, + }, + { + "name": "success-no-check-tcp-queues", + "test": VerifyBGPPeersHealthRibd, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "state": "Established", + "peerTcpInfo": { + "outputQueueLength": 10, + "inputQueueLength": 5, + }, + } + ] + }, + "MGMT": { + "peerList": [ + { + "peerAddress": "10.100.0.9", + "state": "Established", + "peerTcpInfo": { + "outputQueueLength": 10, + "inputQueueLength": 5, + }, + } + ] + }, + }, + }, + ], + "inputs": { + "check_tcp_queues": False, + }, + "expected": {"result": "success"}, + }, + { + "name": "success-check-tcp-queues", + "test": VerifyBGPPeersHealthRibd, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "state": "Established", + "peerTcpInfo": { + "outputQueueLength": 0, + "inputQueueLength": 0, + }, + } + ] + }, + "MGMT": { + "peerList": [ + { + "peerAddress": "10.100.0.9", + "state": "Established", + "peerTcpInfo": { + "outputQueueLength": 0, + "inputQueueLength": 0, + }, + } + ] + }, + }, + }, + ], + "inputs": { + "check_tcp_queues": True, + }, + "expected": {"result": "success"}, + }, + { + "name": "failure-not-established", + "test": VerifyBGPPeersHealthRibd, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "state": "Active", + "peerTcpInfo": { + "outputQueueLength": 0, + "inputQueueLength": 0, + }, + } + ] + }, + "MGMT": { + "peerList": [ + { + "peerAddress": "10.100.0.9", + "state": "Active", + "peerTcpInfo": { + "outputQueueLength": 0, + "inputQueueLength": 0, + }, + } + ] + }, + }, + }, + ], + "inputs": {}, + "expected": { + "result": "failure", + "messages": [ + "Peer: 10.100.0.8 VRF: default - Session state is not established - State: Active", + "Peer: 10.100.0.9 VRF: MGMT - Session state is not established - State: Active", + ], + }, + }, + { + "name": "failure-check-tcp-queues", + "test": VerifyBGPPeersHealthRibd, + "eos_data": [ + { + "vrfs": { + "default": { + "peerList": [ + { + "peerAddress": "10.100.0.8", + "state": "Established", + "peerTcpInfo": { + "outputQueueLength": 10, + "inputQueueLength": 5, + }, + } + ] + }, + "MGMT": { + "peerList": [ + { + "peerAddress": "10.100.0.9", + "state": "Established", + "peerTcpInfo": { + "outputQueueLength": 0, + "inputQueueLength": 0, + }, + } + ] + }, + }, + }, + ], + "inputs": {}, + "expected": { + "result": "failure", + "messages": [ + "Peer: 10.100.0.8 VRF: default - Session has non-empty message queues - InQ: 5, OutQ: 10", + ], + }, + }, ]