Skip to content

Commit

Permalink
Merge pull request #1027 from Aiven-Open/keejon/fix-delete-references
Browse files Browse the repository at this point in the history
fix: delete references if schema is deleted
  • Loading branch information
jjaakola-aiven authored Jan 23, 2025
2 parents 2da698f + 485a95c commit 6e61dfe
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 40 deletions.
28 changes: 15 additions & 13 deletions src/karapace/in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,10 @@ def num_subjects(self) -> int:
def num_schema_versions(self) -> tuple[int, int]:
pass

@abstractmethod
def insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None:
pass

@abstractmethod
def get_referenced_by(self, subject: Subject, version: Version) -> Referents | None:
pass

@abstractmethod
def remove_referenced_by(self, schema_id: SchemaId, references: Iterable[Reference]) -> None:
pass


class InMemoryDatabase(KarapaceDatabase):
def __init__(self) -> None:
Expand Down Expand Up @@ -257,6 +249,9 @@ def insert_schema_version(
schema=schema,
schema_id=schema_id,
)
if references:
for ref in references:
self._insert_referenced_by(subject=ref.subject, version=ref.version, schema_id=schema_id)
else:
self._delete_from_schema_id_on_subject(
subject=subject,
Expand Down Expand Up @@ -352,12 +347,19 @@ def delete_subject(self, *, subject: Subject, version: Version) -> None:

def delete_subject_hard(self, *, subject: Subject) -> None:
with self.schema_lock_thread:
for schema in self.subjects[subject].schemas.values():
if schema.references:
self._remove_referenced_by(schema.schema_id, schema.references)
del self.subjects[subject]
self._delete_subject_from_schema_id_on_subject(subject=subject)

def delete_subject_schema(self, *, subject: Subject, version: Version) -> None:
with self.schema_lock_thread:
self.subjects[subject].schemas.pop(version, None)
schema = self.subjects[subject].schemas.pop(version, None)
if schema:
if schema.references:
self._remove_referenced_by(schema.schema_id, schema.references)
self._delete_from_schema_id_on_subject(subject=subject, schema=schema.schema)

def num_schemas(self) -> int:
return len(self.schemas)
Expand All @@ -377,19 +379,19 @@ def num_schema_versions(self) -> tuple[int, int]:
soft_deleted_versions += 1
return (live_versions, soft_deleted_versions)

def insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None:
def _insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None:
with self.schema_lock_thread:
referents = self.referenced_by.get((subject, version), None)
if referents:
referents.append(schema_id)
referents.add(schema_id)
else:
self.referenced_by[(subject, version)] = Referents([schema_id])
self.referenced_by[(subject, version)] = Referents({schema_id})

def get_referenced_by(self, subject: Subject, version: Version) -> Referents | None:
with self.schema_lock_thread:
return self.referenced_by.get((subject, version), None)

def remove_referenced_by(self, schema_id: SchemaId, references: Iterable[Reference]) -> None:
def _remove_referenced_by(self, schema_id: SchemaId, references: Iterable[Reference]) -> None:
with self.schema_lock_thread:
for ref in references:
key = (ref.subject, ref.version)
Expand Down
13 changes: 1 addition & 12 deletions src/karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
from karapace.statsd import StatsClient
from karapace.typing import JsonObject, SchemaId, SchemaReaderStoppper, Subject, Version
from karapace.typing import JsonObject, SchemaReaderStoppper, Subject, Version
from karapace.utils import json_decode, JSONDecodeError, shutdown
from threading import Event, Lock, Thread
from typing import Final
Expand Down Expand Up @@ -660,10 +660,6 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
references=resolved_references,
)

if resolved_references:
for ref in resolved_references:
self.database.insert_referenced_by(subject=ref.subject, version=ref.version, schema_id=schema_id)

def handle_msg(self, key: dict, value: dict | None) -> None:
if "keytype" in key:
try:
Expand All @@ -687,13 +683,6 @@ def handle_msg(self, key: dict, value: dict | None) -> None:
)
raise InvalidSchema("Message key doesn't contain the `keytype` attribute")

def remove_referenced_by(
self,
schema_id: SchemaId,
references: Sequence[Reference],
) -> None:
self.database.remove_referenced_by(schema_id, references)

def get_referenced_by(
self,
subject: Subject,
Expand Down
2 changes: 1 addition & 1 deletion src/karapace/schema_references.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from karapace.typing import JsonData, JsonObject, SchemaId, Subject, Version
from typing import cast, NewType, TypeVar

Referents = NewType("Referents", list[SchemaId])
Referents = NewType("Referents", set[SchemaId])

T = TypeVar("T")

Expand Down
4 changes: 0 additions & 4 deletions src/karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,6 @@ async def subject_delete_local(self, subject: Subject, permanent: bool) -> list[
deleted=True,
references=schema_version.references,
)
if schema_version.references and len(schema_version.references) > 0:
self.schema_reader.remove_referenced_by(schema_version.schema_id, schema_version.references)
else:
try:
schema_versions_live = self.subject_get(subject, include_deleted=False)
Expand Down Expand Up @@ -225,8 +223,6 @@ async def subject_version_delete_local(self, subject: Subject, version: Version,
deleted=True,
references=schema_version.references,
)
if schema_version.references and len(schema_version.references) > 0:
self.schema_reader.remove_referenced_by(schema_version.schema_id, schema_version.references)
return resolved_version

def subject_get(self, subject: Subject, include_deleted: bool = False) -> dict[Version, SchemaVersion]:
Expand Down
5 changes: 3 additions & 2 deletions stubs/confluent_kafka/admin/_config.pyi
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from ._resource import ResourceType
from enum import Enum
from typing import cast

class ConfigResource:
Type = ResourceType
Expand All @@ -12,7 +13,7 @@ class ConfigResource:
) -> None: ...

class ConfigSource(Enum):
UNKNOWN_CONFIG: int
DYNAMIC_TOPIC_CONFIG: int
UNKNOWN_CONFIG = cast(int, ...)
DYNAMIC_TOPIC_CONFIG = cast(int, ...)

class ConfigEntry: ...
3 changes: 2 additions & 1 deletion stubs/confluent_kafka/admin/_resource.pyi
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from enum import Enum
from typing import cast

class ResourceType(Enum):
TOPIC: int
TOPIC = cast(int, ...)
86 changes: 79 additions & 7 deletions tests/unit/test_in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,25 @@
from __future__ import annotations

from collections import defaultdict
from collections.abc import Iterable, Sequence
from collections.abc import Sequence
from confluent_kafka.cimpl import KafkaError
from karapace.config import DEFAULTS
from karapace.constants import DEFAULT_SCHEMA_TOPIC
from karapace.in_memory_database import InMemoryDatabase, KarapaceDatabase, Subject, SubjectData
from karapace.kafka.types import Timestamp
from karapace.key_format import KeyFormatter
from karapace.offset_watcher import OffsetWatcher
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_models import SchemaVersion, TypedSchema
from karapace.schema_reader import KafkaSchemaReader
from karapace.schema_references import Reference, Referents
from karapace.schema_type import SchemaType
from karapace.typing import SchemaId, Version
from pathlib import Path
from typing import Final

import pytest

TEST_DATA_FOLDER: Final = Path("tests/unit/test_data/")


Expand Down Expand Up @@ -176,15 +180,9 @@ def num_subjects(self) -> int:
def num_schema_versions(self) -> tuple[int, int]:
return self.db.num_schema_versions()

def insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None:
return self.db.insert_referenced_by(subject=subject, version=version, schema_id=schema_id)

def get_referenced_by(self, subject: Subject, version: Version) -> Referents | None:
return self.db.get_referenced_by(subject=subject, version=version)

def remove_referenced_by(self, schema_id: SchemaId, references: Iterable[Reference]) -> None:
return self.db.remove_referenced_by(schema_id=schema_id, references=references)

def duplicates(self) -> dict[SchemaId, list[tuple[Subject, TypedSchema]]]:
duplicate_data = defaultdict(list)
for schema_id, schemas in self._duplicates.items():
Expand Down Expand Up @@ -259,3 +257,77 @@ def test_can_ingest_schemas_from_log() -> None:
schema_id_to_duplicated_subjects = compute_schema_id_to_subjects(duplicates, database.subject_to_subject_data())
assert schema_id_to_duplicated_subjects == {}, "there shouldn't be any duplicated schemas"
assert duplicates == {}, "the schema database is broken. The id should be unique"


@pytest.fixture(name="db_with_schemas")
def fixture_in_memory_database_with_schemas() -> InMemoryDatabase:
db = InMemoryDatabase()
schema_str = "syntax = 'proto3'; message Test { string test = 1; }"

subject_a = Subject("subject_a")
schema_a = TypedSchema(
schema_type=SchemaType.PROTOBUF,
schema_str=schema_str,
schema=ProtobufSchema(schema=schema_str),
)
db.insert_subject(subject=subject_a)
schema_id_a = db.get_schema_id(schema_a)
db.insert_schema_version(
subject=subject_a, schema_id=schema_id_a, version=Version(1), schema=schema_a, deleted=False, references=None
)
db.insert_schema_version(
subject=subject_a, schema_id=schema_id_a, version=Version(2), schema=schema_a, deleted=False, references=None
)

subject_b = Subject("subject_b")
references_b = [Reference(name="test", subject=subject_a, version=Version(1))]
schema_b = TypedSchema(
schema_type=SchemaType.PROTOBUF,
schema_str=schema_str,
schema=ProtobufSchema(schema=schema_str),
references=references_b,
)
db.insert_subject(subject=subject_b)
schema_id_b = db.get_schema_id(schema_b)
db.insert_schema_version(
subject=subject_b,
schema_id=schema_id_b,
version=Version(1),
schema=schema_b,
deleted=False,
references=references_b,
)

return db


def test_delete_schema_references(db_with_schemas: InMemoryDatabase) -> None:
# Check that the schema is referenced by subject_b
referents = db_with_schemas.get_referenced_by(subject=Subject("subject_a"), version=Version(1))
assert referents is not None
version = db_with_schemas.find_schema_versions_by_schema_id(schema_id=referents.pop(), include_deleted=False)[0]
assert version.subject == Subject("subject_b")
assert version.version == Version(1)

# Delete the schema from subject_b
db_with_schemas.delete_subject_schema(subject=Subject("subject_b"), version=Version(1))

# Check that the schema is no longer referenced by subject_b
referents = db_with_schemas.get_referenced_by(subject=Subject("subject_a"), version=Version(1))
assert len(referents) == 0, "referents should be gone after deleting the schema"


def test_delete_subject(db_with_schemas: InMemoryDatabase) -> None:
# Check that the schema is referenced by subject_b
referents = db_with_schemas.get_referenced_by(subject=Subject("subject_a"), version=Version(1))
assert referents is not None
version = db_with_schemas.find_schema_versions_by_schema_id(schema_id=referents.pop(), include_deleted=False)[0]
assert version.subject == Subject("subject_b")
assert version.version == Version(1)

# Hard delete subject_b
db_with_schemas.delete_subject_hard(subject=Subject("subject_b"))

# Check that the schema is no longer referenced by subject_b
referents = db_with_schemas.get_referenced_by(subject=Subject("subject_a"), version=Version(1))
assert len(referents) == 0, "referents should be gone after hard deleting the subject"

0 comments on commit 6e61dfe

Please sign in to comment.