Skip to content

Commit

Permalink
fix: integration tests uses subject name that has slash
Browse files Browse the repository at this point in the history
  • Loading branch information
jjaakola-aiven committed Jan 24, 2025
1 parent 7d70546 commit 2876794
Show file tree
Hide file tree
Showing 11 changed files with 687 additions and 742 deletions.
93 changes: 92 additions & 1 deletion src/karapace/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
See LICENSE for details
"""

from typing import Literal
from aiohttp import BasicAuth, ClientSession
from collections.abc import Awaitable, Callable, Mapping
from karapace.typing import JsonData
from urllib.parse import urljoin
from urllib.parse import urljoin, quote_plus

import logging
import ssl
Expand Down Expand Up @@ -115,6 +116,7 @@ async def delete(
path: Path,
headers: Headers | None = None,
auth: BasicAuth | None = None,
params: Mapping[str, str] | None = None,
) -> Result:
path = self.path_for(path)
if not headers:
Expand All @@ -125,6 +127,7 @@ async def delete(
headers=headers,
auth=auth,
ssl=self.ssl_mode,
params=params,
) as res:
json_result = {} if res.status == 204 else await res.json()
return Result(res.status, json_result, headers=res.headers)
Expand All @@ -135,6 +138,7 @@ async def post(
json: JsonData,
headers: Headers | None = None,
auth: BasicAuth | None = None,
params: Mapping[str, str] | None = None,
) -> Result:
path = self.path_for(path)
if not headers:
Expand All @@ -147,6 +151,7 @@ async def post(
auth=auth,
json=json,
ssl=self.ssl_mode,
params=params,
) as res:
json_result = {} if res.status == 204 else await res.json()
return Result(res.status, json_result, headers=res.headers)
Expand Down Expand Up @@ -191,3 +196,89 @@ async def put_with_data(
) as res:
json_result = await res.json()
return Result(res.status, json_result, headers=res.headers)

# Per resource functions
# COMPATIBILITY
async def post_compatibility_subject_version(
self, *, subject: str, version: int | Literal["latest"], json: JsonData
) -> Result:
return await self.post(
path=f"compatibility/subjects/{quote_plus(subject)}/versions/{version}",
json=json,
)

# CONFIG
async def get_config(self) -> Result:
return await self.get(path="/config")

async def put_config(self, *, json: JsonData) -> Result:
return await self.put(path="/config", json=json)

async def get_config_subject(self, *, subject: str, defaultToGlobal: bool | None = None) -> Result:
path = f"/config/{quote_plus(subject)}"
if defaultToGlobal is not None:
path = f"{path}?defaultToGlobal={str(defaultToGlobal).lower()}"
return await self.get(path=path)

async def put_config_subject(self, *, subject: str, json: JsonData) -> Result:
return await self.put(path=f"config/{quote_plus(subject)}", json=json)

async def delete_config_subject(self, *, subject: str) -> Result:
return await self.delete(path=f"config/{quote_plus(subject)}")

# MODE
async def get_mode(self) -> Result:
return await self.get(path="/mode")

async def get_mode_subject(self, *, subject: str) -> Result:
return await self.get(path=f"/mode/{quote_plus(subject)}")

# SCHEMAS
async def get_schemas(self) -> Result:
return await self.get("/schemas")

async def get_types(self) -> Result:
return await self.get(path="/schemas/types")

async def get_schema_by_id(self, *, schema_id: int, params: Mapping[str, str] | None = None) -> Result:
return await self.get(path=f"/schemas/ids/{schema_id}", params=params)

async def get_schema_by_id_versions(self, *, schema_id: int, params: Mapping[str, str] | None = None) -> Result:
return await self.get(path=f"/schemas/ids/{schema_id}/versions", params=params)

# SUBJECTS
async def get_subjects(self, *, params: Mapping[str, str] | None = None) -> Result:
return await self.get("/subjects", params=params)

async def get_subjects_versions(self, *, subject: str) -> Result:
return await self.get(f"subjects/{quote_plus(subject)}/versions")

async def post_subjects(self, *, subject: str, json: JsonData, params: Mapping[str, str] | None = None) -> Result:
return await self.post(f"/subjects/{quote_plus(subject)}", json=json, params=params)

async def post_subjects_versions(
self, *, subject: str, json: JsonData, params: Mapping[str, str] | None = None
) -> Result:
return await self.post(f"/subjects/{quote_plus(subject)}/versions", json=json, params=params)

async def get_subjects_subject_version(
self, *, subject: str, version: int | Literal["latest"], params: Mapping[str, str] | None = None
) -> Result:
return await self.get(f"/subjects/{quote_plus(subject)}/versions/{version}", params=params)

async def get_subjects_subject_version_schema(self, *, subject: str, version: int | Literal["latest"]) -> Result:
return await self.get(f"subjects/{quote_plus(subject)}/versions/{version}/schema")

async def get_subjects_subject_version_referenced_by(self, *, subject: str, version: int | Literal["latest"]) -> Result:
return await self.get(f"subjects/{quote_plus(subject)}/versions/{version}/referencedby")

async def delete_subjects(self, *, subject: str, params: Mapping[str, str] | None = None) -> Result:
return await self.delete(path=f"/subjects/{quote_plus(subject)}", params=params)

async def delete_subjects_version(
self, *, subject: str, version: int | Literal["latest"], permanent: bool | None = None
) -> Result:
path = f"subjects/{quote_plus(subject)}/versions/{version}"
if permanent is not None:
path = f"{path}?permanent={str(permanent).lower()}"
return await self.delete(path)
4 changes: 4 additions & 0 deletions src/schema_registry/routers/compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@
from schema_registry.container import SchemaRegistryContainer
from schema_registry.controller import KarapaceSchemaRegistryController
from schema_registry.routers.errors import unauthorized
from schema_registry.routers.raw_path_router import RawPathRoute
from schema_registry.routers.requests import CompatibilityCheckResponse, SchemaRequest
from schema_registry.user import get_current_user
from typing import Annotated
from urllib.parse import unquote_plus

compatibility_router = APIRouter(
prefix="/compatibility",
tags=["compatibility"],
responses={404: {"description": "Not found"}},
route_class=RawPathRoute,
)


Expand All @@ -31,6 +34,7 @@ async def compatibility_post(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> CompatibilityCheckResponse:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"):
raise unauthorized()

Expand Down
4 changes: 4 additions & 0 deletions src/schema_registry/routers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from schema_registry.routers.requests import CompatibilityLevelResponse, CompatibilityRequest, CompatibilityResponse
from schema_registry.user import get_current_user
from typing import Annotated
from urllib.parse import unquote_plus


config_router = APIRouter(
Expand Down Expand Up @@ -72,6 +73,7 @@ async def config_get_subject(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> CompatibilityLevelResponse:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"):
raise unauthorized()

Expand All @@ -90,6 +92,7 @@ async def config_set_subject(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> CompatibilityResponse:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

Expand All @@ -114,6 +117,7 @@ async def config_delete_subject(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> CompatibilityResponse:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

Expand Down
2 changes: 2 additions & 0 deletions src/schema_registry/routers/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from schema_registry.routers.requests import ModeResponse
from schema_registry.user import get_current_user
from typing import Annotated
from urllib.parse import unquote_plus

mode_router = APIRouter(
prefix="/mode",
Expand Down Expand Up @@ -44,6 +45,7 @@ async def mode_get_subject(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> ModeResponse:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"):
raise unauthorized()

Expand Down
12 changes: 9 additions & 3 deletions src/schema_registry/routers/subjects.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
from dependency_injector.wiring import inject, Provide
from fastapi import APIRouter, Depends, Request
from karapace.auth import AuthenticatorAndAuthorizer, Operation, User
from karapace.config import Config
from karapace.container import KarapaceContainer
from karapace.forward_client import ForwardClient
from karapace.typing import Subject
from schema_registry.container import SchemaRegistryContainer
Expand All @@ -18,6 +16,7 @@
from schema_registry.routers.requests import SchemaIdResponse, SchemaRequest, SchemaResponse, SubjectSchemaVersionResponse
from schema_registry.user import get_current_user
from typing import Annotated
from urllib.parse import unquote_plus

import logging

Expand Down Expand Up @@ -58,6 +57,7 @@ async def subjects_subject_post(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> SchemaResponse:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"):
raise unauthorized()

Expand All @@ -80,8 +80,8 @@ async def subjects_subject_delete(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
config: Config = Depends(Provide[KarapaceContainer.config]),
) -> list[int]:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

Expand All @@ -107,6 +107,7 @@ async def subjects_subject_versions_post(
normalize: bool = False,
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> SchemaIdResponse:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

Expand All @@ -130,6 +131,7 @@ async def subjects_subject_versions_list(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> list[int]:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"):
raise unauthorized()

Expand All @@ -146,6 +148,7 @@ async def subjects_subject_version_get(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> SubjectSchemaVersionResponse:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"):
raise unauthorized()

Expand All @@ -165,6 +168,7 @@ async def subjects_subject_version_delete(
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> int:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

Expand All @@ -187,6 +191,7 @@ async def subjects_subject_version_schema_get(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> dict:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"):
raise unauthorized()

Expand All @@ -202,6 +207,7 @@ async def subjects_subject_version_referenced_by(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> list[int]:
subject = Subject(unquote_plus(subject))
if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"):
raise unauthorized()

Expand Down
Loading

0 comments on commit 2876794

Please sign in to comment.