Skip to content

Commit

Permalink
command definition changed: 'set' term needs to be added
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffreyvanhelden committed Jun 1, 2020
1 parent 133ed8c commit c3d29e6
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 77 deletions.
5 changes: 1 addition & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,7 @@ props event.identifier BOUNDS event.southwestlatitude, event.southwestlongitude,
Specified event fields that do not match any topic value field name result in invalid commands. This will cause runtime errors. A few hints:
- Referring to nested fields is possible using the dot notation, as in *event.nest.my-field*
- Only value fields are permitted.
- The SET word is to be ommitted in the command.
- *SET fleet truck1 POINT 33.5123 -112.2693* is specified as *fleet truck1 POINT 33.5123 -112.2693*
- *SET fleet event.id POINT event.lat event.lon* is specified as *fleet event.id POINT event.lat event.lon*
- Use of the DEL word as the first term is not allowed, as it corresponds with the DEL command.
- Using anything other than a SET command is not supported at this stage.


### Tombstone messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ public class Constants {
private static final String SEPARATOR = ".";
public static final String TOKERATOR = TOKEN + SEPARATOR;

public static final String SET_RESERVED = "SET";
public static final String DEL_RESERVED = "DEL";
public static final String SET_TERM = "SET";

public static final String COMMAND_PREFIX = "tile38.topic.";
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
package guru.bonacci.kafka.connect.tile38.commands;

import static com.google.common.collect.ImmutableList.copyOf;
import static com.google.common.collect.ImmutableList.of;
import static com.google.common.collect.Sets.newHashSet;
import static guru.bonacci.kafka.connect.tile38.Constants.DEL_RESERVED;
import static guru.bonacci.kafka.connect.tile38.Constants.SET_RESERVED;
import static guru.bonacci.kafka.connect.tile38.Constants.SET_TERM;
import static guru.bonacci.kafka.connect.tile38.Constants.TOKERATOR;
import static lombok.AccessLevel.PRIVATE;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.strip;

import java.util.List;
import java.util.Set;

import org.apache.kafka.common.config.ConfigException;
Expand Down Expand Up @@ -47,32 +43,32 @@ public class CommandTemplate {
* SET key id [FIELD name value ...] [EX seconds] [NX|XX] (OBJECT
* geojson)|(POINT lat lon [z])|(BOUNDS minlat minlon maxlat maxlon)|(HASH
* geohash)|(STRING value)
*
* 'Compile time' verification only on the first term, it being a valid key.
* Otherwise invalid commands arise 'run time'.
*/
public static CommandTemplate from(String cmdString) {
if (isBlank(cmdString)) {
throw new ConfigException("Command cannot be blank");
throw new ConfigException("Command cannot be empty");
}

// remove excessive spaces and strip the SET term from the command string
final String[] setAndCmdString = strip(cmdString.replaceAll("[ ]+", " ")).split(" ", 2);

String trimmedCmdString = strip(cmdString);
// List to keep order
List<String> cmdTerms = copyOf(trimmedCmdString.split(" "));
if (!SET_TERM.equalsIgnoreCase(setAndCmdString[0]))
throw new ConfigException(String.format("Only SET commands are supported. Configured command '%s' starts with '%s'", cmdString, setAndCmdString[0]));

if (setAndCmdString.length < 2)
throw new ConfigException(String.format("No key defined in command '%s'", cmdString));

String key = cmdTerms.get(0);
if (key.startsWith(TOKERATOR))
throw new ConfigException("Command {} cannot start with 'event.'. The first command word is the key. Check the docs: {}",
cmdString, "https://tile38.com/commands/set/");
final String cmdStringWithoutSet = setAndCmdString[1];

if (of(SET_RESERVED, DEL_RESERVED).contains(key.toUpperCase()))
throw new ConfigException("SET and DEL are reserved words and can be ommitted. Command {} cannot start with either one.", cmdString);

// no more need for order
Set<String> terms = newHashSet(cmdTerms);
// strip the key from the command string
final String[] keyAndCmdString = cmdStringWithoutSet.split(" ", 2);
if (keyAndCmdString.length < 2)
throw new ConfigException(String.format("No id defined in command '%s'", cmdString));

final Set<String> terms = newHashSet(cmdStringWithoutSet.split(" "));
// remove all command terms that do not start with 'event.'
terms.removeIf(s -> !s.startsWith(TOKERATOR));

return new CommandTemplate(trimmedCmdString, key, terms);
return new CommandTemplate(cmdStringWithoutSet, keyAndCmdString[0], terms);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ public void emptyPut() {
static Stream<Arguments> provideForInvalidCommandTemplateStartup() {
return Stream.of(
Arguments.of("event.one event.two event.three"),
Arguments.of("set first is not allowed")
Arguments.of(" set "),
Arguments.of("set nokey")
);
}

Expand Down Expand Up @@ -185,7 +186,7 @@ public void nestedWrite() {
final String topic = "foo";

Map<String, String> config = Maps.newHashMap(provideConfig(topic));
config.put("tile38.topic.foo", "foo event.id POINT event.nested.lat event.nested.lon");
config.put("tile38.topic.foo", "SET foo event.id POINT event.nested.lat event.nested.lon");
this.task.start(config);

final String id = "fooid";
Expand Down Expand Up @@ -237,19 +238,19 @@ public void IgnoredFieldWrite(String route, String lat, String lon) {

private static Stream<Arguments> provideForInvalidWriteCommands() {
return Stream.of(
Arguments.of("foo event.one FIELD route event.two POINT event.three event.four", "fooid", "12.3", "string", "no float"),
Arguments.of("foo event.one FIELD route event.two POINT event.three event.four", "fooid", "null", "0.1", "1.0"),
Arguments.of("foo event.one FIELD route event.two POINT event.three event.four", "fooid", "not a float", "3.1", "4.1"),
Arguments.of("foo event.one FIELD route event.two POINT event.three event.four", "fooid", "nothing", "0.1", "1.0"),
Arguments.of("foo event.one FIELD route event.two POINT event.three event.four", "fooid", "1f", "3.1", "4.1"),
Arguments.of("foo event.one FIELD route event.two POINT event.three event.four", "fooid", "1", "3.1f", "4.1"),
Arguments.of("foo event.one FIELD route event.two POINT event.three event.four", "fooid", "1F", "3.1", "4.1"),
Arguments.of("foo event.one FIELD route event.two POINT event.three event.four", "fooid", "1", "3.1", "4.1F"),
Arguments.of("foo event.one FIELD route event.two POINT event.three event.four", "fooid", "event.route", "event.lat", "event.lon"),
Arguments.of("bar event.one FIELD POINT event.two event.three", "123", "1.2", "2.3", null),
Arguments.of("bar event.one FIELD POINT event.two event.three", "null", "%%", "@@", null),
Arguments.of("bar event.one FIELD POINT event.two event.three", "$$", "1.2", "2.3", null),
Arguments.of("bar event.one FIELD event.two event.three", "100", "1.2", "2.3", null)
Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "12.3", "string", "no float"),
Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "null", "0.1", "1.0"),
Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "not a float", "3.1", "4.1"),
Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "nothing", "0.1", "1.0"),
Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "1f", "3.1", "4.1"),
Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "1", "3.1f", "4.1"),
Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "1F", "3.1", "4.1"),
Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "1", "3.1", "4.1F"),
Arguments.of("SET foo event.one FIELD route event.two POINT event.three event.four", "fooid", "event.route", "event.lat", "event.lon"),
Arguments.of("SET bar event.one FIELD POINT event.two event.three", "123", "1.2", "2.3", null),
Arguments.of("SET bar event.one FIELD POINT event.two event.three", "null", "%%", "@@", null),
Arguments.of("SET bar event.one FIELD POINT event.two event.three", "$$", "1.2", "2.3", null),
Arguments.of("SET bar event.one FIELD event.two event.three", "100", "1.2", "2.3", null)
);
}

Expand Down Expand Up @@ -281,7 +282,7 @@ private Map<String, String> provideConfig(String topic) {
return ImmutableMap.of(SinkTask.TOPICS_CONFIG, topic,
Tile38SinkConnectorConfig.TILE38_HOST, host,
Tile38SinkConnectorConfig.TILE38_PORT, port,
"tile38.topic.foo", "foo event.id FIELD route event.route POINT event.lat event.lon");
"tile38.topic.foo", "SET foo event.id FIELD route event.route POINT event.lat event.lon");
}

private Schema getRouteSchema() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class CommandGeneratorTests {

@Test
void preparedStatement() {
final String cmdString = "something event.id is to be sub event.sub and event.foo event.nest.ed";
final String cmdString = "SET something event.id is to be sub event.sub and event.foo event.nest.ed";

JsonObject sinkRecord = new JsonObject();
sinkRecord.addProperty("id", "fooid");
Expand All @@ -49,7 +49,7 @@ void preparedStatement() {

@Test
void prepareInvalidStatements() {
final String cmdString = "thekey event.four event.one FIELD POINT event.two event.three";
final String cmdString = "SET thekey event.four event.one FIELD POINT event.two event.three";

JsonObject sinkRecord = new JsonObject();
sinkRecord.addProperty("one", "null");
Expand All @@ -66,7 +66,7 @@ void prepareInvalidStatements() {

@Test
void compileToSET() {
final String cmdString = "bla event.id is to be sub nest.event.foo and nest.event.bar more";
final String cmdString = "Set bla event.id is to be sub nest.event.foo and nest.event.bar more";

Schema nestedSchema = SchemaBuilder.struct()
.field("foo", Schema.STRING_SCHEMA)
Expand All @@ -90,7 +90,7 @@ void compileToSET() {

@Test
void tombstoneToDELETE() {
final String cmdString = "bla event.id is to be sub nest.event.foo and nest.event.bar more";
final String cmdString = "SET bla event.id is to be sub nest.event.foo and nest.event.bar more";

SinkRecord rec = new SinkRecord(
"unused",
Expand All @@ -116,7 +116,7 @@ void tombstoneToDELETE() {

@Test
void missingField() {
final String cmdString = "qqqq event.id is to be event.sub";
final String cmdString = "set qqqq event.id is to be event.sub";

Schema schema = SchemaBuilder.struct().field("id", Schema.STRING_SCHEMA);
Struct value = new Struct(schema).put("id", "some id");
Expand All @@ -133,7 +133,7 @@ void missingField() {

@Test
void nesting() {
final String cmdString = "foo event.id POINT event.nested.foo event.nested.bar";
final String cmdString = "seT foo event.id POINT event.nested.foo event.nested.bar";

JsonObject nestedRecord = new JsonObject();
nestedRecord.addProperty("foo", "some foo");
Expand All @@ -152,7 +152,7 @@ void nesting() {

@Test
void compileWithRepeatingTermNames() {
final String cmdString = "foo event.bar POINT event.bar1 event.bar2";
final String cmdString = "set foo event.bar POINT event.bar1 event.bar2";

Schema schema = SchemaBuilder.struct().field("bar", Schema.STRING_SCHEMA).field("bar1", Schema.FLOAT32_SCHEMA).field("bar2", Schema.FLOAT32_SCHEMA);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,33 @@

import com.google.common.collect.ImmutableSet;

public class CommandWrapperTests {
public class CommandTemplateTests {

@Test
void startWithEvent() {
final String cmdString = "event.id is to be sub event.sub and event.foo event.nest.ed";
void normal() {
final String cmdString = "SET foo event.id to be sub event.sub and event.foo event.nest.ed";

Assertions.assertThrows(ConfigException.class, () -> {
CommandTemplate.from(cmdString);
});
CommandTemplate cmd = CommandTemplate.from(cmdString);

assertThat(cmd.getCmdString(), is(equalTo("foo event.id to be sub event.sub and event.foo event.nest.ed")));
assertThat(cmd.getKey(), is(equalTo("foo")));
assertThat(cmd.getTerms(), is(equalTo(ImmutableSet.of("event.foo", "event.id", "event.sub", "event.nest.ed"))));
}

@Test
void trim() {
final String spacedCmdString = " SET foo is to be sub event.sub and event.foo event.nest.ed ";
final String cmdString = "SET foo is to be sub event.sub and event.foo event.nest.ed";

CommandTemplate spacedCmd = CommandTemplate.from(spacedCmdString);
CommandTemplate cmd = CommandTemplate.from(cmdString);

assertThat(spacedCmd, is(equalTo(cmd)));
}

@Test
void startTrimmedWithEvent() {
final String cmdString = " event.id is to be sub event.sub and event.foo event.nest.ed";
void noStartWithSet() {
final String cmdString = "event.id is to be sub event.sub and event.foo event.nest.ed";

Assertions.assertThrows(ConfigException.class, () -> {
CommandTemplate.from(cmdString);
Expand All @@ -38,26 +51,31 @@ void emptyCmdString() {
CommandTemplate.from(cmdString);
});
}

@Test
void trimming() {
final String spacedCmdString = " foo is to be sub event.sub and event.foo event.nest.ed ";
final String cmdString = "foo is to be sub event.sub and event.foo event.nest.ed";

CommandTemplate spacedCmd = CommandTemplate.from(spacedCmdString);
CommandTemplate cmd = CommandTemplate.from(cmdString);
@Test
void onlySet() {
final String cmdString = "SET";

assertThat(spacedCmd, is(equalTo(cmd)));
Assertions.assertThrows(ConfigException.class, () -> {
CommandTemplate.from(cmdString);
});
}

@Test
void normal() {
final String cmdString = "foo event.id to be sub event.sub and event.foo event.nest.ed";
void onlySetAndKey() {
final String cmdString = "SET foo";

CommandTemplate cmd = CommandTemplate.from(cmdString);
Assertions.assertThrows(ConfigException.class, () -> {
CommandTemplate.from(cmdString);
});
}

@Test
void setGlued() {
final String cmdString = "SETfoo event.id is to be sub event.sub and event.foo event.nest.ed";

assertThat(cmd.getCmdString(), is(equalTo(cmdString)));
assertThat(cmd.getKey(), is(equalTo("foo")));
assertThat(cmd.getTerms(), is(equalTo(ImmutableSet.of("event.foo", "event.id", "event.sub", "event.nest.ed"))));
Assertions.assertThrows(ConfigException.class, () -> {
CommandTemplate.from(cmdString);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ public static TopicsConfig provideTopics() {
"key.converter", "org.apache.kafka.connect.storage.StringConverter",
"value.converter", "org.apache.kafka.connect.storage.StringConverter",
"topics", "foo,bar",
"tile38.topic.foo", "foo event.query event.here",
"tile38.topic.bar", "bar event.bar query here event.there"));
"tile38.topic.foo", "SET foo event.query event.here",
"tile38.topic.bar", "set bar event.bar query here event.there"));
}

@Test
Expand All @@ -33,7 +33,7 @@ void filterByPrefix() {

Map<String, String> cmds = topics.getCmdsByTopic();
assertThat(cmds, is(aMapWithSize(2)));
assertThat(cmds.get("foo"), is("foo event.query event.here"));
assertThat(cmds.get("bar"), is("bar event.bar query here event.there"));
assertThat(cmds.get("foo"), is("SET foo event.query event.here"));
assertThat(cmds.get("bar"), is("set bar event.bar query here event.there"));
}
}

0 comments on commit c3d29e6

Please sign in to comment.