-
-
Notifications
You must be signed in to change notification settings - Fork 132
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
18 changed files
with
507 additions
and
95 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,65 @@ | ||
from collections import OrderedDict | ||
|
||
from openapi_core.casting.schemas.casters import ArrayCaster | ||
from openapi_core.casting.schemas.casters import BooleanCaster | ||
from openapi_core.casting.schemas.casters import IntegerCaster | ||
from openapi_core.casting.schemas.casters import NumberCaster | ||
from openapi_core.casting.schemas.casters import ObjectCaster | ||
from openapi_core.casting.schemas.casters import PrimitiveCaster | ||
from openapi_core.casting.schemas.casters import TypesCaster | ||
from openapi_core.casting.schemas.factories import SchemaCastersFactory | ||
from openapi_core.validation.schemas import ( | ||
oas30_read_schema_validators_factory, | ||
) | ||
from openapi_core.validation.schemas import ( | ||
oas30_write_schema_validators_factory, | ||
) | ||
from openapi_core.validation.schemas import oas31_schema_validators_factory | ||
|
||
__all__ = [ | ||
"oas30_write_schema_casters_factory", | ||
"oas30_read_schema_casters_factory", | ||
"oas31_schema_casters_factory", | ||
] | ||
|
||
oas30_casters_dict = OrderedDict( | ||
[ | ||
("object", ObjectCaster), | ||
("array", ArrayCaster), | ||
("boolean", BooleanCaster), | ||
("integer", IntegerCaster), | ||
("number", NumberCaster), | ||
("string", PrimitiveCaster), | ||
] | ||
) | ||
oas31_casters_dict = oas30_casters_dict.copy() | ||
oas31_casters_dict.update( | ||
{ | ||
"null": PrimitiveCaster, | ||
} | ||
) | ||
|
||
oas30_types_caster = TypesCaster( | ||
oas30_casters_dict, | ||
PrimitiveCaster, | ||
) | ||
oas31_types_caster = TypesCaster( | ||
oas31_casters_dict, | ||
PrimitiveCaster, | ||
multi=PrimitiveCaster, | ||
) | ||
|
||
oas30_write_schema_casters_factory = SchemaCastersFactory( | ||
oas30_write_schema_validators_factory, | ||
oas30_types_caster, | ||
) | ||
|
||
__all__ = ["schema_casters_factory"] | ||
oas30_read_schema_casters_factory = SchemaCastersFactory( | ||
oas30_read_schema_validators_factory, | ||
oas30_types_caster, | ||
) | ||
|
||
schema_casters_factory = SchemaCastersFactory() | ||
oas31_schema_casters_factory = SchemaCastersFactory( | ||
oas31_schema_validators_factory, | ||
oas31_types_caster, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,67 +1,238 @@ | ||
from typing import TYPE_CHECKING | ||
from typing import Any | ||
from typing import Callable | ||
from typing import Generic | ||
from typing import Iterable | ||
from typing import List | ||
from typing import Mapping | ||
from typing import Optional | ||
from typing import Type | ||
from typing import TypeVar | ||
from typing import Union | ||
|
||
from jsonschema_path import SchemaPath | ||
|
||
from openapi_core.casting.schemas.datatypes import CasterCallable | ||
from openapi_core.casting.schemas.exceptions import CastError | ||
from openapi_core.schema.schemas import get_properties | ||
from openapi_core.util import forcebool | ||
from openapi_core.validation.schemas.validators import SchemaValidator | ||
|
||
if TYPE_CHECKING: | ||
from openapi_core.casting.schemas.factories import SchemaCastersFactory | ||
|
||
|
||
class BaseSchemaCaster: | ||
def __init__(self, schema: SchemaPath): | ||
class PrimitiveCaster: | ||
def __init__( | ||
self, | ||
schema: SchemaPath, | ||
schema_validator: SchemaValidator, | ||
schema_caster: "SchemaCaster", | ||
): | ||
self.schema = schema | ||
self.schema_validator = schema_validator | ||
self.schema_caster = schema_caster | ||
|
||
def __call__(self, value: Any) -> Any: | ||
if value is None: | ||
return value | ||
return value | ||
|
||
return self.cast(value) | ||
|
||
def cast(self, value: Any) -> Any: | ||
raise NotImplementedError | ||
PrimitiveType = TypeVar("PrimitiveType") | ||
|
||
|
||
class CallableSchemaCaster(BaseSchemaCaster): | ||
def __init__(self, schema: SchemaPath, caster_callable: CasterCallable): | ||
super().__init__(schema) | ||
self.caster_callable = caster_callable | ||
class PrimitiveTypeCaster(Generic[PrimitiveType], PrimitiveCaster): | ||
primitive_type: Type[PrimitiveType] = NotImplemented | ||
|
||
def __call__(self, value: Union[str, bytes]) -> Any: | ||
self.validate(value) | ||
|
||
return self.primitive_type(value) # type: ignore [call-arg] | ||
|
||
def validate(self, value: Any) -> None: | ||
# FIXME: don't cast data from media type deserializer | ||
# See https://github.com/python-openapi/openapi-core/issues/706 | ||
# if not isinstance(value, (str, bytes)): | ||
# raise ValueError("should cast only from string or bytes") | ||
pass | ||
|
||
|
||
class IntegerCaster(PrimitiveTypeCaster[int]): | ||
primitive_type = int | ||
|
||
|
||
class NumberCaster(PrimitiveTypeCaster[float]): | ||
primitive_type = float | ||
|
||
|
||
class BooleanCaster(PrimitiveTypeCaster[bool]): | ||
primitive_type = bool | ||
|
||
def __call__(self, value: Union[str, bytes]) -> Any: | ||
self.validate(value) | ||
|
||
return self.primitive_type(forcebool(value)) | ||
|
||
def validate(self, value: Any) -> None: | ||
super().validate(value) | ||
|
||
# FIXME: don't cast data from media type deserializer | ||
# See https://github.com/python-openapi/openapi-core/issues/706 | ||
if isinstance(value, bool): | ||
return | ||
|
||
if value.lower() not in ["false", "true"]: | ||
raise ValueError("not a boolean format") | ||
|
||
|
||
class ArrayCaster(PrimitiveCaster): | ||
@property | ||
def items_caster(self) -> "SchemaCaster": | ||
# sometimes we don't have any schema i.e. free-form objects | ||
items_schema = self.schema.get("items", SchemaPath.from_dict({})) | ||
return self.schema_caster.evolve(items_schema) | ||
|
||
def __call__(self, value: Any) -> List[Any]: | ||
# str and bytes are not arrays according to the OpenAPI spec | ||
if isinstance(value, (str, bytes)) or not isinstance(value, Iterable): | ||
raise CastError(value, self.schema["type"]) | ||
|
||
def cast(self, value: Any) -> Any: | ||
try: | ||
return self.caster_callable(value) | ||
return list(map(self.items_caster.cast, value)) | ||
except (ValueError, TypeError): | ||
raise CastError(value, self.schema["type"]) | ||
|
||
|
||
class DummyCaster(BaseSchemaCaster): | ||
def cast(self, value: Any) -> Any: | ||
class ObjectCaster(PrimitiveCaster): | ||
def __call__(self, value: Any) -> Any: | ||
return self._cast_proparties(value) | ||
|
||
def evolve(self, schema: SchemaPath) -> "ObjectCaster": | ||
cls = self.__class__ | ||
|
||
return cls( | ||
schema, | ||
self.schema_validator.evolve(schema), | ||
self.schema_caster.evolve(schema), | ||
) | ||
|
||
def _cast_proparties(self, value: Any, schema_only: bool = False) -> Any: | ||
if not isinstance(value, dict): | ||
raise CastError(value, self.schema["type"]) | ||
|
||
all_of_schemas = self.schema_validator.iter_all_of_schemas(value) | ||
for all_of_schema in all_of_schemas: | ||
all_of_properties = self.evolve(all_of_schema)._cast_proparties( | ||
value, schema_only=True | ||
) | ||
value.update(all_of_properties) | ||
|
||
for prop_name, prop_schema in get_properties(self.schema).items(): | ||
try: | ||
prop_value = value[prop_name] | ||
except KeyError: | ||
continue | ||
value[prop_name] = self.schema_caster.evolve(prop_schema).cast( | ||
prop_value | ||
) | ||
|
||
if schema_only: | ||
return value | ||
|
||
additional_properties = self.schema.getkey( | ||
"additionalProperties", True | ||
) | ||
if additional_properties is not False: | ||
# free-form object | ||
if additional_properties is True: | ||
additional_prop_schema = SchemaPath.from_dict( | ||
{"nullable": True} | ||
) | ||
# defined schema | ||
else: | ||
additional_prop_schema = self.schema / "additionalProperties" | ||
additional_prop_caster = self.schema_caster.evolve( | ||
additional_prop_schema | ||
) | ||
for prop_name, prop_value in value.items(): | ||
if prop_name in value: | ||
continue | ||
value[prop_name] = additional_prop_caster.cast(prop_value) | ||
|
||
return value | ||
|
||
|
||
class ComplexCaster(BaseSchemaCaster): | ||
class TypesCaster: | ||
casters: Mapping[str, Type[PrimitiveCaster]] = {} | ||
multi: Optional[Type[PrimitiveCaster]] = None | ||
|
||
def __init__( | ||
self, schema: SchemaPath, casters_factory: "SchemaCastersFactory" | ||
self, | ||
casters: Mapping[str, Type[PrimitiveCaster]], | ||
default: Type[PrimitiveCaster], | ||
multi: Optional[Type[PrimitiveCaster]] = None, | ||
): | ||
super().__init__(schema) | ||
self.casters_factory = casters_factory | ||
self.casters = casters | ||
self.default = default | ||
self.multi = multi | ||
|
||
def get_caster( | ||
self, | ||
schema_type: Optional[Union[Iterable[str], str]], | ||
) -> Type["PrimitiveCaster"]: | ||
if schema_type is None: | ||
return self.default | ||
if isinstance(schema_type, Iterable) and not isinstance( | ||
schema_type, str | ||
): | ||
if self.multi is None: | ||
raise TypeError("caster does not accept multiple types") | ||
return self.multi | ||
|
||
return self.casters[schema_type] | ||
|
||
|
||
class SchemaCaster: | ||
def __init__( | ||
self, | ||
schema: SchemaPath, | ||
schema_validator: SchemaValidator, | ||
types_caster: TypesCaster, | ||
): | ||
self.schema = schema | ||
self.schema_validator = schema_validator | ||
|
||
self.types_caster = types_caster | ||
|
||
class ArrayCaster(ComplexCaster): | ||
@property | ||
def items_caster(self) -> BaseSchemaCaster: | ||
return self.casters_factory.create(self.schema / "items") | ||
def cast(self, value: Any) -> Any: | ||
# skip casting for nullable in OpenAPI 3.0 | ||
if value is None and self.schema.getkey("nullable", False): | ||
return value | ||
|
||
def cast(self, value: Any) -> List[Any]: | ||
# str and bytes are not arrays according to the OpenAPI spec | ||
if isinstance(value, (str, bytes)): | ||
raise CastError(value, self.schema["type"]) | ||
schema_type = self.schema.getkey("type") | ||
|
||
type_caster = self.get_type_caster(schema_type) | ||
|
||
if value is None: | ||
return value | ||
|
||
try: | ||
return list(map(self.items_caster, value)) | ||
return type_caster(value) | ||
except (ValueError, TypeError): | ||
raise CastError(value, self.schema["type"]) | ||
raise CastError(value, schema_type) | ||
|
||
def get_type_caster( | ||
self, | ||
schema_type: Optional[Union[Iterable[str], str]], | ||
) -> PrimitiveCaster: | ||
caster_cls = self.types_caster.get_caster(schema_type) | ||
return caster_cls( | ||
self.schema, | ||
self.schema_validator, | ||
self, | ||
) | ||
|
||
def evolve(self, schema: SchemaPath) -> "SchemaCaster": | ||
cls = self.__class__ | ||
|
||
return cls( | ||
schema, | ||
self.schema_validator.evolve(schema), | ||
self.types_caster, | ||
) |
Oops, something went wrong.