Skip to content

Commit

Permalink
✨ low-code CDK: Allow connector developers to specify the type of an …
Browse files Browse the repository at this point in the history
…added field (airbytehq#31638)

Co-authored-by: girarda <girarda@users.noreply.github.com>
Co-authored-by: erohmensing <erohmensing@gmail.com>
  • Loading branch information
3 people authored Oct 23, 2023
1 parent 8122e03 commit 7a764f8
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ definitions:
- "{{ record['updates'] }}"
- "{{ record['MetaData']['LastUpdatedTime'] }}"
- "{{ stream_partition['segment_id'] }}"
value_type:
title: Value Type
description: Type of the value. If not specified, the type will be inferred from the value.
"$ref": "#/definitions/ValueType"
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -1987,6 +1991,15 @@ definitions:
$parameters:
type: object
additionalProperties: true
# Closed set of type names a connector developer may declare for a value.
ValueType:
  title: Value Type
  description: A schema type.
  type: string
  enum:
    - string
    - number
    - integer
    - boolean
WaitTimeFromHeader:
title: Wait Time Extracted From Response Header
description: Extract wait time from a HTTP header in the response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,6 @@
from typing_extensions import Literal


class AddedFieldDefinition(BaseModel):
    """Manifest model for one field that a transformation adds to a record.

    ``path`` says where the value is written on the record and ``value`` is
    the (possibly interpolated) string to write there.
    """

    type: Literal['AddedFieldDefinition']
    path: List[str] = Field(
        ...,
        description='List of strings defining the path where to add the value on the record.',
        examples=[['segment_id'], ['metadata', 'segment_id']],
        title='Path',
    )
    value: str = Field(
        ...,
        description="Value of the new field. Use {{ record['existing_field'] }} syntax to refer to other fields in the record.",
        examples=[
            "{{ record['updates'] }}",
            "{{ record['MetaData']['LastUpdatedTime'] }}",
            "{{ stream_partition['segment_id'] }}",
        ],
        title='Value',
    )
    # Populated from the `$parameters` key of the manifest (see the alias).
    parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class AddFields(BaseModel):
    """Manifest model for a transformation that adds a list of fields to a record."""

    type: Literal['AddFields']
    fields: List[AddedFieldDefinition] = Field(
        ...,
        description='List of transformations (path and corresponding value) that will be added to the record.',
        title='Fields',
    )
    # Populated from the `$parameters` key of the manifest (see the alias).
    parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class AuthFlowType(Enum):
oauth2_0 = 'oauth2.0'
oauth1_0 = 'oauth1.0'
Expand Down Expand Up @@ -694,6 +663,13 @@ class LegacySessionTokenAuthenticator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class ValueType(Enum):
    """Type names that can be declared for a value; each member's value equals its name."""

    string = 'string'
    number = 'number'
    integer = 'integer'
    boolean = 'boolean'


class WaitTimeFromHeader(BaseModel):
type: Literal['WaitTimeFromHeader']
header: str = Field(
Expand Down Expand Up @@ -734,6 +710,42 @@ class WaitUntilTimeFromHeader(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class AddedFieldDefinition(BaseModel):
    """Manifest model for one field that a transformation adds to a record.

    ``path`` says where the value is written, ``value`` is the (possibly
    interpolated) string to write, and the optional ``value_type`` pins the
    type of the resulting value instead of letting it be inferred.
    """

    type: Literal['AddedFieldDefinition']
    path: List[str] = Field(
        ...,
        description='List of strings defining the path where to add the value on the record.',
        examples=[['segment_id'], ['metadata', 'segment_id']],
        title='Path',
    )
    value: str = Field(
        ...,
        description="Value of the new field. Use {{ record['existing_field'] }} syntax to refer to other fields in the record.",
        examples=[
            "{{ record['updates'] }}",
            "{{ record['MetaData']['LastUpdatedTime'] }}",
            "{{ stream_partition['segment_id'] }}",
        ],
        title='Value',
    )
    # Defaults to None, meaning "infer the type from the value".
    value_type: Optional[ValueType] = Field(
        None,
        description='Type of the value. If not specified, the type will be inferred from the value.',
        title='Value Type',
    )
    # Populated from the `$parameters` key of the manifest (see the alias).
    parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class AddFields(BaseModel):
    """Manifest model for a transformation that adds a list of fields to a record."""

    type: Literal['AddFields']
    fields: List[AddedFieldDefinition] = Field(
        ...,
        description='List of transformations (path and corresponding value) that will be added to the record.',
        title='Fields',
    )
    # Populated from the `$parameters` key of the manifest (see the alias).
    parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class ApiKeyAuthenticator(BaseModel):
type: Literal['ApiKeyAuthenticator']
api_token: Optional[str] = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import SimpleRetriever as SimpleRetrieverModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import SubstreamPartitionRouter as SubstreamPartitionRouterModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType
from airbyte_cdk.sources.declarative.models.declarative_component_schema import WaitTimeFromHeader as WaitTimeFromHeaderModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import WaitUntilTimeFromHeader as WaitUntilTimeFromHeaderModel
from airbyte_cdk.sources.declarative.partition_routers import ListPartitionRouter, SinglePartitionRouter, SubstreamPartitionRouter
Expand Down Expand Up @@ -232,15 +233,36 @@ def _create_component_from_model(self, model: BaseModel, config: Config, **kwarg
@staticmethod
def create_added_field_definition(model: AddedFieldDefinitionModel, config: Config, **kwargs: Any) -> AddedFieldDefinition:
    """
    Build the runtime AddedFieldDefinition from its parsed manifest model.

    :param model: parsed manifest model of the field to add
    :param config: connector configuration (not used by this factory method)
    :param kwargs: extra keyword arguments; accepted and ignored
    :return: definition whose value is an InterpolatedString and whose value_type is a Python type (or None)
    """
    interpolated_value = InterpolatedString.create(model.value, parameters=model.parameters or {})
    return AddedFieldDefinition(
        path=model.path,
        value=interpolated_value,
        # Translate the declared schema type name into the Python type expected by the component.
        value_type=ModelToComponentFactory._json_schema_type_name_to_type(model.value_type),
        parameters=model.parameters or {},
    )

def create_add_fields(self, model: AddFieldsModel, config: Config, **kwargs: Any) -> AddFields:
    """
    Build the AddFields transformation from its parsed manifest model.

    :param model: parsed manifest model listing the fields to add
    :param config: connector configuration, forwarded to each field's factory call
    :param kwargs: extra keyword arguments; accepted and ignored
    :return: AddFields component holding one created definition per entry of ``model.fields``
    """
    added_field_definitions = [
        self._create_component_from_model(
            model=added_field_definition_model,
            # The resolved Python type is passed along as an extra keyword argument.
            value_type=ModelToComponentFactory._json_schema_type_name_to_type(added_field_definition_model.value_type),
            config=config,
        )
        for added_field_definition_model in model.fields
    ]
    return AddFields(fields=added_field_definitions, parameters=model.parameters or {})

@staticmethod
def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]:
    """
    Translate a declared ValueType into the corresponding Python type.

    :param value_type: type declared in the manifest, or None when none was declared
    :return: str, float, int or bool for the matching member; None when no type was declared
    """
    if not value_type:
        return None
    names_to_types = {
        ValueType.string: str,
        ValueType.number: float,
        ValueType.integer: int,
        ValueType.boolean: bool,
    }
    return names_to_types[value_type]

@staticmethod
def create_api_key_authenticator(
model: ApiKeyAuthenticatorModel, config: Config, token_provider: Optional[TokenProvider] = None, **kwargs: Any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, List, Mapping, Optional
from typing import Any, List, Optional

import requests
from airbyte_cdk.sources.declarative.types import Record


@dataclass
Expand All @@ -23,7 +24,7 @@ def initial_token(self) -> Optional[Any]:
"""

@abstractmethod
def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]:
def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Any]:
"""
:param response: response to process
:param last_records: records extracted from the response
Expand All @@ -32,7 +33,7 @@ def next_page_token(self, response: requests.Response, last_records: List[Mappin
pass

@abstractmethod
def reset(self):
def reset(self) -> None:
"""
Reset the pagination's inner state
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,12 @@ def next_page_token(self, response: requests.Response, last_records: List[Record
return None
return self._delegate.next_page_token(response, last_records)

def reset(self) -> None:
    """Reset the pagination state by forwarding the call to the wrapped delegate."""
    self._delegate.reset()

def get_page_size(self) -> Optional[int]:
    """Return whatever page size the wrapped delegate reports."""
    return self._delegate.get_page_size()

@property
def initial_token(self) -> Optional[Any]:
    """Initial token, read from the wrapped delegate."""
    return self._delegate.initial_token
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, Optional, Union
from typing import Any, List, Mapping, Optional, Type, Union

import dpath.util
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
Expand All @@ -17,6 +17,7 @@ class AddedFieldDefinition:

path: FieldPointer
value: Union[InterpolatedString, str]
value_type: Optional[Type[Any]]
parameters: InitVar[Mapping[str, Any]]


Expand All @@ -26,6 +27,7 @@ class ParsedAddFieldDefinition:

path: FieldPointer
value: InterpolatedString
value_type: Optional[Type[Any]]
parameters: InitVar[Mapping[str, Any]]


Expand Down Expand Up @@ -85,22 +87,27 @@ class AddFields(RecordTransformation):
parameters: InitVar[Mapping[str, Any]]
_parsed_fields: List[ParsedAddFieldDefinition] = field(init=False, repr=False, default_factory=list)

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
    """
    Validate the configured fields and pre-parse them into ``self._parsed_fields``.

    :param parameters: parameters used when a plain string value has to be turned into an InterpolatedString
    :raises ValueError: if a field has an empty path, or a value that is neither an InterpolatedString nor a str
    """
    for add_field in self.fields:
        if len(add_field.path) < 1:
            raise ValueError(f"Expected a non-zero-length path for the AddFields transformation {add_field}")

        if isinstance(add_field.value, InterpolatedString):
            # Already interpolated: reuse the value as-is.
            self._parsed_fields.append(
                ParsedAddFieldDefinition(add_field.path, add_field.value, value_type=add_field.value_type, parameters={})
            )
            continue

        if not isinstance(add_field.value, str):
            # Raising a bare string is itself a TypeError; raise a real exception instead.
            raise ValueError(f"Expected a string value for the AddFields transformation: {add_field}")

        self._parsed_fields.append(
            ParsedAddFieldDefinition(
                add_field.path,
                InterpolatedString.create(add_field.value, parameters=parameters),
                value_type=add_field.value_type,
                parameters=parameters,
            )
        )

def transform(
self,
Expand All @@ -109,12 +116,15 @@ def transform(
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> Record:
if config is None:
config = {}
kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice}
for parsed_field in self._parsed_fields:
value = parsed_field.value.eval(config, **kwargs)
valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
value = parsed_field.value.eval(config, valid_types=valid_types, **kwargs)
dpath.util.new(record, parsed_field.path, value)

return record

def __eq__(self, other):
return self.__dict__ == other.__dict__
def __eq__(self, other: Any) -> bool:
return bool(self.__dict__ == other.__dict__)
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dataclasses import dataclass
from typing import Any, Mapping, Optional

from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState


@dataclass
Expand All @@ -18,7 +18,7 @@ class RecordTransformation:
@abstractmethod
def transform(
self,
record: Mapping[str, Any],
record: Record,
config: Optional[Config] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@ def test_get_value_from_config():
assert val == "2022-01-01"


@pytest.mark.parametrize(
    "valid_types, expected_value",
    [
        pytest.param((str,), "1234J", id="test_value_is_a_string_if_valid_types_is_str"),
        pytest.param(None, 1234j, id="test_value_is_interpreted_as_complex_number_by_default"),
    ],
)
def test_get_value_with_complex_number(valid_types, expected_value):
    """A config value of "1234J" stays a string when valid_types is (str,) and becomes 1234j when it is None."""
    s = "{{ config['value'] }}"
    config = {"value": "1234J"}
    val = interpolation.eval(s, config, valid_types=valid_types)
    assert val == expected_value


def test_get_value_from_stream_slice():
s = "{{ stream_slice['date'] }}"
config = {"date": "2022-01-01"}
Expand Down
Loading

0 comments on commit 7a764f8

Please sign in to comment.