Skip to content

Commit

Permalink
fix: improve error handling and ansible group parsing of anta get fro…
Browse files Browse the repository at this point in the history
…m-ansible (#449)

---------

Co-authored-by: gmuloc <gmulocher@arista.com>
  • Loading branch information
mtache and gmuloc authored Nov 10, 2023
1 parent a146f80 commit b982f55
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 37 deletions.
21 changes: 13 additions & 8 deletions anta/cli/get/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from rich.pretty import pretty_repr

from anta.cli.console import console
from anta.cli.utils import parse_tags
from anta.cli.utils import ExitCode, parse_tags

from .utils import create_inventory_from_ansible, create_inventory_from_cvp, get_cv_token

Expand Down Expand Up @@ -74,7 +74,8 @@ def from_cvp(inventory_directory: str, cvp_ip: str, cvp_username: str, cvp_passw


@click.command(no_args_is_help=True)
@click.option("--ansible-group", "-g", help="Ansible group to filter", type=str, required=False)
@click.pass_context
@click.option("--ansible-group", "-g", help="Ansible group to filter", type=str, required=False, default="all")
@click.option(
"--ansible-inventory",
"-i",
Expand All @@ -89,17 +90,21 @@ def from_cvp(inventory_directory: str, cvp_ip: str, cvp_username: str, cvp_passw
help="Path to save inventory file",
type=click.Path(file_okay=True, dir_okay=False, exists=False, writable=True, path_type=Path),
)
def from_ansible(output: Path, ansible_inventory: Path, ansible_group: str) -> None:
def from_ansible(ctx: click.Context, output: Path, ansible_inventory: Path, ansible_group: str) -> None:
"""Build ANTA inventory from an ansible inventory YAML file"""
logger.info(f"Building inventory from ansible file {ansible_inventory}")

# Create output directory
output.parent.mkdir(parents=True, exist_ok=True)
create_inventory_from_ansible(
inventory=ansible_inventory,
output_file=output,
ansible_root=ansible_group,
)
try:
create_inventory_from_ansible(
inventory=ansible_inventory,
output_file=output,
ansible_group=ansible_group,
)
except ValueError as e:
logger.error(str(e))
ctx.exit(ExitCode.USAGE_ERROR)


@click.command()
Expand Down
67 changes: 40 additions & 27 deletions anta/cli/get/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Copyright (c) 2023 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.

"""
Utils functions to use with anta.cli.get.commands module.
"""
Expand All @@ -10,13 +9,14 @@
import json
import logging
from pathlib import Path
from typing import Any, Union
from typing import Any

import requests
import urllib3
import yaml

from ...inventory import AntaInventory
from anta.inventory import AntaInventory
from anta.inventory.models import AntaInventoryHost, AntaInventoryInput

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

Expand All @@ -25,6 +25,7 @@

def get_cv_token(cvp_ip: str, cvp_username: str, cvp_password: str) -> str:
"""Generate AUTH token from CVP using password"""
# TODO, need to handle requests eror

# use CVP REST API to generate a token
URL = f"https://{cvp_ip}/cvpservice/login/authenticate.do"
Expand All @@ -35,7 +36,7 @@ def get_cv_token(cvp_ip: str, cvp_username: str, cvp_password: str) -> str:
return response.json()["sessionId"]


def create_inventory_from_cvp(inv: list[dict[str, Any]], directory: str, container: str) -> None:
def create_inventory_from_cvp(inv: list[dict[str, Any]], directory: str, container: str | None = None) -> None:
"""
create an inventory file from Arista CloudVision
"""
Expand All @@ -52,44 +53,56 @@ def create_inventory_from_cvp(inv: list[dict[str, Any]], directory: str, contain
logger.info(f"Inventory file has been created in {out_file}")


def create_inventory_from_ansible(inventory: Path, output_file: Path, ansible_root: Union[str, None] = None) -> None:
def create_inventory_from_ansible(inventory: Path, output_file: Path, ansible_group: str = "all") -> None:
"""
Create an ANTA inventory from an Ansible inventory YAML file
Args:
inventory (str): Ansible Inventory file to read
output_file (str, optional): ANTA inventory file to generate.
ansible_root (Union[str, None], optional): Ansible group from where to extract data. Defaults to None.
inventory: Ansible Inventory file to read
output_file: ANTA inventory file to generate.
ansible_root: Ansible group from where to extract data.
"""

def deep_yaml_parsing(data: dict[str, Any], hosts: Union[None, list[dict[str, str]]] = None) -> Union[None, list[dict[str, str]]]:
def find_ansible_group(data: dict[str, Any], group: str) -> dict[str, Any] | None:
for k, v in data.items():
if isinstance(v, dict):
if k == group and ("children" in v.keys() or "hosts" in v.keys()):
return v
d = find_ansible_group(v, group)
if d is not None:
return d
return None

def deep_yaml_parsing(data: dict[str, Any], hosts: list[AntaInventoryHost] | None = None) -> list[AntaInventoryHost]:
"""Deep parsing of YAML file to extract hosts and associated IPs"""
if hosts is None:
hosts = []
for key, value in data.items():
if isinstance(value, dict) and "ansible_host" in value.keys():
hosts.append({"name": key, "host": value["ansible_host"]})
logger.info(f" * adding entry for {key}")
hosts.append(AntaInventoryHost(name=key, host=value["ansible_host"]))
elif isinstance(value, dict):
deep_yaml_parsing(value, hosts)
else:
return hosts
return hosts

i: dict[str, dict[str, Any]] = {AntaInventory.INVENTORY_ROOT_KEY: {"hosts": []}}
with open(inventory, encoding="utf-8") as inv:
ansible_inventory = yaml.safe_load(inv)
if ansible_root not in ansible_inventory.keys():
logger.error(f"Group {ansible_root} not in ansible inventory {inventory}")
raise ValueError(f"Group {ansible_root} not in ansible inventory {inventory}")
if ansible_root is None:
ansible_hosts = deep_yaml_parsing(ansible_inventory, hosts=[])
else:
ansible_hosts = deep_yaml_parsing(ansible_inventory[ansible_root], hosts=[])
if ansible_hosts is None:
ansible_hosts = []
for dev in ansible_hosts:
logger.info(f' * adding entry for {dev["name"]}')
i[AntaInventory.INVENTORY_ROOT_KEY]["hosts"].append({"host": dev["host"], "name": dev["name"]})
try:
with open(inventory, encoding="utf-8") as inv:
ansible_inventory = yaml.safe_load(inv)
except OSError as exc:
raise ValueError(f"Could not parse {inventory}.") from exc

if not ansible_inventory:
raise ValueError(f"Ansible inventory {inventory} is empty")

ansible_inventory = find_ansible_group(ansible_inventory, ansible_group)

if ansible_inventory is None:
raise ValueError(f"Group {ansible_group} not found in Ansible inventory")
ansible_hosts = deep_yaml_parsing(ansible_inventory)
i = AntaInventoryInput(hosts=ansible_hosts)
# TODO, catch issue
with open(output_file, "w", encoding="UTF-8") as out_fd:
out_fd.write(yaml.dump(i))
logger.info(f"Inventory file has been created in {output_file}")
out_fd.write(yaml.dump({AntaInventory.INVENTORY_ROOT_KEY: i.model_dump(exclude_unset=True)}))
logger.info(f"ANTA device inventory file has been created in {output_file}")
47 changes: 47 additions & 0 deletions tests/data/ansible_inventory.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
---
all:
children:
cv_servers:
hosts:
cv_atd1:
ansible_host: 10.73.1.238
ansible_user: tom
ansible_password: arista123
cv_collection: v3
ATD_LAB:
vars:
ansible_user: arista
ansible_ssh_pass: arista
children:
ATD_FABRIC:
children:
ATD_SPINES:
vars:
type: spine
hosts:
spine1:
ansible_host: 192.168.0.10
spine2:
ansible_host: 192.168.0.11
ATD_LEAFS:
vars:
type: l3leaf
children:
pod1:
hosts:
leaf1:
ansible_host: 192.168.0.12
leaf2:
ansible_host: 192.168.0.13
pod2:
hosts:
leaf3:
ansible_host: 192.168.0.14
leaf4:
ansible_host: 192.168.0.15
ATD_TENANTS_NETWORKS:
children:
ATD_LEAFS:
ATD_SERVERS:
children:
ATD_LEAFS:
1 change: 1 addition & 0 deletions tests/data/empty_ansible_inventory.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
---
62 changes: 60 additions & 2 deletions tests/units/cli/get/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
"""
Tests for anta.cli.get.commands
"""

from __future__ import annotations

import os
from pathlib import Path
from typing import TYPE_CHECKING, cast
from unittest.mock import ANY, patch
Expand All @@ -16,13 +16,15 @@
from cvprac.cvp_client_errors import CvpApiError

from anta.cli import anta
from anta.cli.get.commands import from_cvp
from anta.cli.get.commands import from_ansible, from_cvp
from tests.lib.utils import default_anta_env

if TYPE_CHECKING:
from click.testing import CliRunner
from pytest import CaptureFixture, LogCaptureFixture

DATA_DIR: Path = Path(__file__).parents[3].resolve() / "data"


# Not testing for required parameter, click does this well.
@pytest.mark.parametrize(
Expand Down Expand Up @@ -95,3 +97,59 @@ def mock_cvp_connect(self: CvpClient, *args: str, **kwargs: str) -> None:
else:
assert "Error connecting to cvp" in caplog.text
assert result.exit_code == 1


@pytest.mark.parametrize(
"ansible_inventory, ansible_group, output, expected_exit",
[
pytest.param("ansible_inventory.yml", None, None, 0, id="no group"),
pytest.param("ansible_inventory.yml", "ATD_LEAFS", None, 0, id="group found"),
pytest.param("ansible_inventory.yml", "DUMMY", None, 4, id="group not found"),
pytest.param("empty_ansible_inventory.yml", None, None, 4, id="empty inventory"),
],
)
# pylint: disable-next=too-many-arguments
def test_from_ansible(
tmp_path: Path,
caplog: LogCaptureFixture,
capsys: CaptureFixture[str],
click_runner: CliRunner,
ansible_inventory: Path,
ansible_group: str | None,
output: Path | None,
expected_exit: int,
) -> None:
"""
Test `anta get from-ansible`
"""
env = default_anta_env()
cli_args = ["get", "from-ansible"]

os.chdir(tmp_path)
if output is not None:
cli_args.extend(["--output", str(output)])
out_dir = Path() / output
else:
# Get inventory-directory default
default_dir: Path = cast(Path, from_ansible.params[2].default)
out_dir = Path() / default_dir

if ansible_inventory is not None:
ansible_inventory_path = DATA_DIR / ansible_inventory
cli_args.extend(["--ansible-inventory", str(ansible_inventory_path)])

if ansible_group is not None:
cli_args.extend(["--ansible-group", ansible_group])

with capsys.disabled():
print(cli_args)
result = click_runner.invoke(anta, cli_args, env=env, auto_envvar_prefix="ANTA")

print(result)

assert result.exit_code == expected_exit
print(caplog.records)
if expected_exit != 0:
assert len(caplog.records) == 2
else:
assert out_dir.exists()
Loading

0 comments on commit b982f55

Please sign in to comment.