feat: [FC-0074] add support for annotated python dicts as avro map type #433

Open · wants to merge 17 commits into base: main
17 changes: 14 additions & 3 deletions openedx_events/event_bus/avro/deserializer.py
@@ -41,16 +41,27 @@ def _deserialized_avro_record_dict_to_object(data: dict, data_type, deserializer
elif data_type in PYTHON_TYPE_TO_AVRO_MAPPING:
return data
elif data_type_origin == list:
# returns types of list contents
# if data_type == List[int], arg_data_type = (int,)
# Returns types of list contents.
# Example: if data_type == List[int], arg_data_type = (int,)
arg_data_type = get_args(data_type)
if not arg_data_type:
raise TypeError(
"List without annotation type is not supported. The argument should be a type, for eg., List[int]"
)
# check whether list items type is in basic types.
# Check whether list items type is in basic types.
if arg_data_type[0] in SIMPLE_PYTHON_TYPE_TO_AVRO_MAPPING:
return data
elif data_type_origin == dict:
# Returns types of dict contents.
# Example: if data_type == Dict[str, int], arg_data_type = (str, int)
arg_data_type = get_args(data_type)
if not arg_data_type:
raise TypeError(
"Dict without annotation type is not supported. The argument should be a type, for eg., Dict[str, int]"
)
# Check whether dict items type is in basic types.
if all(arg in SIMPLE_PYTHON_TYPE_TO_AVRO_MAPPING for arg in arg_data_type):
return data
elif hasattr(data_type, "__attrs_attrs__"):
transformed = {}
for attribute in data_type.__attrs_attrs__:
21 changes: 18 additions & 3 deletions openedx_events/event_bus/avro/schema.py
@@ -63,14 +63,14 @@ def _create_avro_field_definition(data_key, data_type, previously_seen_types,
field["type"] = field_type
# Case 2: data_type is a simple type that can be converted directly to an Avro type
elif data_type in PYTHON_TYPE_TO_AVRO_MAPPING:
if PYTHON_TYPE_TO_AVRO_MAPPING[data_type] in ["record", "array"]:
if PYTHON_TYPE_TO_AVRO_MAPPING[data_type] in ["map", "array"]:
Contributor

This is a breaking change, right? Were dicts only introduced and used with the forums events, which you plan to break anyway? You don't have a changelog entry or a major version change (yet), but is that your plan?

Member Author

As far as I understand, dictionaries were never fully supported. Besides this list of known unserializable events, which contain dictionaries in their payloads, we can confirm this by generating a schema for an event that uses dictionaries. Let's take this event as an example:

# openedx_events/learning/signals.py@main
MY_EVENT = OpenEdxPublicSignal(
    event_type="org.openedx.learning.my_event.v1",
    data={
        "my_data": MyEventData,
    }
)

# openedx_events/learning/data.py@main
@attr.s(frozen=True)
class MyEventData:

    event_type = attr.ib(type=str)
    event_data = attr.ib(type=dict[str, str])

Which immediately raises a serialization error:

openedx-events/openedx_events/event_bus/avro/schema.py", line 107, in _create_avro_field_definition
    raise TypeError(
TypeError: Data type dict[str, str] is not supported. The data type needs to either be one of the types in PYTHON_TYPE_TO_AVRO_MAPPING, an attrs decorated class, or one of the types defined in custom_type_to_avro_type.

This is because dicts were not a supported type in event_bus/avro/schema.py::_create_avro_field_definition before this PR, so no events with dict fields were ever sent through the event bus. Forum events were not supported either, since they were listed in the known unserializable events, but I included them here as an example and will remove them shortly.

To maintain backward compatibility, I'll include the map type as an additional type instead of changing "record" to "map".
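
For reference, a minimal sketch of the behavior this PR aims to enable, assuming create_simple_signal comes from the repo's test utilities (as the tests below suggest); the expected output is inferred from the schema tests added in this PR rather than verified here:

from typing import Dict

from openedx_events.event_bus.avro.schema import schema_from_signal
from openedx_events.event_bus.avro.tests.test_utilities import create_simple_signal

# With map support, an annotated dict field should produce an Avro "map" schema
# instead of raising the TypeError shown above.
SIGNAL = create_simple_signal({"event_data": Dict[str, str]})
print(schema_from_signal(SIGNAL)["fields"][0]["type"])
# Expected (assumption): {"type": "map", "values": "string"}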

Contributor (@robrap), Jan 8, 2025

Thank you @mariajgrimaldi.

  1. It sounds like changing from "record" to "map" is not a breaking change for dicts if they aren't yet in use, and is really just a fix.
  2. Whether or not dropping "record" is a breaking change elsewhere is still a question. No tests broke when you dropped it. Does that mean it isn't used, or that we are missing test coverage?
  3. Highly related, why do I not see the /schemas introduced in https://github.com/openedx/openedx-events/pull/225/files for unit testing? Is that part of the missing coverage?

UPDATE: If we are missing test coverage, it would be great to get that back. Or, if this is a non-breaking change, then you should drop "record" to clean up the code.

Member Author

I added some tests to address the missing coverage. However, I still wonder if we're breaking something by replacing "record" with "map". I also checked the /schemas added in https://github.com/openedx/openedx-events/pull/225/files and they are still in the repository. Could you clarify which schemas are missing?

Contributor

Oops. I now see the schemas. In theory, if our tests are working as planned, these should be snapshots of all the schemas, and if no change is detected, all would be good.

You say you fear dropping record is a breaking change. Is it possible to create a breaking test?

Member Author (@mariajgrimaldi), Jan 13, 2025

From my understanding, changing "record" to "map" should not be a breaking change, since dicts were not supported before this PR. By testing the code without any dict support, I can infer that L66 was only used to warn developers that dicts without annotations were not allowed (dicts were mapped to Avro records, a type we can't use because it clashes with the data attributes mapping), and dicts with annotations weren't allowed either, since they were not supported. In any case, I added a test case that checks that the schema generated for dicts maps to "map", for future reference.

Although I said I wondered whether this was a breaking change given my findings and considering what you mentioned before, I think we should trust the repo's coverage.

# pylint: disable-next=broad-exception-raised
raise Exception("Unable to generate Avro schema for dict or array fields without annotation types.")
avro_type = PYTHON_TYPE_TO_AVRO_MAPPING[data_type]
field["type"] = avro_type
elif data_type_origin == list:
# returns types of list contents
# if data_type == List[int], arg_data_type = (int,)
# Returns types of list contents.
# Example: if data_type == List[int], arg_data_type = (int,)
arg_data_type = get_args(data_type)
if not arg_data_type:
raise TypeError(
@@ -83,6 +83,21 @@ def _create_avro_field_definition(data_key, data_type, previously_seen_types,
f" {set(SIMPLE_PYTHON_TYPE_TO_AVRO_MAPPING.keys())}"
)
field["type"] = {"type": PYTHON_TYPE_TO_AVRO_MAPPING[data_type_origin], "items": avro_type}
elif data_type_origin == dict:
# Returns types of dict contents.
# Example: if data_type == Dict[str, int], arg_data_type = (str, int)
arg_data_type = get_args(data_type)
if not arg_data_type:
raise TypeError(
"Dict without annotation type is not supported. The argument should be a type, for eg., Dict[str, int]"
)
avro_type = SIMPLE_PYTHON_TYPE_TO_AVRO_MAPPING.get(arg_data_type[1])
if avro_type is None:
raise TypeError(
"Only following types are supported for dict arguments:"
f" {set(SIMPLE_PYTHON_TYPE_TO_AVRO_MAPPING.keys())}"
)
field["type"] = {"type": PYTHON_TYPE_TO_AVRO_MAPPING[data_type_origin], "values": avro_type}
# Case 3: data_type is an attrs class
elif hasattr(data_type, "__attrs_attrs__"):
# Inner Attrs Class
24 changes: 23 additions & 1 deletion openedx_events/event_bus/avro/tests/test_avro.py
@@ -2,7 +2,7 @@
import io
import os
from datetime import datetime
from typing import List
from typing import List, Union
from unittest import TestCase
from uuid import UUID, uuid4

@@ -43,6 +43,7 @@ def generate_test_data_for_schema(schema): # pragma: no cover
'string': "default",
'double': 1.0,
'null': None,
'map': {'key': 'value'},
}

def get_default_value_or_raise(schema_field_type):
@@ -71,6 +72,9 @@ def get_default_value_or_raise(schema_field_type):
elif sub_field_type == "record":
# if we're dealing with a record, recurse into the record
data_dict.update({key: generate_test_data_for_schema(field_type)})
elif sub_field_type == "map":
# if we're dealing with a map, "values" will be the type of values in the map
data_dict.update({key: {"key": get_default_value_or_raise(field_type["values"])}})
else:
raise Exception(f"Unsupported type {field_type}") # pylint: disable=broad-exception-raised

@@ -112,6 +116,24 @@ def generate_test_event_data_for_data_type(data_type): # pragma: no cover
datetime: datetime.now(),
CCXLocator: CCXLocator(org='edx', course='DemoX', run='Demo_course', ccx='1'),
UUID: uuid4(),
dict[str, str]: {'key': 'value'},
dict[str, int]: {'key': 1},
dict[str, float]: {'key': 1.0},
dict[str, bool]: {'key': True},
Contributor

what about union types:

dict[str, Union[str, int]]: ...,

Member Author

I added more test cases and this covers even more than I initially thought: a701f78. Thanks for the suggestion!

dict[str, CourseKey]: {'key': CourseKey.from_string("course-v1:edX+DemoX.1+2014")},
dict[str, UsageKey]: {'key': UsageKey.from_string(
"block-v1:edx+DemoX+Demo_course+type@video+block@UaEBjyMjcLW65gaTXggB93WmvoxGAJa0JeHRrDThk",
)},
dict[str, LibraryLocatorV2]: {'key': LibraryLocatorV2.from_string('lib:MITx:reallyhardproblems')},
dict[str, LibraryUsageLocatorV2]: {
'key': LibraryUsageLocatorV2.from_string('lb:MITx:reallyhardproblems:problem:problem1'),
},
dict[str, List[int]]: {'key': [1, 2, 3]},
dict[str, List[str]]: {'key': ["hi", "there"]},
dict[str, dict[str, str]]: {'key': {'key': 'value'}},
dict[str, dict[str, int]]: {'key': {'key': 1}},
dict[str, Union[str, int]]: {'key': 'value'},
dict[str, Union[str, int, float]]: {'key': 1.0},
}
data_dict = {}
for attribute in data_type.__attrs_attrs__:
136 changes: 111 additions & 25 deletions openedx_events/event_bus/avro/tests/test_deserializer.py
@@ -1,14 +1,16 @@
"""Tests for avro.deserializer"""
import json
from datetime import datetime
from typing import List
from typing import Dict, List
from unittest import TestCase

import ddt
from opaque_keys.edx.keys import CourseKey, UsageKey
from opaque_keys.edx.locator import LibraryLocatorV2, LibraryUsageLocatorV2

from openedx_events.event_bus.avro.deserializer import AvroSignalDeserializer, deserialize_bytes_to_event_data
from openedx_events.event_bus.avro.tests.test_utilities import (
ComplexAttrs,
EventData,
NestedAttrsWithDefaults,
NestedNonAttrs,
@@ -23,43 +25,74 @@
from openedx_events.tests.utils import FreezeSignalCacheMixin


@ddt.ddt
class TestAvroSignalDeserializerCache(TestCase, FreezeSignalCacheMixin):
"""Test AvroSignalDeserializer"""

def setUp(self) -> None:
super().setUp()
self.maxDiff = None

def test_schema_string(self):
@ddt.data(
(
SimpleAttrs,
{
"name": "CloudEvent",
"type": "record",
"doc": "Avro Event Format for CloudEvents created with openedx_events/schema",
"namespace": "simple.signal",
"fields": [
{
"name": "data",
"type": {
"name": "SimpleAttrs",
"type": "record",
"fields": [
{"name": "boolean_field", "type": "boolean"},
{"name": "int_field", "type": "long"},
{"name": "float_field", "type": "double"},
{"name": "bytes_field", "type": "bytes"},
{"name": "string_field", "type": "string"},
],
},
},
],
}
),
(
ComplexAttrs,
{
"name": "CloudEvent",
"type": "record",
"doc": "Avro Event Format for CloudEvents created with openedx_events/schema",
"namespace": "simple.signal",
"fields": [
{
"name": "data",
"type": {
"name": "ComplexAttrs",
"type": "record",
"fields": [
{"name": "list_field", "type": {"type": "array", "items": "long"}},
{"name": "dict_field", "type": {"type": "map", "values": "long"}},
],
},
},
],
}
)
)
@ddt.unpack
def test_schema_string(self, data_cls, expected_schema):
"""
Test JSON round-trip; schema creation is tested more fully in test_schema.py.
"""
SIGNAL = create_simple_signal({
"data": SimpleAttrs
"data": data_cls
})

actual_schema = json.loads(AvroSignalDeserializer(SIGNAL).schema_string())
expected_schema = {
'name': 'CloudEvent',
'type': 'record',
'doc': 'Avro Event Format for CloudEvents created with openedx_events/schema',
'namespace': 'simple.signal',
'fields': [
{
'name': 'data',
'type': {
'name': 'SimpleAttrs',
'type': 'record',
'fields': [
{'name': 'boolean_field', 'type': 'boolean'},
{'name': 'int_field', 'type': 'long'},
{'name': 'float_field', 'type': 'double'},
{'name': 'bytes_field', 'type': 'bytes'},
{'name': 'string_field', 'type': 'string'},
]
}
}
]
}

assert actual_schema == expected_schema

def test_convert_dict_to_event_data(self):
@@ -233,6 +266,59 @@ def test_deserialization_of_list_without_annotation(self):
with self.assertRaises(TypeError):
deserializer.from_dict(initial_dict)

def test_deserialization_of_dict_with_annotation(self):
"""
Check that deserialization works as expected when dict data is annotated.
"""
DICT_SIGNAL = create_simple_signal({"dict_input": Dict[str, int]})
initial_dict = {"dict_input": {"key1": 1, "key2": 3}}

deserializer = AvroSignalDeserializer(DICT_SIGNAL)
event_data = deserializer.from_dict(initial_dict)
expected_event_data = {"key1": 1, "key2": 3}
test_data = event_data["dict_input"]

self.assertIsInstance(test_data, dict)
self.assertEqual(test_data, expected_event_data)

def test_deserialization_of_dict_without_annotation(self):
"""
Check that deserialization raises error when dict data is not annotated.

Create dummy signal to bypass schema check while initializing deserializer. Then,
update signal with incomplete type info to test whether correct exceptions are raised while deserializing data.
"""
SIGNAL = create_simple_signal({"dict_input": Dict[str, int]})
DICT_SIGNAL = create_simple_signal({"dict_input": Dict})
initial_dict = {"dict_input": {"key1": 1, "key2": 3}}

deserializer = AvroSignalDeserializer(SIGNAL)
deserializer.signal = DICT_SIGNAL

with self.assertRaises(TypeError):
deserializer.from_dict(initial_dict)

def test_deserialization_of_dict_with_complex_types_fails(self):
SIGNAL = create_simple_signal({"dict_input": Dict[str, list]})
with self.assertRaises(TypeError):
AvroSignalDeserializer(SIGNAL)
initial_dict = {"dict_input": {"key1": [1, 3], "key2": [4, 5]}}
# create dummy signal to bypass schema check while initializing deserializer
# This allows us to test whether correct exceptions are raised while deserializing data
DUMMY_SIGNAL = create_simple_signal({"dict_input": Dict[str, int]})
deserializer = AvroSignalDeserializer(DUMMY_SIGNAL)
# Update signal with incorrect type info
deserializer.signal = SIGNAL
with self.assertRaises(TypeError):
deserializer.from_dict(initial_dict)

def test_deserialization_of_dicts_with_keys_of_complex_types_fails(self):
SIGNAL = create_simple_signal({"dict_input": Dict[CourseKey, int]})
deserializer = AvroSignalDeserializer(SIGNAL)
initial_dict = {"dict_input": {CourseKey.from_string("course-v1:edX+DemoX.1+2014"): 1}}
with self.assertRaises(TypeError):
deserializer.from_dict(initial_dict)

def test_deserialization_of_nested_list_fails(self):
"""
Check that deserialization raises error when nested list data is passed.
28 changes: 26 additions & 2 deletions openedx_events/event_bus/avro/tests/test_schema.py
@@ -1,7 +1,7 @@
"""
Tests for event_bus.avro.schema module
"""
from typing import List
from typing import Dict, List
from unittest import TestCase

from openedx_events.event_bus.avro.schema import schema_from_signal
@@ -245,8 +245,9 @@ class UnextendedClass:

def test_throw_exception_to_list_or_dict_types_without_annotation(self):
LIST_SIGNAL = create_simple_signal({"list_input": list})
DICT_SIGNAL = create_simple_signal({"list_input": dict})
DICT_SIGNAL = create_simple_signal({"dict_input": dict})
LIST_WITHOUT_ANNOTATION_SIGNAL = create_simple_signal({"list_input": List})
DICT_WITHOUT_ANNOTATION_SIGNAL = create_simple_signal({"dict_input": Dict})
with self.assertRaises(Exception):
schema_from_signal(LIST_SIGNAL)

@@ -256,6 +257,14 @@ def test_throw_exception_to_list_or_dict_types_without_annotation(self):
with self.assertRaises(TypeError):
schema_from_signal(LIST_WITHOUT_ANNOTATION_SIGNAL)

with self.assertRaises(TypeError):
schema_from_signal(DICT_WITHOUT_ANNOTATION_SIGNAL)

def test_throw_exception_invalid_dict_annotation(self):
INVALID_DICT_SIGNAL = create_simple_signal({"dict_input": Dict[str, NestedAttrsWithDefaults]})
with self.assertRaises(TypeError):
schema_from_signal(INVALID_DICT_SIGNAL)

def test_list_with_annotation_works(self):
LIST_SIGNAL = create_simple_signal({"list_input": List[int]})
expected_dict = {
Expand All @@ -270,3 +279,18 @@ def test_list_with_annotation_works(self):
}
schema = schema_from_signal(LIST_SIGNAL)
self.assertDictEqual(schema, expected_dict)

def test_dict_with_annotation_works(self):
DICT_SIGNAL = create_simple_signal({"dict_input": Dict[str, int]})
expected_dict = {
'name': 'CloudEvent',
'type': 'record',
'doc': 'Avro Event Format for CloudEvents created with openedx_events/schema',
'namespace': 'simple.signal',
'fields': [{
'name': 'dict_input',
'type': {'type': 'map', 'values': 'long'},
}],
}
schema = schema_from_signal(DICT_SIGNAL)
self.assertDictEqual(schema, expected_dict)
7 changes: 7 additions & 0 deletions openedx_events/event_bus/avro/tests/test_utilities.py
@@ -39,6 +39,13 @@ class SimpleAttrs:
string_field: str


@attr.s(auto_attribs=True)
class ComplexAttrs:
"""Class with all complex type fields"""
list_field: list[int]
dict_field: dict[str, int]


@attr.s(auto_attribs=True)
class SubTestData0:
"""Subclass for testing nested attrs"""
2 changes: 1 addition & 1 deletion openedx_events/event_bus/avro/types.py
@@ -9,6 +9,6 @@
PYTHON_TYPE_TO_AVRO_MAPPING = {
**SIMPLE_PYTHON_TYPE_TO_AVRO_MAPPING,
None: "null",
dict: "record",
dict: "map",
list: "array",
}
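
Taken together, the schema, deserializer, and type-mapping changes in this PR should let an annotated dict field round-trip as an Avro map. A minimal sketch under the same assumptions as the tests above (create_simple_signal is assumed to come from openedx_events/event_bus/avro/tests/test_utilities.py; the expected outputs are inferred from the tests in this PR):

from typing import Dict

from openedx_events.event_bus.avro.deserializer import AvroSignalDeserializer
from openedx_events.event_bus.avro.schema import schema_from_signal
from openedx_events.event_bus.avro.tests.test_utilities import create_simple_signal

SIGNAL = create_simple_signal({"dict_input": Dict[str, int]})

# Schema generation: the annotated dict becomes an Avro map of long values.
print(schema_from_signal(SIGNAL)["fields"][0]["type"])
# Expected (assumption): {"type": "map", "values": "long"}

# Deserialization: a dict of simple types passes through unchanged.
deserializer = AvroSignalDeserializer(SIGNAL)
print(deserializer.from_dict({"dict_input": {"key1": 1, "key2": 3}}))
# Expected (assumption): {"dict_input": {"key1": 1, "key2": 3}}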