From 025dde0e928235d012f2e56dec79709a65636581 Mon Sep 17 00:00:00 2001 From: jeffreyvanhelden Date: Sat, 30 May 2020 12:37:34 +1200 Subject: [PATCH] bug fix: repeating term field caused invalid command --- config/avro/connector-sink-avro.json | 11 ++-- config/avro/connector-source-avro.json | 9 ++-- config/connector-sink-string.json | 17 ------- config/invalid/connector-sink-invalid.json | 17 ------- config/invalid/connector-sink-string.json | 17 ------- config/invalid/connector-source-string.json | 18 ------- docker-compose.yml | 3 +- .../tile38/commands/CommandGenerator.java | 17 +++++-- src/main/resources/datagen/foo.avro | 50 +++++++++++++++++++ src/main/resources/{ => datagen}/station.avro | 0 src/main/resources/{ => datagen}/train.avro | 0 .../commands/CommandGeneratorTests.java | 19 +++++++ 12 files changed, 92 insertions(+), 86 deletions(-) delete mode 100644 config/connector-sink-string.json delete mode 100644 config/invalid/connector-sink-invalid.json delete mode 100644 config/invalid/connector-sink-string.json delete mode 100644 config/invalid/connector-source-string.json create mode 100644 src/main/resources/datagen/foo.avro rename src/main/resources/{ => datagen}/station.avro (100%) rename src/main/resources/{ => datagen}/train.avro (100%) diff --git a/config/avro/connector-sink-avro.json b/config/avro/connector-sink-avro.json index 1c19079..681360b 100644 --- a/config/avro/connector-sink-avro.json +++ b/config/avro/connector-sink-avro.json @@ -1,18 +1,17 @@ { - "name": "tile", + "name": "foo-avro-sink", "config": { + "tasks.max": "1", "connector.class": "guru.bonacci.kafka.connect.tile38.Tile38SinkConnector", - "topics": "test", + "topics": "foo", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://schema-registry:8081", - "tile38.topic.test": "test event.id FIELD route event.route POINT event.lat event.lon", + "tile38.topic.foo": "foo event.bar POINT event.bar1 event.bar2", "tile38.host": "tile38", - "tile38.port": 9851, - - "tasks.max": "1" + "tile38.port": 9851 } } diff --git a/config/avro/connector-source-avro.json b/config/avro/connector-source-avro.json index 9c09783..ae62b0d 100644 --- a/config/avro/connector-source-avro.json +++ b/config/avro/connector-source-avro.json @@ -1,18 +1,17 @@ { - "name": "datagen-test", + "name": "foo-avro-source", "config": { "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", - "kafka.topic": "test", + "kafka.topic": "foo", - "schema.filename": "/tmp/foobar.avro", - "schema.keyfield": "id", + "schema.filename": "/tmp/foo.avro", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://schema-registry:8081", "max.interval": 500, - "iterations": 1000000, + "iterations": 1000, "tasks.max": "1" } diff --git a/config/connector-sink-string.json b/config/connector-sink-string.json deleted file mode 100644 index 6fdee76..0000000 --- a/config/connector-sink-string.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "name": "tile", - "config": { - "connector.class": "guru.bonacci.kafka.connect.tile38.Tile38SinkConnector", - "topics": "asstring", - - "key.converter": "org.apache.kafka.connect.storage.StringConverter", - "value.converter": "org.apache.kafka.connect.storage.StringConverter", - - "tile38.topic.asstring": "asstring event.id FIELD route event.route POINT event.lat event.lon", - - "tile38.host": "tile38", - "tile38.port": 9851, - - "tasks.max": "1" - } -} diff --git a/config/invalid/connector-sink-invalid.json b/config/invalid/connector-sink-invalid.json deleted file mode 100644 index 64cd9f8..0000000 --- a/config/invalid/connector-sink-invalid.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "name": "tile", - "config": { - "connector.class": "guru.bonacci.kafka.connect.tile38.Tile38SinkConnector", - "topics": "foo", - - "key.converter": "org.apache.kafka.connect.storage.StringConverter", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable": "false", - - "tile38.topic.foo": "I am invalid", - - "tile38.host": "tile38", - "tile38.port": 9851, - "tasks.max": "1" - } -} diff --git a/config/invalid/connector-sink-string.json b/config/invalid/connector-sink-string.json deleted file mode 100644 index 6fdee76..0000000 --- a/config/invalid/connector-sink-string.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "name": "tile", - "config": { - "connector.class": "guru.bonacci.kafka.connect.tile38.Tile38SinkConnector", - "topics": "asstring", - - "key.converter": "org.apache.kafka.connect.storage.StringConverter", - "value.converter": "org.apache.kafka.connect.storage.StringConverter", - - "tile38.topic.asstring": "asstring event.id FIELD route event.route POINT event.lat event.lon", - - "tile38.host": "tile38", - "tile38.port": 9851, - - "tasks.max": "1" - } -} diff --git a/config/invalid/connector-source-string.json b/config/invalid/connector-source-string.json deleted file mode 100644 index 3e12852..0000000 --- a/config/invalid/connector-source-string.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "name": "datagen-string", - "config": { - "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", - "kafka.topic": "asstring", - - "schema.filename": "/tmp/foobar.avro", - "schema.keyfield": "id", - - "key.converter": "org.apache.kafka.connect.storage.StringConverter", - "value.converter": "org.apache.kafka.connect.storage.StringConverter", - - "max.interval": 500, - "iterations": 1000000, - - "tasks.max": "1" - } -} diff --git a/docker-compose.yml b/docker-compose.yml index 2f1943f..1b97c58 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -88,8 +88,7 @@ services: CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR volumes: - ./target/kafka-connect-tile38-sink-1.0-SNAPSHOT-package/share/java/kafka-connect-tile38-sink:/usr/share/confluent-hub-components/kafka-connect-tile38-sink - - ./src/main/resources/station.avro:/tmp/station.avro - - ./src/main/resources/train.avro:/tmp/train.avro + - ./src/main/resources/datagen:/tmp links: - tile38 diff --git a/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandGenerator.java b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandGenerator.java index b9d8a34..8e393b3 100644 --- a/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandGenerator.java +++ b/src/main/java/guru/bonacci/kafka/connect/tile38/commands/CommandGenerator.java @@ -1,5 +1,6 @@ package guru.bonacci.kafka.connect.tile38.commands; +import static org.apache.commons.lang3.StringUtils.strip; import static guru.bonacci.kafka.connect.tile38.Constants.TOKERATOR; import static io.lettuce.core.codec.StringCodec.UTF8; import static java.util.Arrays.asList; @@ -72,12 +73,15 @@ public class CommandGenerator { // visible for testing String preparedStatement(final Map record) { // determine for each command term its corresponding record value + log.trace("record {}", record); + final Map parsed = cmd.getTerms().stream() .collect(toMap(identity(), term -> { try { + log.debug("term {}", term); // field name is query term without 'event.' String prop = term.replace(TOKERATOR, ""); - + log.debug("prop {}", prop); // given the field name, retrieve the field value from the record Object val = PropertyUtils.getProperty(record, prop); @@ -85,6 +89,7 @@ String preparedStatement(final Map record) { // record does not contain required field throw new IllegalAccessException(); } + log.debug("val {}", val); return String.valueOf(val); } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { log.warn("Field mismatch command {}, and sink record {}", term, record); @@ -93,13 +98,17 @@ String preparedStatement(final Map record) { })); // build the command string by replacing the command terms with record values. - String generatedCmdString = cmd.getCmdString(); + // substitute 'event.x + space' to avoid eager substitution of similar term names, + // as in 'foo event.bar POINT event.bar1 event.bar2' + + // add a temporary space to command string to substitute last term + String generatedCmdString = cmd.getCmdString() + " "; for (Map.Entry entry : parsed.entrySet()) { // thereby escaping characters - generatedCmdString = generatedCmdString.replaceAll(entry.getKey(), quoteReplacement(entry.getValue())); + generatedCmdString = generatedCmdString.replaceAll(entry.getKey() + " ", quoteReplacement(entry.getValue() + " ")); } - return generatedCmdString; + return strip(generatedCmdString); } public static CommandGenerator from(CommandTemplate cmd) { diff --git a/src/main/resources/datagen/foo.avro b/src/main/resources/datagen/foo.avro new file mode 100644 index 0000000..0a4bcc9 --- /dev/null +++ b/src/main/resources/datagen/foo.avro @@ -0,0 +1,50 @@ +{ + "namespace": "demo", + "name": "foo", + "type": "record", + "fields": [ + { + "name": "bar", + "type": { + "type": "string", + "arg.properties": { + "options": [ + "one", + "two", + "three" + ] + } + } + }, + { + "name": "bar1", + "type": { + "type": "double", + "arg.properties": { + "options": [ + 12.34, + 23.45, + 34.56, + 45.67, + 56.78 + ] + } + } + }, + { + "name": "bar2", + "type": { + "type": "double", + "arg.properties": { + "options": [ + -98.76, + -87.65, + -76.54, + -65.43, + -54.32 + ] + } + } + } + ] +} diff --git a/src/main/resources/station.avro b/src/main/resources/datagen/station.avro similarity index 100% rename from src/main/resources/station.avro rename to src/main/resources/datagen/station.avro diff --git a/src/main/resources/train.avro b/src/main/resources/datagen/train.avro similarity index 100% rename from src/main/resources/train.avro rename to src/main/resources/datagen/train.avro diff --git a/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandGeneratorTests.java b/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandGeneratorTests.java index 9f3b913..917de97 100644 --- a/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandGeneratorTests.java +++ b/src/test/java/guru/bonacci/kafka/connect/tile38/commands/CommandGeneratorTests.java @@ -150,4 +150,23 @@ void nesting() { assertThat(result, is(equalTo("foo fooid POINT some foo some bar"))); } + @Test + void compileWithRepeatingTermNames() { + final String cmdString = "foo event.bar POINT event.bar1 event.bar2"; + + Schema schema = SchemaBuilder.struct().field("bar", Schema.STRING_SCHEMA).field("bar1", Schema.FLOAT32_SCHEMA).field("bar2", Schema.FLOAT32_SCHEMA); + + Struct value = new Struct(schema).put("bar", "one").put("bar1", 12.34f).put("bar2", 56.78f); + + SinkRecord rec = write("unused", Schema.STRING_SCHEMA, "foo", schema, value); + + Tile38Record internalRecord = new RecordConverter().convert(rec); + + Triple, CommandArgs> result = CommandGenerator.from( + CommandTemplate.from(cmdString)).compile(internalRecord); + + assertThat(result.getLeft(), is(equalTo(CommandType.SET))); + assertThat(result.getRight().toCommandString(), is(equalTo("foo one POINT 12.34 56.78"))); + } + }