Skip to content

Commit

Permalink
lettuce connection settings
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffreyvanhelden committed Jun 4, 2020
1 parent bba0c58 commit fbea0a2
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 4 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ flush.timeout.ms | Used for periodic flushing | int | 10000 | low | 1234
behavior.on.error | Error handling behavior | string | FAIL | medium | LOG or FAIL
tile38.topic.foo | Example command for 'foo' topic | string | | low | foo event.id FIELD route event.route POINT event.lat event.lon
tile38.topic.bar | Example command for 'bar' topic | string | | low | anything event.the_key POINT event.latitude event.longitude
and | some | more | lettuce | connection | settings |
socket.tcp.no.delay.enabled | Use TCP-no-delay
socket.keep.alive.enabled | Enable keepalive
socket.connect.timeout.ms | Wait ms before socket timeout.
request.queue.size | Max number of queued requests
auto.reconnect.enabled | Redis client automatic reconnect

# Build and run info

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import guru.bonacci.kafka.connect.tile38.commands.CommandTemplates;
import guru.bonacci.kafka.connect.tile38.validators.BehaviorOnErrorValues;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.SocketOptions;
import lombok.Getter;


Expand All @@ -34,7 +36,23 @@ public class Tile38SinkConnectorConfig extends AbstractConfig {

public static final String BEHAVIOR_ON_ERROR = "behavior.on.error";
private static final String BEHAVIOR_ON_ERROR_DOC = "Error handling behavior setting. Valid options are 'LOG' and 'FAIL'.";


public static final String SOCKET_TCP_NO_DELAY = "socket.tcp.no.delay.enabled";
private static final String SOCKET_TCP_NO_DELAY_DOC = "Use TCP-no-delay.";

public static final String SOCKET_KEEP_ALIVE = "socket.keep.alive.enabled";
private static final String SOCKET_KEEP_ALIVE_DOC = "Enable keepalive.";

public static final String SOCKET_CONNECT_TIMEOUT = "socket.connect.timeout.ms";
private static final String SOCKET_CONNECT_TIMEOUT_DOC = "Wait ms before socket timeout.";

public static final String REQUEST_QUEUE_SIZE = "request.queue.size";
private static final String REQUEST_QUEUE_SIZE_DOC = "Max number of queued requests.";

public static final String AUTO_RECONNECT = "auto.reconnect.enabled";
private static final String AUTO_RECONNECT_DOC = "Lets the Redis client reconnect automatically.";


@Getter TopicsConfig topicsConfig;
@Getter CommandTemplates cmdTemplates;

Expand Down Expand Up @@ -74,9 +92,14 @@ public static ConfigDef conf() {
.define(TILE38_HOST, Type.STRING, "localhost", Importance.HIGH, TILE38_HOST_DOC)
.define(TILE38_PORT, Type.INT, 9851, Importance.HIGH, TILE38_PORT_DOC)
.define(BEHAVIOR_ON_ERROR, Type.STRING, BehaviorOnErrorValues.DEFAULT.toString(), BehaviorOnErrorValues.VALIDATOR, Importance.MEDIUM, BEHAVIOR_ON_ERROR_DOC)
.define(FLUSH_TIMEOUT, Type.INT, 10000, Importance.LOW, FLUSH_TIMEOUT_DOC);
.define(FLUSH_TIMEOUT, Type.INT, 10000, Importance.LOW, FLUSH_TIMEOUT_DOC)
.define(SOCKET_TCP_NO_DELAY, Type.BOOLEAN, SocketOptions.DEFAULT_SO_NO_DELAY, Importance.LOW, SOCKET_TCP_NO_DELAY_DOC)
.define(SOCKET_KEEP_ALIVE, Type.BOOLEAN, SocketOptions.DEFAULT_SO_KEEPALIVE, Importance.LOW, SOCKET_KEEP_ALIVE_DOC)
.define(SOCKET_CONNECT_TIMEOUT, Type.LONG, SocketOptions.DEFAULT_CONNECT_TIMEOUT_DURATION.toMillis(), Importance.LOW, SOCKET_CONNECT_TIMEOUT_DOC)
.define(REQUEST_QUEUE_SIZE, Type.INT, ClientOptions.DEFAULT_REQUEST_QUEUE_SIZE, Importance.LOW, REQUEST_QUEUE_SIZE_DOC)
.define(AUTO_RECONNECT, Type.BOOLEAN, ClientOptions.DEFAULT_AUTO_RECONNECT, Importance.LOW, AUTO_RECONNECT_DOC);
}

public String getHost() {
return this.getString(TILE38_HOST);
}
Expand All @@ -88,6 +111,26 @@ public Integer getPort() {
public Integer getFlushTimeOut() {
return this.getInt(FLUSH_TIMEOUT);
}

public Boolean getTcpNoDelay() {
return this.getBoolean(SOCKET_TCP_NO_DELAY);
}

public Boolean getKeepAliveEnabled() {
return this.getBoolean(SOCKET_KEEP_ALIVE);
}

public Long getConnectTimeout() {
return this.getLong(SOCKET_CONNECT_TIMEOUT);
}

public Integer getRequestQueueSize() {
return this.getInt(REQUEST_QUEUE_SIZE);
}

public Boolean getAutoReconnectEnabled() {
return this.getBoolean(AUTO_RECONNECT);
}

public BehaviorOnErrorValues getBehaviorOnError() {
return BehaviorOnErrorValues.forValue(this.getString(BEHAVIOR_ON_ERROR));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;

import java.time.Duration;
import java.util.stream.Stream;

import guru.bonacci.kafka.connect.tile38.commands.CommandGenerators;
import guru.bonacci.kafka.connect.tile38.commands.CommandTemplates;
import guru.bonacci.kafka.connect.tile38.config.Tile38SinkConnectorConfig;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.api.async.RedisAsyncCommands;
import lombok.Getter;

Expand All @@ -22,10 +25,22 @@ public class Tile38Writer {
private final CommandGenerators cmds;

public Tile38Writer(Tile38SinkConnectorConfig config) {
final SocketOptions socketOptions = SocketOptions.builder()
.tcpNoDelay(config.getTcpNoDelay())
.connectTimeout(Duration.ofMillis(config.getConnectTimeout()))
.keepAlive(config.getKeepAliveEnabled())
.build();

final ClientOptions.Builder clientOptions = ClientOptions.builder()
.socketOptions(socketOptions)
.requestQueueSize(config.getRequestQueueSize())
.autoReconnect(config.getAutoReconnectEnabled());

this.client = RedisClient.create(
String.format("redis://%s:%d", config.getHost(), config.getPort()));
this.client.setOptions(clientOptions.build());

// disable auto-flushing to allow for batch inserts
// disable auto-flushing to allow for batch inserts
this.async = client.connect().async();
this.async.setAutoFlushCommands(false);

Expand Down

0 comments on commit fbea0a2

Please sign in to comment.