Skip to content

Commit

Permalink
bug fix: term names sharing a prefix (e.g. event.bar vs event.bar1) were substituted eagerly, producing an invalid command
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffreyvanhelden committed May 30, 2020
1 parent f21688c commit 025dde0
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 86 deletions.
11 changes: 5 additions & 6 deletions config/avro/connector-sink-avro.json
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
{
"name": "tile",
"name": "foo-avro-sink",
"config": {
"tasks.max": "1",
"connector.class": "guru.bonacci.kafka.connect.tile38.Tile38SinkConnector",
"topics": "test",
"topics": "foo",

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",

"tile38.topic.test": "test event.id FIELD route event.route POINT event.lat event.lon",
"tile38.topic.foo": "foo event.bar POINT event.bar1 event.bar2",

"tile38.host": "tile38",
"tile38.port": 9851,

"tasks.max": "1"
"tile38.port": 9851
}
}
9 changes: 4 additions & 5 deletions config/avro/connector-source-avro.json
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
{
"name": "datagen-test",
"name": "foo-avro-source",
"config": {
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "test",
"kafka.topic": "foo",

"schema.filename": "/tmp/foobar.avro",
"schema.keyfield": "id",
"schema.filename": "/tmp/foo.avro",

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",

"max.interval": 500,
"iterations": 1000000,
"iterations": 1000,

"tasks.max": "1"
}
Expand Down
17 changes: 0 additions & 17 deletions config/connector-sink-string.json

This file was deleted.

17 changes: 0 additions & 17 deletions config/invalid/connector-sink-invalid.json

This file was deleted.

17 changes: 0 additions & 17 deletions config/invalid/connector-sink-string.json

This file was deleted.

18 changes: 0 additions & 18 deletions config/invalid/connector-source-string.json

This file was deleted.

3 changes: 1 addition & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ services:
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
volumes:
- ./target/kafka-connect-tile38-sink-1.0-SNAPSHOT-package/share/java/kafka-connect-tile38-sink:/usr/share/confluent-hub-components/kafka-connect-tile38-sink
- ./src/main/resources/station.avro:/tmp/station.avro
- ./src/main/resources/train.avro:/tmp/train.avro
- ./src/main/resources/datagen:/tmp

links:
- tile38
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package guru.bonacci.kafka.connect.tile38.commands;

import static org.apache.commons.lang3.StringUtils.strip;
import static guru.bonacci.kafka.connect.tile38.Constants.TOKERATOR;
import static io.lettuce.core.codec.StringCodec.UTF8;
import static java.util.Arrays.asList;
Expand Down Expand Up @@ -72,19 +73,23 @@ public class CommandGenerator {
// visible for testing
String preparedStatement(final Map<String, Object> record) {
// determine for each command term its corresponding record value
log.trace("record {}", record);

final Map<String, String> parsed = cmd.getTerms().stream()
.collect(toMap(identity(), term -> {
try {
log.debug("term {}", term);
// field name is query term without 'event.'
String prop = term.replace(TOKERATOR, "");

log.debug("prop {}", prop);
// given the field name, retrieve the field value from the record
Object val = PropertyUtils.getProperty(record, prop);

if (val == null) {
// record does not contain required field
throw new IllegalAccessException();
}
log.debug("val {}", val);
return String.valueOf(val);
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
log.warn("Field mismatch command {}, and sink record {}", term, record);
Expand All @@ -93,13 +98,17 @@ String preparedStatement(final Map<String, Object> record) {
}));

// build the command string by replacing the command terms with record values.
String generatedCmdString = cmd.getCmdString();
// substitute 'event.x + space' to avoid eager substitution of similar term names,
// as in 'foo event.bar POINT event.bar1 event.bar2'

// add a temporary space to command string to substitute last term
String generatedCmdString = cmd.getCmdString() + " ";
for (Map.Entry<String, String> entry : parsed.entrySet()) {
// thereby escaping characters
generatedCmdString = generatedCmdString.replaceAll(entry.getKey(), quoteReplacement(entry.getValue()));
generatedCmdString = generatedCmdString.replaceAll(entry.getKey() + " ", quoteReplacement(entry.getValue() + " "));
}

return generatedCmdString;
return strip(generatedCmdString);
}

public static CommandGenerator from(CommandTemplate cmd) {
Expand Down
50 changes: 50 additions & 0 deletions src/main/resources/datagen/foo.avro
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{
"namespace": "demo",
"name": "foo",
"type": "record",
"fields": [
{
"name": "bar",
"type": {
"type": "string",
"arg.properties": {
"options": [
"one",
"two",
"three"
]
}
}
},
{
"name": "bar1",
"type": {
"type": "double",
"arg.properties": {
"options": [
12.34,
23.45,
34.56,
45.67,
56.78
]
}
}
},
{
"name": "bar2",
"type": {
"type": "double",
"arg.properties": {
"options": [
-98.76,
-87.65,
-76.54,
-65.43,
-54.32
]
}
}
}
]
}
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,23 @@ void nesting() {
assertThat(result, is(equalTo("foo fooid POINT some foo some bar")));
}

// Regression test for this commit's fix: command terms that share a name
// prefix ("event.bar" vs "event.bar1"/"event.bar2") must each be replaced
// with their own record value, instead of "event.bar" eagerly clobbering
// the prefix of the longer terms (see the space-suffixed substitution in
// CommandGenerator.preparedStatement).
@Test
void compileWithRepeatingTermNames() {
// Template with three terms sharing the "event.bar" prefix.
final String cmdString = "foo event.bar POINT event.bar1 event.bar2";

// Connect schema carrying exactly the three fields the template references.
Schema schema = SchemaBuilder.struct().field("bar", Schema.STRING_SCHEMA).field("bar1", Schema.FLOAT32_SCHEMA).field("bar2", Schema.FLOAT32_SCHEMA);

Struct value = new Struct(schema).put("bar", "one").put("bar1", 12.34f).put("bar2", 56.78f);

// Wrap the struct in a sink record via the test helper.
// NOTE(review): presumably "unused" is the topic and "foo" the record key —
// confirm against the write(...) helper's signature.
SinkRecord rec = write("unused", Schema.STRING_SCHEMA, "foo", schema, value);

// Convert to the connector's internal record representation.
Tile38Record internalRecord = new RecordConverter().convert(rec);

// Compile the template against the record into a Tile38 command triple.
Triple<CommandType, CommandOutput<String, String, ?>, CommandArgs<String, String>> result = CommandGenerator.from(
CommandTemplate.from(cmdString)).compile(internalRecord);

// Each term resolved to its own field value — no prefix clobbering.
assertThat(result.getLeft(), is(equalTo(CommandType.SET)));
assertThat(result.getRight().toCommandString(), is(equalTo("foo one POINT 12.34 56.78")));
}

}

0 comments on commit 025dde0

Please sign in to comment.