Skip to content

Commit

Permalink
fix: prevent exception caused by adding duplicate fields in schema generation (#195)
Browse files Browse the repository at this point in the history
  • Loading branch information
Emrehzl94 authored Oct 18, 2024
1 parent 99d077b commit c2d0767
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,17 +202,20 @@ class ChangeEventConverter() {
}

private fun schemaForKeys(keys: List<Map<String, Any>>?): Schema {
val addedFields = mutableSetOf<String>()

return SchemaBuilder.array(
// We need to define a uniform structure of key array elements. Because all elements
// must have identical structure, we list all available keys as optional fields.
SchemaBuilder.struct()
.apply {
keys?.forEach { key ->
key.forEach {
field(
it.key,
DynamicTypes.toConnectSchema(
it.value, optional = true, forceMapsAsStruct = true))
if (addedFields.add(it.key)) {
field(
it.key,
toConnectSchema(it.value, optional = true, forceMapsAsStruct = true))
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,46 @@ abstract class Neo4jCdcSourceNodesIT {
}
.verifyWithin(Duration.ofSeconds(30))
}

@Neo4jSource(
startFrom = "EARLIEST",
strategy = CDC,
cdc =
CdcSource(
topics =
arrayOf(
CdcSourceTopic(
topic = "cdc", patterns = arrayOf(CdcSourceParam("(:TestSource)"))))))
@Test
fun `should publish with multiple keys on the same property`(
@TopicConsumer(topic = "cdc", offset = "earliest") consumer: ConvertingKafkaConsumer,
session: Session
) {
session.run("CREATE CONSTRAINT FOR (n:TestSource) REQUIRE (n.prop1, n.prop2) IS KEY").consume()
session.run("CREATE CONSTRAINT FOR (n:TestSource) REQUIRE n.prop1 IS KEY").consume()
session
.run(
"CREATE (n:TestSource) SET n = ${'$'}props",
mapOf("props" to mapOf("prop1" to "value1", "prop2" to "value2")))
.consume()

TopicVerifier.create<ChangeEvent, ChangeEvent>(consumer)
.assertMessageValue { value ->
assertThat(value)
.hasEventType(NODE)
.hasOperation(CREATE)
.hasNodeKeys(
mapOf(
"TestSource" to
listOf(
mapOf("prop1" to "value1", "prop2" to "value2"),
mapOf("prop1" to "value1"))))
.labelledAs("TestSource")
.hasNoBeforeState()
.hasAfterStateProperties(mapOf("prop1" to "value1", "prop2" to "value2"))
}
.verifyWithin(Duration.ofSeconds(30))
}
}

@KeyValueConverter(key = AVRO, value = AVRO)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,49 @@ abstract class Neo4jCdcSourceRelationshipsIT {
}
.verifyWithin(Duration.ofSeconds(30))
}

@Neo4jSource(
startFrom = "EARLIEST",
strategy = CDC,
cdc =
CdcSource(
topics =
arrayOf(
CdcSourceTopic(
topic = "neo4j-cdc-keys-rel",
patterns =
arrayOf(CdcSourceParam("(:Person)-[:EMPLOYED]->(:Company)"))))))
@Test
fun `should publish with multiple keys on the same property`(
@TopicConsumer(topic = "neo4j-cdc-keys-rel", offset = "earliest")
consumer: ConvertingKafkaConsumer,
session: Session
) {
session
.run(
"CREATE CONSTRAINT employedId FOR ()-[r:EMPLOYED]->() REQUIRE (r.id, r.role) IS RELATIONSHIP KEY")
.consume()
session
.run(
"CREATE CONSTRAINT employedRole FOR ()-[r:EMPLOYED]->() REQUIRE r.id IS RELATIONSHIP KEY")
.consume()

session.run("CREATE (:Person)-[:EMPLOYED {id: 1, role: 'SWE'}]->(:Company)").consume()

TopicVerifier.create<ChangeEvent, ChangeEvent>(consumer)
.assertMessageValue { value ->
assertThat(value)
.hasEventType(RELATIONSHIP)
.hasOperation(CREATE)
.hasType("EMPLOYED")
.startLabelledAs("Person")
.endLabelledAs("Company")
.hasNoBeforeState()
.hasAfterStateProperties(mapOf("id" to 1L, "role" to "SWE"))
.hasRelationshipKeys(listOf(mapOf("id" to 1L, "role" to "SWE"), mapOf("id" to 1L)))
}
.verifyWithin(Duration.ofSeconds(30))
}
}

@KeyValueConverter(key = AVRO, value = AVRO)
Expand Down

0 comments on commit c2d0767

Please sign in to comment.