Skip to content

Commit

Permalink
add support for alternative namespace providers
Browse files Browse the repository at this point in the history
instead of using the namespace endpoint, the API can use any
arbitrary resource list endpoint and extract the available namespaces
from their metadata.

related to Jaydee94#125
  • Loading branch information
UiP9AV6Y committed Mar 4, 2023
1 parent 2ef8978 commit 00dcc0f
Show file tree
Hide file tree
Showing 10 changed files with 296 additions and 115 deletions.
34 changes: 34 additions & 0 deletions api/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,37 @@
# API of kubeseal-webgui

This backend is used to encrypt secrets with the kubeseal binary.

## External dependencies

The application acts mostly as data broker. The actual information
comes from external service such as the Kubernetes API or by
invoking executables.

### `kubeseal`

Secret management is done via `kubeseal`. The application must be
able to invoke this binary. The lookup location can be customized
via the *KUBESEAL_BINARY* environment variable.

### Kubernetes API

Additional information, such as the available namespaces, is fetched
from the Kubernetes API. Currently the application requires to be
ran inside a Kubernetes cluster itself, as it connects to the interal
API for data retrieval.

Namespaces are fetched via the namespace [list endpoint][].
If the cluster role in use is not allowed to use this endpoint,
the application can also determine the list of available namespaces
by fetching a list of alternative resources and parse their metadata
for the required information. Internally it uses the
`list_K8S_NAMESPACE_RESOURCE_for_all_namespaces` methods from the
Kubernetes Python Client [core API][] object, where *K8S_NAMESPACE_RESOURCE*
is replaced with the value from the environment variable of the same name.
The resulting list is limited to all namespaces which are accessible and
have at least one instance of the given resource type.

[list endpoint]: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#list-namespace-v1-core
[core API]: https://github.com/kubernetes-client/python/blob/master/kubernetes/README.md#documentation-for-api-endpoints

1 change: 1 addition & 0 deletions api/kubeseal_webgui_api/app_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class AppSettings(BaseSettings):
kubeseal_version: str
kubeseal_binary: str = binary
kubeseal_cert: str = environ.get("KUBESEAL_CERT", "/dev/null")
k8s_namespace_resource: str = environ.get("K8S_NAMESPACE_RESOURCE", "namespace")
mock_enabled: bool = mock
mock_namespace_count: int = 120

Expand Down
8 changes: 8 additions & 0 deletions api/kubeseal_webgui_api/internal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# flake8: noqa

from .incluster_core_client import InclusterCoreClient
from .kubernetes_namespace_resolver import kubernetes_namespaces_resolver
from .kubernetes_resource_namespace_resolver import (
kubernetes_resource_namespaces_resolver,
)
from .mock_core_client import MockCoreClient
28 changes: 28 additions & 0 deletions api/kubeseal_webgui_api/internal/incluster_core_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from kubernetes import client, config


class InclusterCoreClient:
"""Proxy implementation for a CoreV1Api object."""

def __init__(self, core=None):
self.__core = core

if core is None:
self.__core = client.CoreV1Api()

def scope(self) -> str:
return "in-cluster"

def __getattr__(self, attr: str):
"""Proxy factory to load the API config before calling the target attribute"""

def wrapped_method(*args, **kwargs):
target = getattr(self.__core, attr)
if callable(target) and not attr.startswith(
"_"
): # avoid loading config for magic methods
config.load_incluster_config()
return target(*args, **kwargs)
return target

return wrapped_method
28 changes: 28 additions & 0 deletions api/kubeseal_webgui_api/internal/kubernetes_namespace_resolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import logging

from kubernetes import client

LOGGER = logging.getLogger("kubeseal-webgui")


def kubernetes_namespaces_parser(resources: list) -> list[str]:
"""Extract the metadata name from the provided namespace list."""
namespaces_list = []

for ns in resources:
namespaces_list.append(ns.metadata.name)

LOGGER.debug("Namespaces list %s", namespaces_list)
return namespaces_list


def kubernetes_namespaces_resolver(core) -> list[str]:
"""Retrieve a list of namespaces from current kubernetes cluster."""
LOGGER.info("Resolving %s Namespaces", core.scope())
namespaces = core.list_namespace()

if isinstance(namespaces, client.V1NamespaceList) and namespaces.items:
return kubernetes_namespaces_parser(namespaces.items)

LOGGER.warning("No valid namespace list available via %s", namespaces)
return []
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import logging

from kubeseal_webgui_api.app_config import settings

LOGGER = logging.getLogger("kubeseal-webgui")


def kubernetes_resource_namespaces_parser(resources: list) -> list[str]:
"""Extract the metadata namespace from the provided resource list."""
namespaces_list = set()

for res in resources:
namespaces_list.add(res.metadata.namespace)

LOGGER.debug("Namespaces list %s", namespaces_list)
return list(namespaces_list)


def kubernetes_resource_provider(core, resource: str):
"""Dynamic list provider for Kubernetes Core API resources."""
provider = getattr(core, f"list_{resource}_for_all_namespaces", None)

if not callable(provider):
return None

return provider(watch=False)


def kubernetes_resource_namespaces_resolver(core) -> list[str]:
"""Retrieve a list of namespaces from objects in the current kubernetes cluster."""
LOGGER.info(
"Resolving %s Namespaces from %s resources",
core.scope(),
settings.k8s_namespace_resource,
)
resources = kubernetes_resource_provider(core, settings.k8s_namespace_resource)

if not resources:
LOGGER.warning(
"Kubernetes client does not provide access to %s resources",
settings.k8s_namespace_resource,
)
return []
elif not resources.items:
LOGGER.warning("No valid resource list available via %s", resources)
return []

return kubernetes_resource_namespaces_parser(resources.items)
138 changes: 138 additions & 0 deletions api/kubeseal_webgui_api/internal/mock_core_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import itertools
import random
from typing import List

from kubernetes import client


adjectives = [
"altered",
"angry",
"big",
"blinking",
"boring",
"broken",
"bubbling",
"calculating",
"cute",
"diffing",
"expensive",
"fresh",
"fierce",
"floating",
"generous",
"golden",
"green",
"growing",
"hidden",
"hideous",
"interesting",
"kubed",
"mumbling",
"rusty",
"singing",
"small",
"sniffing",
"squared",
"talking",
"trusty",
"wise",
"walking",
"zooming",
]
nouns = [
"ant",
"bike",
"bird",
"captain",
"cheese",
"clock",
"digit",
"gorilla",
"kraken",
"number",
"maven",
"monitor",
"moose",
"moon",
"mouse",
"news",
"newt",
"octopus",
"opossum",
"otter",
"paper",
"passenger",
"potato",
"ship",
"spaceship",
"spaghetti",
"spoon",
"store",
"tomcat",
"trombone",
"unicorn",
"vine",
"whale",
]


def generate_namespaces(count) -> List[str]:
return sorted(
{
"-".join(words)
for words in random.choices( # noqa: S311 no security needed here
list(itertools.product(adjectives, nouns)), k=count
)
}
)


class MockCoreClient:
def __init__(self, namespace_count: int = 200):
self.__namespace_count = namespace_count

def scope(self) -> str:
return "mock"

def create_v1_namespace(self, name: str):
"""Simple V1Namespace object factory"""
meta = client.V1ObjectMeta(name=name)

return client.V1Namespace(
api_version="v1",
kind="Namespace",
metadata=meta,
)

def create_v1_pod(self, name: str, namespace: str):
"""Simple V1Namespace object factory"""
meta = client.V1ObjectMeta(name=name, namespace=namespace)

return client.V1Pod(
api_version="v1",
kind="Pod",
metadata=meta,
)

def list_pod_for_all_namespaces(self, *args, **kwargs) -> List:
namespaces = generate_namespaces(self.__namespace_count)
items = []
for ns in namespaces:
items.append(self.create_v1_pod("mock", ns))

return client.V1PodList(
api_version="v1",
kind="PodList",
items=items,
)

def list_namespace(self, *args, **kwargs) -> List:
items = map(
self.create_v1_namespace, generate_namespaces(self.__namespace_count)
)
return client.V1NamespaceList(
api_version="v1",
kind="NamespaceList",
items=items,
)
15 changes: 11 additions & 4 deletions api/kubeseal_webgui_api/routers/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,31 @@
import fastapi

from kubeseal_webgui_api.app_config import settings
from kubeseal_webgui_api.routers.kubernetes_namespace_resolver import (
from kubeseal_webgui_api.internal import (
InclusterCoreClient,
MockCoreClient,
kubernetes_namespaces_resolver,
kubernetes_resource_namespaces_resolver,
)
from kubeseal_webgui_api.routers.mock_namespace_resolver import mock_namespaces_resolver

router = fastapi.APIRouter()
LOGGER = logging.getLogger("kubeseal-webgui")

if settings.mock_enabled:
namespace_resolver = mock_namespaces_resolver
core_client = MockCoreClient(namespace_count=settings.mock_namespace_count)
else:
core_client = InclusterCoreClient()

if settings.k8s_namespace_resource == "namespace":
namespace_resolver = kubernetes_namespaces_resolver
else:
namespace_resolver = kubernetes_resource_namespaces_resolver


@router.get("/namespaces", response_model=List[str])
def get_namespaces() -> List[str]:
try:
return namespace_resolver()
return namespace_resolver(core_client)
except RuntimeError:
raise fastapi.HTTPException(
status_code=500, detail="Can't get namespaces from cluster."
Expand Down
23 changes: 0 additions & 23 deletions api/kubeseal_webgui_api/routers/kubernetes_namespace_resolver.py

This file was deleted.

Loading

0 comments on commit 00dcc0f

Please sign in to comment.