From c488bb1abcb19774cb7a5918712aa33f4e17a531 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emre=20H=C4=B1zal?= Date: Fri, 19 Jul 2024 11:37:17 +0200 Subject: [PATCH] feat: log warning messages when pattern keys and constraints do not match (#162) --- .../kafka/data/PropertyConstraints.kt | 72 +++++ .../connectors/kafka/sink/SinkStrategy.kt | 5 + .../kafka/sink/strategy/NodePatternHandler.kt | 16 +- .../kafka/sink/strategy/PatternHandler.kt | 3 + .../strategy/RelationshipPatternHandler.kt | 31 +- .../pattern/PatternConstraintValidator.kt | 304 ++++++++++++++++++ .../sink/strategy/NodePatternHandlerTest.kt | 214 ++++++++++++ .../RelationshipPatternHandlerTest.kt | 231 +++++++++++++ 8 files changed, 874 insertions(+), 2 deletions(-) create mode 100644 common/src/main/kotlin/org/neo4j/connectors/kafka/data/PropertyConstraints.kt create mode 100644 sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/pattern/PatternConstraintValidator.kt diff --git a/common/src/main/kotlin/org/neo4j/connectors/kafka/data/PropertyConstraints.kt b/common/src/main/kotlin/org/neo4j/connectors/kafka/data/PropertyConstraints.kt new file mode 100644 index 00000000..90295365 --- /dev/null +++ b/common/src/main/kotlin/org/neo4j/connectors/kafka/data/PropertyConstraints.kt @@ -0,0 +1,72 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.connectors.kafka.data + +import org.neo4j.driver.Driver +import org.neo4j.driver.SessionConfig + +enum class ConstraintEntityType(val value: String) { + NODE("NODE"), + RELATIONSHIP("RELATIONSHIP"), +} + +enum class ConstraintType(val value: String) { + NODE_KEY("NODE_KEY"), + NODE_UNIQUENESS("UNIQUENESS"), + NODE_EXISTENCE("NODE_PROPERTY_EXISTENCE"), + RELATIONSHIP_KEY("RELATIONSHIP_KEY"), + RELATIONSHIP_UNIQUENESS("RELATIONSHIP_UNIQUENESS"), + RELATIONSHIP_EXISTENCE("RELATIONSHIP_PROPERTY_EXISTENCE") +} + +val NODE_CONSTRAINTS = + listOf( + ConstraintType.NODE_KEY.value, + ConstraintType.NODE_UNIQUENESS.value, + ConstraintType.NODE_EXISTENCE.value) + +val RELATIONSHIP_CONSTRAINTS = + listOf( + ConstraintType.RELATIONSHIP_KEY.value, + ConstraintType.RELATIONSHIP_UNIQUENESS.value, + ConstraintType.RELATIONSHIP_EXISTENCE.value) + +data class ConstraintData( + val entityType: String, + val constraintType: String, + val labelOrType: String, + val properties: List +) + +fun fetchConstraintData(driver: Driver, sessionConfig: SessionConfig): List { + return try { + driver.session(sessionConfig).use { session -> + session + .run("SHOW CONSTRAINTS YIELD entityType, type, labelsOrTypes, properties RETURN *") + .list() + .map { + ConstraintData( + entityType = it.get("entityType").asString(), + constraintType = it.get("type").asString(), + labelOrType = it.get("labelsOrTypes").asList()[0].toString(), + properties = it.get("properties").asList().map { property -> property.toString() }) + } + } + } catch (e: Exception) { + emptyList() + } +} diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkStrategy.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkStrategy.kt index 0dab4446..3017d53c 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkStrategy.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/SinkStrategy.kt @@ -24,6 +24,7 @@ import org.apache.kafka.connect.sink.SinkRecord import org.neo4j.connectors.kafka.data.DynamicTypes import org.neo4j.connectors.kafka.data.cdcTxId import org.neo4j.connectors.kafka.data.cdcTxSeq +import org.neo4j.connectors.kafka.data.fetchConstraintData import org.neo4j.connectors.kafka.data.isCdcMessage import org.neo4j.connectors.kafka.sink.strategy.CdcSchemaHandler import org.neo4j.connectors.kafka.sink.strategy.CdcSourceIdHandler @@ -164,6 +165,8 @@ interface SinkStrategyHandler { bindHeaderAs = config.patternBindHeaderAs, bindKeyAs = config.patternBindKeyAs, bindValueAs = config.patternBindValueAs) + + handler.validate(fetchConstraintData(config.driver, config.sessionConfig())) } val relationshipPattern = @@ -187,6 +190,8 @@ interface SinkStrategyHandler { bindHeaderAs = config.patternBindHeaderAs, bindKeyAs = config.patternBindKeyAs, bindValueAs = config.patternBindValueAs) + + handler.validate(fetchConstraintData(config.driver, config.sessionConfig())) } val cdcSourceIdTopics = config.getList(SinkConfiguration.CDC_SOURCE_ID_TOPICS) diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/NodePatternHandler.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/NodePatternHandler.kt index 099a3815..fb6ccce4 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/NodePatternHandler.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/NodePatternHandler.kt @@ -16,12 +16,14 @@ */ package org.neo4j.connectors.kafka.sink.strategy +import org.neo4j.connectors.kafka.data.ConstraintData import org.neo4j.connectors.kafka.sink.ChangeQuery import org.neo4j.connectors.kafka.sink.SinkConfiguration import org.neo4j.connectors.kafka.sink.SinkMessage import org.neo4j.connectors.kafka.sink.SinkStrategy import org.neo4j.connectors.kafka.sink.strategy.pattern.NodePattern import org.neo4j.connectors.kafka.sink.strategy.pattern.Pattern +import org.neo4j.connectors.kafka.sink.strategy.pattern.PatternConstraintValidator import org.neo4j.cypherdsl.core.Cypher import org.neo4j.cypherdsl.core.Literal import org.neo4j.cypherdsl.core.Node @@ -33,7 +35,7 @@ import org.slf4j.LoggerFactory class NodePatternHandler( val topic: String, - patternString: String, + private val patternString: String, private val mergeProperties: Boolean, private val renderer: Renderer, private val batchSize: Int, @@ -104,6 +106,18 @@ class NodePatternHandler( .toList() } + fun validate(constraints: List) { + val warningMessages = checkConstraints(constraints) + warningMessages.forEach { logger.warn(it) } + } + + override fun checkConstraints(constraints: List): List { + val nodeWarning = + PatternConstraintValidator.checkNodeWarning(constraints, pattern, patternString) + ?: return emptyList() + return listOf(nodeWarning) + } + private fun buildStatement(): String { val createOperation = Cypher.literalOf(CREATE) val deleteOperation = Cypher.literalOf(DELETE) diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/PatternHandler.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/PatternHandler.kt index 05d66137..b245f3e7 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/PatternHandler.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/PatternHandler.kt @@ -19,6 +19,7 @@ package org.neo4j.connectors.kafka.sink.strategy import java.time.Instant import java.time.ZoneOffset import org.apache.kafka.connect.errors.ConnectException +import org.neo4j.connectors.kafka.data.ConstraintData import org.neo4j.connectors.kafka.exceptions.InvalidDataException import org.neo4j.connectors.kafka.sink.SinkConfiguration import org.neo4j.connectors.kafka.sink.SinkMessage @@ -69,6 +70,8 @@ abstract class PatternHandler( .flatten() } + abstract fun checkConstraints(constraints: List): List + /** * Checks if given from key is explicitly defined, i.e. something starting with * __key, __value or __header. diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/RelationshipPatternHandler.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/RelationshipPatternHandler.kt index 1c93a675..88c7bfc3 100644 --- a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/RelationshipPatternHandler.kt +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/RelationshipPatternHandler.kt @@ -16,11 +16,13 @@ */ package org.neo4j.connectors.kafka.sink.strategy +import org.neo4j.connectors.kafka.data.ConstraintData import org.neo4j.connectors.kafka.sink.ChangeQuery import org.neo4j.connectors.kafka.sink.SinkConfiguration import org.neo4j.connectors.kafka.sink.SinkMessage import org.neo4j.connectors.kafka.sink.SinkStrategy import org.neo4j.connectors.kafka.sink.strategy.pattern.Pattern +import org.neo4j.connectors.kafka.sink.strategy.pattern.PatternConstraintValidator import org.neo4j.connectors.kafka.sink.strategy.pattern.RelationshipPattern import org.neo4j.cypherdsl.core.Cypher import org.neo4j.cypherdsl.core.Literal @@ -33,7 +35,7 @@ import org.slf4j.LoggerFactory class RelationshipPatternHandler( val topic: String, - patternString: String, + private val patternString: String, private val mergeNodeProperties: Boolean, private val mergeRelationshipProperties: Boolean, private val renderer: Renderer, @@ -121,6 +123,33 @@ class RelationshipPatternHandler( .toList() } + fun validate(constraints: List) { + val warningMessages = checkConstraints(constraints) + warningMessages.forEach { logger.warn(it) } + } + + override fun checkConstraints(constraints: List): List { + val warningMessages = mutableListOf() + + val startNodeWarning = + PatternConstraintValidator.checkNodeWarning(constraints, pattern.start, patternString) + val relationshipWarning = + PatternConstraintValidator.checkRelationshipWarning(constraints, pattern, patternString) + val endNodeWarning = + PatternConstraintValidator.checkNodeWarning(constraints, pattern.end, patternString) + + if (startNodeWarning != null) { + warningMessages.add(startNodeWarning) + } + if (relationshipWarning != null) { + warningMessages.add(relationshipWarning) + } + if (endNodeWarning != null) { + warningMessages.add(endNodeWarning) + } + return warningMessages + } + private fun buildStatement(): String { val createOperation = Cypher.literalOf(CREATE) val deleteOperation = Cypher.literalOf(DELETE) diff --git a/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/pattern/PatternConstraintValidator.kt b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/pattern/PatternConstraintValidator.kt new file mode 100644 index 00000000..2d7eeb53 --- /dev/null +++ b/sink/src/main/kotlin/org/neo4j/connectors/kafka/sink/strategy/pattern/PatternConstraintValidator.kt @@ -0,0 +1,304 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.connectors.kafka.sink.strategy.pattern + +import org.neo4j.connectors.kafka.data.ConstraintData +import org.neo4j.connectors.kafka.data.ConstraintEntityType +import org.neo4j.connectors.kafka.data.ConstraintType +import org.neo4j.connectors.kafka.data.NODE_CONSTRAINTS +import org.neo4j.connectors.kafka.data.RELATIONSHIP_CONSTRAINTS + +object PatternConstraintValidator { + fun checkNodeWarning( + constraints: List, + pattern: NodePattern, + patternString: String + ): String? { + val typeConstraintMap = + constraints + .filter { + it.entityType == ConstraintEntityType.NODE.value && + NODE_CONSTRAINTS.contains(it.constraintType) && + pattern.labels.contains(it.labelOrType) + } + .groupBy { "${it.constraintType}-${it.labelOrType}" } + + val keys = pattern.keyProperties.map { it.to } + + for (label in pattern.labels) { + if (checkNodeKey(label, keys, typeConstraintMap)) { + return null + } + + if (checkNodeUniqueness(label, keys, typeConstraintMap) && + checkNodePropertyExistence(label, keys, typeConstraintMap)) { + return null + } + } + + return buildNodeWarningMessage(pattern, patternString, typeConstraintMap, keys) + } + + fun checkRelationshipWarning( + constraints: List, + pattern: RelationshipPattern, + patternString: String + ): String? { + val typeConstraintMap = + constraints + .filter { + it.entityType == ConstraintEntityType.RELATIONSHIP.value && + RELATIONSHIP_CONSTRAINTS.contains(it.constraintType) && + pattern.type == (it.labelOrType) + } + .groupBy { "${it.constraintType}-${it.labelOrType}" } + + val keys = pattern.keyProperties.map { it.to } + val type = pattern.type!! + + if (checkRelationshipKey(type, keys, typeConstraintMap)) { + return null + } + + if (checkRelationshipUniqueness(type, keys, typeConstraintMap) && + checkRelationshipPropertyExistence(type, keys, typeConstraintMap)) { + return null + } + + return buildRelationshipWarningMessage(pattern, patternString, typeConstraintMap, keys) + } + + private fun checkNodeKey( + label: String, + keys: List, + typeConstraintMap: Map> + ): Boolean { + val nodeKeyConstraints = + typeConstraintMap["${ConstraintType.NODE_KEY.value}-${label}"] ?: return false + for (constraint in nodeKeyConstraints) { + if (equalsIgnoreOrder(keys, constraint.properties)) { + return true + } + } + return false + } + + private fun checkNodeUniqueness( + label: String, + keys: List, + typeConstraintMap: Map> + ): Boolean { + val uniquenessConstraints = + typeConstraintMap["${ConstraintType.NODE_UNIQUENESS.value}-${label}"] ?: return false + for (constraint in uniquenessConstraints) { + if (equalsIgnoreOrder(keys, constraint.properties)) { + return true + } + } + return false + } + + private fun checkNodePropertyExistence( + label: String, + keys: List, + typeConstraintMap: Map> + ): Boolean { + val existenceConstraints = + typeConstraintMap["${ConstraintType.NODE_EXISTENCE.value}-${label}"] ?: return false + val properties = existenceConstraints.flatMap { it.properties } + return properties.containsAll(keys) + } + + private fun checkRelationshipKey( + type: String, + keys: List, + typeConstraintMap: Map> + ): Boolean { + val relationshipKeyConstraints = + typeConstraintMap["${ConstraintType.RELATIONSHIP_KEY.value}-${type}"] ?: return false + for (constraint in relationshipKeyConstraints) { + if (equalsIgnoreOrder(keys, constraint.properties)) { + return true + } + } + return false + } + + private fun checkRelationshipUniqueness( + type: String, + keys: List, + typeConstraintMap: Map> + ): Boolean { + val uniquenessConstraints = + typeConstraintMap["${ConstraintType.RELATIONSHIP_UNIQUENESS.value}-${type}"] ?: return false + for (constraint in uniquenessConstraints) { + if (equalsIgnoreOrder(keys, constraint.properties)) { + return true + } + } + return false + } + + private fun checkRelationshipPropertyExistence( + type: String, + keys: List, + typeConstraintMap: Map> + ): Boolean { + val existenceConstraints = + typeConstraintMap["${ConstraintType.RELATIONSHIP_EXISTENCE.value}-${type}"] ?: return false + val properties = existenceConstraints.flatMap { it.properties } + return properties.containsAll(keys) + } + + private fun buildNodeWarningMessage( + pattern: NodePattern, + patternString: String, + typeConstraintMap: Map>, + keys: List + ): String { + val stringBuilder = StringBuilder() + if (pattern.labels.size > 1) { + stringBuilder.append( + "None of the labels ${pattern.labels.joinToString(", ") { "'$it'" }} match the key(s) defined by the pattern $patternString.", + ) + stringBuilder.append("\nPlease fix at least one of the following label constraints:") + } else { + stringBuilder.append( + "Label '${pattern.labels.first()}' does not match the key(s) defined by the pattern $patternString.", + ) + stringBuilder.append("\nPlease fix the label constraint:") + } + + addLabelConstraintsText(pattern, typeConstraintMap, stringBuilder) + stringBuilder.append("\nExpected constraints:") + stringBuilder.append("\n\t- ${getConstraintWarningText(ConstraintType.NODE_KEY.value, keys)}") + stringBuilder.append("\nor:") + stringBuilder.append( + "\n\t- ${getConstraintWarningText(ConstraintType.NODE_UNIQUENESS.value, keys)}") + for (key in keys) { + stringBuilder.append( + "\n\t- ${getConstraintWarningText(ConstraintType.NODE_EXISTENCE.value, listOf(key))}") + } + return stringBuilder.toString() + } + + private fun buildRelationshipWarningMessage( + pattern: RelationshipPattern, + patternString: String, + typeConstraintMap: Map>, + keys: List + ): String { + val stringBuilder = StringBuilder() + + val type = pattern.type!! + + stringBuilder.append( + "Relationship '$type' does not match the key(s) defined by the pattern $patternString.", + ) + stringBuilder.append("\nPlease fix the relationship constraints:") + + val relationshipConstraints = getRelationshipConstraints(typeConstraintMap, type) + + addExistingConstraints(relationshipConstraints, type, stringBuilder) + + stringBuilder.append("\nExpected constraints:") + stringBuilder.append( + "\n\t- ${getConstraintWarningText(ConstraintType.RELATIONSHIP_KEY.value, keys)}") + stringBuilder.append("\nor:") + stringBuilder.append( + "\n\t- ${getConstraintWarningText(ConstraintType.RELATIONSHIP_UNIQUENESS.value, keys)}") + for (key in keys) { + stringBuilder.append( + "\n\t- ${getConstraintWarningText(ConstraintType.RELATIONSHIP_EXISTENCE.value, listOf(key))}") + } + return stringBuilder.toString() + } + + private fun addLabelConstraintsText( + pattern: NodePattern, + typeConstraintMap: Map>, + stringBuilder: StringBuilder + ) { + for (label in pattern.labels) { + val labelConstraints = getLabelConstraints(typeConstraintMap, label) + addExistingConstraints(labelConstraints, label, stringBuilder) + } + } + + private fun addExistingConstraints( + constraints: MutableList, + labelOrType: String, + stringBuilder: StringBuilder + ) { + if (constraints.isNotEmpty()) { + stringBuilder.append("\n\t'$labelOrType' has:") + for (constraint in constraints) { + stringBuilder.append( + "\n\t\t- ${ + getConstraintWarningText( + constraint.constraintType, + constraint.properties, + ) + }", + ) + } + } else { + stringBuilder.append("\n\t'$labelOrType' has no key constraints") + } + } + + private fun getConstraintWarningText(constraintType: String, properties: List) = + "$constraintType (${properties.joinToString(", ")})" + + private fun getLabelConstraints( + typeConstraintMap: Map>, + label: String + ): MutableList { + val labelConstraints = mutableListOf() + labelConstraints.addAll( + typeConstraintMap["${ConstraintType.NODE_KEY.value}-${label}"] ?: emptyList(), + ) + labelConstraints.addAll( + typeConstraintMap["${ConstraintType.NODE_UNIQUENESS.value}-${label}"] ?: emptyList(), + ) + labelConstraints.addAll( + typeConstraintMap["${ConstraintType.NODE_EXISTENCE.value}-${label}"] ?: emptyList(), + ) + return labelConstraints + } + + private fun getRelationshipConstraints( + typeConstraintMap: Map>, + type: String + ): MutableList { + val relationshipConstraints = mutableListOf() + relationshipConstraints.addAll( + typeConstraintMap["${ConstraintType.RELATIONSHIP_KEY.value}-${type}"] ?: emptyList(), + ) + relationshipConstraints.addAll( + typeConstraintMap["${ConstraintType.RELATIONSHIP_UNIQUENESS.value}-${type}"] ?: emptyList(), + ) + relationshipConstraints.addAll( + typeConstraintMap["${ConstraintType.RELATIONSHIP_EXISTENCE.value}-${type}"] ?: emptyList(), + ) + return relationshipConstraints + } + + private fun equalsIgnoreOrder(first: List, second: List): Boolean { + return first.size == second.size && first.toSet() == second.toSet() + } +} diff --git a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/NodePatternHandlerTest.kt b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/NodePatternHandlerTest.kt index 5cb776e3..77698ed7 100644 --- a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/NodePatternHandlerTest.kt +++ b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/NodePatternHandlerTest.kt @@ -26,6 +26,9 @@ import org.apache.kafka.connect.data.SchemaBuilder import org.apache.kafka.connect.data.Struct import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows +import org.neo4j.connectors.kafka.data.ConstraintData +import org.neo4j.connectors.kafka.data.ConstraintEntityType +import org.neo4j.connectors.kafka.data.ConstraintType import org.neo4j.connectors.kafka.data.DynamicTypes import org.neo4j.connectors.kafka.data.SimpleTypes import org.neo4j.connectors.kafka.exceptions.InvalidDataException @@ -672,6 +675,217 @@ class NodePatternHandlerTest : HandlerTest() { message = "Key 'second_id' could not be located in the message.") } + @Test + fun `checkConstraints should not return warning messages if node key constraint provided with all keys`() { + val handler = + NodePatternHandler( + "my-topic", + "(:ALabel{!id, !second_id, name})", + mergeProperties = true, + renderer = Renderer.getDefaultRenderer(), + batchSize = 1) + + val constraints = + listOf( + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_KEY.value, + labelOrType = "ALabel", + properties = listOf("id", "second_id"))) + + val warningMessages = handler.checkConstraints(constraints) + + warningMessages shouldBe emptyList() + } + + @Test + fun `checkConstraints should not return warning messages if node uniqueness and existence constraints provided with all keys`() { + val handler = + NodePatternHandler( + "my-topic", + "(:ALabel{!id, !second_id, name})", + mergeProperties = true, + renderer = Renderer.getDefaultRenderer(), + batchSize = 1) + + val constraints = + listOf( + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_UNIQUENESS.value, + labelOrType = "ALabel", + properties = listOf("id", "second_id")), + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_EXISTENCE.value, + labelOrType = "ALabel", + properties = listOf("id")), + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_EXISTENCE.value, + labelOrType = "ALabel", + properties = listOf("second_id"))) + + val warningMessages = handler.checkConstraints(constraints) + + warningMessages shouldBe emptyList() + } + + @Test + fun `checkConstraints should return warning messages with single label pattern`() { + val handler = + NodePatternHandler( + "my-topic", + "(:ALabel{!id, !second_id, name})", + mergeProperties = true, + renderer = Renderer.getDefaultRenderer(), + batchSize = 1) + + val constraints = emptyList() + + val warningMessages = handler.checkConstraints(constraints) + + warningMessages shouldBe + listOf( + "Label 'ALabel' does not match the key(s) defined by the pattern (:ALabel{!id, !second_id, name})." + + "\nPlease fix the label constraint:" + + "\n\t'ALabel' has no key constraints" + + "\nExpected constraints:" + + "\n\t- NODE_KEY (id, second_id)" + + "\nor:" + + "\n\t- UNIQUENESS (id, second_id)" + + "\n\t- NODE_PROPERTY_EXISTENCE (id)" + + "\n\t- NODE_PROPERTY_EXISTENCE (second_id)") + } + + @Test + fun `checkConstraints should return warning messages with empty list of constraints`() { + val handler = + NodePatternHandler( + "my-topic", + "(:ALabel:BLabel{!id, !second_id, name})", + mergeProperties = true, + renderer = Renderer.getDefaultRenderer(), + batchSize = 1) + + val constraints = emptyList() + + val warningMessages = handler.checkConstraints(constraints) + + warningMessages shouldBe + listOf( + "None of the labels 'ALabel', 'BLabel' match the key(s) defined by the pattern (:ALabel:BLabel{!id, !second_id, name})." + + "\nPlease fix at least one of the following label constraints:" + + "\n\t'ALabel' has no key constraints" + + "\n\t'BLabel' has no key constraints" + + "\nExpected constraints:" + + "\n\t- NODE_KEY (id, second_id)" + + "\nor:" + + "\n\t- UNIQUENESS (id, second_id)" + + "\n\t- NODE_PROPERTY_EXISTENCE (id)" + + "\n\t- NODE_PROPERTY_EXISTENCE (second_id)") + } + + @Test + fun `checkConstraints should return warning messages with existing node key constraint`() { + val handler = + NodePatternHandler( + "my-topic", + "(:ALabel:BLabel{!id, !second_id, name})", + mergeProperties = true, + renderer = Renderer.getDefaultRenderer(), + batchSize = 1) + + val constraints = + listOf( + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_KEY.value, + labelOrType = "ALabel", + properties = listOf("id")), + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_KEY.value, + labelOrType = "BLabel", + properties = listOf("id"))) + + val warningMessages = handler.checkConstraints(constraints) + + warningMessages shouldBe + listOf( + "None of the labels 'ALabel', 'BLabel' match the key(s) defined by the pattern (:ALabel:BLabel{!id, !second_id, name})." + + "\nPlease fix at least one of the following label constraints:" + + "\n\t'ALabel' has:" + + "\n\t\t- NODE_KEY (id)" + + "\n\t'BLabel' has:" + + "\n\t\t- NODE_KEY (id)" + + "\nExpected constraints:" + + "\n\t- NODE_KEY (id, second_id)" + + "\nor:" + + "\n\t- UNIQUENESS (id, second_id)" + + "\n\t- NODE_PROPERTY_EXISTENCE (id)" + + "\n\t- NODE_PROPERTY_EXISTENCE (second_id)") + } + + @Test + fun `checkConstraints should return warning messages with existing uniqueness and existence constraints`() { + val handler = + NodePatternHandler( + "my-topic", + "(:ALabel:BLabel{!id, !second_id, name})", + mergeProperties = true, + renderer = Renderer.getDefaultRenderer(), + batchSize = 1) + + val constraints = + listOf( + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_UNIQUENESS.value, + labelOrType = "ALabel", + properties = listOf("id")), + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_EXISTENCE.value, + labelOrType = "ALabel", + properties = listOf("id")), + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_EXISTENCE.value, + labelOrType = "ALabel", + properties = listOf("second_id")), + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_UNIQUENESS.value, + labelOrType = "BLabel", + properties = listOf("id", "second_id")), + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_EXISTENCE.value, + labelOrType = "BLabel", + properties = listOf("id"))) + + val warningMessages = handler.checkConstraints(constraints) + + warningMessages shouldBe + listOf( + "None of the labels 'ALabel', 'BLabel' match the key(s) defined by the pattern (:ALabel:BLabel{!id, !second_id, name})." + + "\nPlease fix at least one of the following label constraints:" + + "\n\t'ALabel' has:" + + "\n\t\t- UNIQUENESS (id)" + + "\n\t\t- NODE_PROPERTY_EXISTENCE (id)" + + "\n\t\t- NODE_PROPERTY_EXISTENCE (second_id)" + + "\n\t'BLabel' has:" + + "\n\t\t- UNIQUENESS (id, second_id)" + + "\n\t\t- NODE_PROPERTY_EXISTENCE (id)" + + "\nExpected constraints:" + + "\n\t- NODE_KEY (id, second_id)" + + "\nor:" + + "\n\t- UNIQUENESS (id, second_id)" + + "\n\t- NODE_PROPERTY_EXISTENCE (id)" + + "\n\t- NODE_PROPERTY_EXISTENCE (second_id)") + } + private fun assertQueryAndParameters( pattern: String, keySchema: Schema = Schema.STRING_SCHEMA, diff --git a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/RelationshipPatternHandlerTest.kt b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/RelationshipPatternHandlerTest.kt index 20df2c16..4835cebe 100644 --- a/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/RelationshipPatternHandlerTest.kt +++ b/sink/src/test/kotlin/org/neo4j/connectors/kafka/sink/strategy/RelationshipPatternHandlerTest.kt @@ -25,6 +25,9 @@ import org.apache.kafka.connect.data.SchemaBuilder import org.apache.kafka.connect.data.Struct import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows +import org.neo4j.connectors.kafka.data.ConstraintData +import org.neo4j.connectors.kafka.data.ConstraintEntityType +import org.neo4j.connectors.kafka.data.ConstraintType import org.neo4j.connectors.kafka.exceptions.InvalidDataException import org.neo4j.connectors.kafka.sink.ChangeQuery import org.neo4j.cypherdsl.core.renderer.Renderer @@ -900,6 +903,234 @@ class RelationshipPatternHandlerTest : HandlerTest() { message = "Key 'end' could not be located in the values.") } + @Test + fun `checkConstraints should not return warning messages if relationship key constraint provided with all keys`() { + val handler = + RelationshipPatternHandler( + "my-topic", + "(:LabelA{!idStart})-[:REL_TYPE{!id, !second_id}]->(:LabelB{!idEnd})", + mergeNodeProperties = true, + mergeRelationshipProperties = true, + renderer = Renderer.getDefaultRenderer(), + batchSize = 1) + + val constraints = + listOf( + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_KEY.value, + labelOrType = "LabelA", + properties = listOf("idStart")), + ConstraintData( + entityType = ConstraintEntityType.RELATIONSHIP.value, + constraintType = ConstraintType.RELATIONSHIP_KEY.value, + labelOrType = "REL_TYPE", + properties = listOf("id", "second_id")), + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_KEY.value, + labelOrType = "LabelB", + properties = listOf("idEnd"))) + + val warningMessages = handler.checkConstraints(constraints) + + warningMessages shouldBe emptyList() + } + + @Test + fun `checkConstraints should not return warning messages if relationship uniqueness and existence constraint provided with all keys`() { + val handler = + RelationshipPatternHandler( + "my-topic", + "(:LabelA{!idStart})-[:REL_TYPE{!id, !second_id}]->(:LabelB{!idEnd})", + mergeNodeProperties = true, + mergeRelationshipProperties = true, + renderer = Renderer.getDefaultRenderer(), + batchSize = 1) + + val constraints = + listOf( + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_KEY.value, + labelOrType = "LabelA", + properties = listOf("idStart")), + ConstraintData( + entityType = ConstraintEntityType.RELATIONSHIP.value, + constraintType = ConstraintType.RELATIONSHIP_UNIQUENESS.value, + labelOrType = "REL_TYPE", + properties = listOf("id", "second_id")), + ConstraintData( + entityType = ConstraintEntityType.RELATIONSHIP.value, + constraintType = ConstraintType.RELATIONSHIP_EXISTENCE.value, + labelOrType = "REL_TYPE", + properties = listOf("id")), + ConstraintData( + entityType = ConstraintEntityType.RELATIONSHIP.value, + constraintType = ConstraintType.RELATIONSHIP_EXISTENCE.value, + labelOrType = "REL_TYPE", + properties = listOf("second_id")), + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_KEY.value, + labelOrType = "LabelB", + properties = listOf("idEnd"))) + + val warningMessages = handler.checkConstraints(constraints) + + warningMessages shouldBe emptyList() + } + + @Test + fun `checkConstraints should return warning messages with empty list of constraints`() { + val handler = + RelationshipPatternHandler( + "my-topic", + "(:LabelA{!idStart})-[:REL_TYPE{!id}]->(:LabelB{!idEnd})", + mergeNodeProperties = true, + mergeRelationshipProperties = true, + renderer = Renderer.getDefaultRenderer(), + batchSize = 1) + + val constraints = emptyList() + + val warningMessages = handler.checkConstraints(constraints) + + val startNode = + "Label 'LabelA' does not match the key(s) defined by the pattern (:LabelA{!idStart})-[:REL_TYPE{!id}]->(:LabelB{!idEnd})." + + "\nPlease fix the label constraint:" + + "\n\t'LabelA' has no key constraints" + + "\nExpected constraints:" + + "\n\t- NODE_KEY (idStart)" + + "\nor:" + + "\n\t- UNIQUENESS (idStart)" + + "\n\t- NODE_PROPERTY_EXISTENCE (idStart)" + + warningMessages[0] shouldBe startNode + + val relationship = + "Relationship 'REL_TYPE' does not match the key(s) defined by the pattern (:LabelA{!idStart})-[:REL_TYPE{!id}]->(:LabelB{!idEnd})." + + "\nPlease fix the relationship constraints:" + + "\n\t'REL_TYPE' has no key constraints" + + "\nExpected constraints:" + + "\n\t- RELATIONSHIP_KEY (id)" + + "\nor:" + + "\n\t- RELATIONSHIP_UNIQUENESS (id)" + + "\n\t- RELATIONSHIP_PROPERTY_EXISTENCE (id)" + + warningMessages[1] shouldBe relationship + + val endNode = + "Label 'LabelB' does not match the key(s) defined by the pattern (:LabelA{!idStart})-[:REL_TYPE{!id}]->(:LabelB{!idEnd})." + + "\nPlease fix the label constraint:" + + "\n\t'LabelB' has no key constraints" + + "\nExpected constraints:" + + "\n\t- NODE_KEY (idEnd)" + + "\nor:" + + "\n\t- UNIQUENESS (idEnd)" + + "\n\t- NODE_PROPERTY_EXISTENCE (idEnd)" + + warningMessages[2] shouldBe endNode + } + + @Test + fun `checkConstraints should return warning messages with existing key constraints`() { + val handler = + RelationshipPatternHandler( + "my-topic", + "(:LabelA{!idStart})-[:REL_TYPE{!id, !second_id}]->(:LabelB{!idEnd})", + mergeNodeProperties = true, + mergeRelationshipProperties = true, + renderer = Renderer.getDefaultRenderer(), + batchSize = 1) + + val constraints = + listOf( + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_KEY.value, + labelOrType = "LabelA", + properties = listOf("idStart")), + ConstraintData( + entityType = ConstraintEntityType.RELATIONSHIP.value, + constraintType = ConstraintType.RELATIONSHIP_KEY.value, + labelOrType = "REL_TYPE", + properties = listOf("id")), + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_KEY.value, + labelOrType = "LabelB", + properties = listOf("idEnd"))) + + val warningMessages = handler.checkConstraints(constraints) + + val relationship = + "Relationship 'REL_TYPE' does not match the key(s) defined by the pattern (:LabelA{!idStart})-[:REL_TYPE{!id, !second_id}]->(:LabelB{!idEnd})." + + "\nPlease fix the relationship constraints:" + + "\n\t'REL_TYPE' has:" + + "\n\t\t- RELATIONSHIP_KEY (id)" + + "\nExpected constraints:" + + "\n\t- RELATIONSHIP_KEY (id, second_id)" + + "\nor:" + + "\n\t- RELATIONSHIP_UNIQUENESS (id, second_id)" + + "\n\t- RELATIONSHIP_PROPERTY_EXISTENCE (id)" + + "\n\t- RELATIONSHIP_PROPERTY_EXISTENCE (second_id)" + + warningMessages shouldBe listOf(relationship) + } + + @Test + fun `checkConstraints should return warning messages with existing uniqueness and existence constraints`() { + val handler = + RelationshipPatternHandler( + "my-topic", + "(:LabelA{!idStart})-[:REL_TYPE{!id, !second_id}]->(:LabelB{!idEnd})", + mergeNodeProperties = true, + mergeRelationshipProperties = true, + renderer = Renderer.getDefaultRenderer(), + batchSize = 1) + + val constraints = + listOf( + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_KEY.value, + labelOrType = "LabelA", + properties = listOf("idStart")), + ConstraintData( + entityType = ConstraintEntityType.RELATIONSHIP.value, + constraintType = ConstraintType.RELATIONSHIP_UNIQUENESS.value, + labelOrType = "REL_TYPE", + properties = listOf("id", "second_id")), + ConstraintData( + entityType = ConstraintEntityType.RELATIONSHIP.value, + constraintType = ConstraintType.RELATIONSHIP_EXISTENCE.value, + labelOrType = "REL_TYPE", + properties = listOf("id")), + ConstraintData( + entityType = ConstraintEntityType.NODE.value, + constraintType = ConstraintType.NODE_KEY.value, + labelOrType = "LabelB", + properties = listOf("idEnd"))) + + val warningMessages = handler.checkConstraints(constraints) + + val relationship = + "Relationship 'REL_TYPE' does not match the key(s) defined by the pattern (:LabelA{!idStart})-[:REL_TYPE{!id, !second_id}]->(:LabelB{!idEnd})." + + "\nPlease fix the relationship constraints:" + + "\n\t'REL_TYPE' has:" + + "\n\t\t- RELATIONSHIP_UNIQUENESS (id, second_id)" + + "\n\t\t- RELATIONSHIP_PROPERTY_EXISTENCE (id)" + + "\nExpected constraints:" + + "\n\t- RELATIONSHIP_KEY (id, second_id)" + + "\nor:" + + "\n\t- RELATIONSHIP_UNIQUENESS (id, second_id)" + + "\n\t- RELATIONSHIP_PROPERTY_EXISTENCE (id)" + + "\n\t- RELATIONSHIP_PROPERTY_EXISTENCE (second_id)" + + warningMessages shouldBe listOf(relationship) + } + private fun assertQueryAndParameters( pattern: String, keySchema: Schema = Schema.STRING_SCHEMA,