Skip to content

Commit

Permalink
feat: log warning messages when pattern keys and constraints do not m…
Browse files Browse the repository at this point in the history
…atch (#162)
  • Loading branch information
Emrehzl94 authored Jul 19, 2024
1 parent b6f72a7 commit c488bb1
Show file tree
Hide file tree
Showing 8 changed files with 874 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.data

import org.neo4j.driver.Driver
import org.neo4j.driver.SessionConfig

enum class ConstraintEntityType(val value: String) {
NODE("NODE"),
RELATIONSHIP("RELATIONSHIP"),
}

enum class ConstraintType(val value: String) {
NODE_KEY("NODE_KEY"),
NODE_UNIQUENESS("UNIQUENESS"),
NODE_EXISTENCE("NODE_PROPERTY_EXISTENCE"),
RELATIONSHIP_KEY("RELATIONSHIP_KEY"),
RELATIONSHIP_UNIQUENESS("RELATIONSHIP_UNIQUENESS"),
RELATIONSHIP_EXISTENCE("RELATIONSHIP_PROPERTY_EXISTENCE")
}

val NODE_CONSTRAINTS =
listOf(
ConstraintType.NODE_KEY.value,
ConstraintType.NODE_UNIQUENESS.value,
ConstraintType.NODE_EXISTENCE.value)

val RELATIONSHIP_CONSTRAINTS =
listOf(
ConstraintType.RELATIONSHIP_KEY.value,
ConstraintType.RELATIONSHIP_UNIQUENESS.value,
ConstraintType.RELATIONSHIP_EXISTENCE.value)

data class ConstraintData(
val entityType: String,
val constraintType: String,
val labelOrType: String,
val properties: List<String>
)

fun fetchConstraintData(driver: Driver, sessionConfig: SessionConfig): List<ConstraintData> {
return try {
driver.session(sessionConfig).use { session ->
session
.run("SHOW CONSTRAINTS YIELD entityType, type, labelsOrTypes, properties RETURN *")
.list()
.map {
ConstraintData(
entityType = it.get("entityType").asString(),
constraintType = it.get("type").asString(),
labelOrType = it.get("labelsOrTypes").asList()[0].toString(),
properties = it.get("properties").asList().map { property -> property.toString() })
}
}
} catch (e: Exception) {
emptyList()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.kafka.connect.sink.SinkRecord
import org.neo4j.connectors.kafka.data.DynamicTypes
import org.neo4j.connectors.kafka.data.cdcTxId
import org.neo4j.connectors.kafka.data.cdcTxSeq
import org.neo4j.connectors.kafka.data.fetchConstraintData
import org.neo4j.connectors.kafka.data.isCdcMessage
import org.neo4j.connectors.kafka.sink.strategy.CdcSchemaHandler
import org.neo4j.connectors.kafka.sink.strategy.CdcSourceIdHandler
Expand Down Expand Up @@ -164,6 +165,8 @@ interface SinkStrategyHandler {
bindHeaderAs = config.patternBindHeaderAs,
bindKeyAs = config.patternBindKeyAs,
bindValueAs = config.patternBindValueAs)

handler.validate(fetchConstraintData(config.driver, config.sessionConfig()))
}

val relationshipPattern =
Expand All @@ -187,6 +190,8 @@ interface SinkStrategyHandler {
bindHeaderAs = config.patternBindHeaderAs,
bindKeyAs = config.patternBindKeyAs,
bindValueAs = config.patternBindValueAs)

handler.validate(fetchConstraintData(config.driver, config.sessionConfig()))
}

val cdcSourceIdTopics = config.getList(SinkConfiguration.CDC_SOURCE_ID_TOPICS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
*/
package org.neo4j.connectors.kafka.sink.strategy

import org.neo4j.connectors.kafka.data.ConstraintData
import org.neo4j.connectors.kafka.sink.ChangeQuery
import org.neo4j.connectors.kafka.sink.SinkConfiguration
import org.neo4j.connectors.kafka.sink.SinkMessage
import org.neo4j.connectors.kafka.sink.SinkStrategy
import org.neo4j.connectors.kafka.sink.strategy.pattern.NodePattern
import org.neo4j.connectors.kafka.sink.strategy.pattern.Pattern
import org.neo4j.connectors.kafka.sink.strategy.pattern.PatternConstraintValidator
import org.neo4j.cypherdsl.core.Cypher
import org.neo4j.cypherdsl.core.Literal
import org.neo4j.cypherdsl.core.Node
Expand All @@ -33,7 +35,7 @@ import org.slf4j.LoggerFactory

class NodePatternHandler(
val topic: String,
patternString: String,
private val patternString: String,
private val mergeProperties: Boolean,
private val renderer: Renderer,
private val batchSize: Int,
Expand Down Expand Up @@ -104,6 +106,18 @@ class NodePatternHandler(
.toList()
}

fun validate(constraints: List<ConstraintData>) {
val warningMessages = checkConstraints(constraints)
warningMessages.forEach { logger.warn(it) }
}

override fun checkConstraints(constraints: List<ConstraintData>): List<String> {
val nodeWarning =
PatternConstraintValidator.checkNodeWarning(constraints, pattern, patternString)
?: return emptyList()
return listOf(nodeWarning)
}

private fun buildStatement(): String {
val createOperation = Cypher.literalOf<String>(CREATE)
val deleteOperation = Cypher.literalOf<String>(DELETE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.neo4j.connectors.kafka.sink.strategy
import java.time.Instant
import java.time.ZoneOffset
import org.apache.kafka.connect.errors.ConnectException
import org.neo4j.connectors.kafka.data.ConstraintData
import org.neo4j.connectors.kafka.exceptions.InvalidDataException
import org.neo4j.connectors.kafka.sink.SinkConfiguration
import org.neo4j.connectors.kafka.sink.SinkMessage
Expand Down Expand Up @@ -69,6 +70,8 @@ abstract class PatternHandler<T : Pattern>(
.flatten()
}

abstract fun checkConstraints(constraints: List<ConstraintData>): List<String>

/**
* Checks if given <strong>from</strong> key is explicitly defined, i.e. something starting with
* __key, __value or __header.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
*/
package org.neo4j.connectors.kafka.sink.strategy

import org.neo4j.connectors.kafka.data.ConstraintData
import org.neo4j.connectors.kafka.sink.ChangeQuery
import org.neo4j.connectors.kafka.sink.SinkConfiguration
import org.neo4j.connectors.kafka.sink.SinkMessage
import org.neo4j.connectors.kafka.sink.SinkStrategy
import org.neo4j.connectors.kafka.sink.strategy.pattern.Pattern
import org.neo4j.connectors.kafka.sink.strategy.pattern.PatternConstraintValidator
import org.neo4j.connectors.kafka.sink.strategy.pattern.RelationshipPattern
import org.neo4j.cypherdsl.core.Cypher
import org.neo4j.cypherdsl.core.Literal
Expand All @@ -33,7 +35,7 @@ import org.slf4j.LoggerFactory

class RelationshipPatternHandler(
val topic: String,
patternString: String,
private val patternString: String,
private val mergeNodeProperties: Boolean,
private val mergeRelationshipProperties: Boolean,
private val renderer: Renderer,
Expand Down Expand Up @@ -121,6 +123,33 @@ class RelationshipPatternHandler(
.toList()
}

fun validate(constraints: List<ConstraintData>) {
val warningMessages = checkConstraints(constraints)
warningMessages.forEach { logger.warn(it) }
}

override fun checkConstraints(constraints: List<ConstraintData>): List<String> {
val warningMessages = mutableListOf<String>()

val startNodeWarning =
PatternConstraintValidator.checkNodeWarning(constraints, pattern.start, patternString)
val relationshipWarning =
PatternConstraintValidator.checkRelationshipWarning(constraints, pattern, patternString)
val endNodeWarning =
PatternConstraintValidator.checkNodeWarning(constraints, pattern.end, patternString)

if (startNodeWarning != null) {
warningMessages.add(startNodeWarning)
}
if (relationshipWarning != null) {
warningMessages.add(relationshipWarning)
}
if (endNodeWarning != null) {
warningMessages.add(endNodeWarning)
}
return warningMessages
}

private fun buildStatement(): String {
val createOperation = Cypher.literalOf<String>(CREATE)
val deleteOperation = Cypher.literalOf<String>(DELETE)
Expand Down
Loading

0 comments on commit c488bb1

Please sign in to comment.