Skip to content

Commit

Permalink
Merge pull request #34 from tokensmith/release-1.2.2
Browse files Browse the repository at this point in the history
Release 1.2.2
  • Loading branch information
tmackenzie authored Aug 1, 2020
2 parents 675b918 + a3176af commit 5a51cf4
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 29 deletions.
27 changes: 25 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,31 @@ export MESSAGE_QUEUE_HOST='localhost:9092'

### Injection
```java
PelicanAppConfig appConfig = new PelicanAppConfig();
appConfig.setMessageQueueHost("localhost:9092");
PelicanAppConfig appConfig = new PelicanAppConfig();
appConfig.setMessageQueueHost("localhost:9092");
```

### Change publish and subscribe settings
In order to change the subscribe and publish settings implement a local `PelicanAppConfig` and override the methods,
`propertiesForSubscribe`, `propertiesForPublish`. Then use the local implementation to construct publishers and subscribers.

```java
public class PubSubConfig extends PelicanAppConfig {

@Override
public Properties propertiesForSubscribe(String clientId, String consumerGroup) {
Properties properties = super.propertiesForSubscribe(clientId, consumerGroup);
properties.put("my.new.prop.key", "my.new.prop.value");
return properties;
}

@Override
public Properties propertiesForPublish(String clientId) {
Properties properties = super.propertiesForPublish(clientId);
properties.put("my.new.prop.key", "my.new.prop.value");
return properties;
}
}
```

How to Publish
Expand Down
18 changes: 9 additions & 9 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ plugins {
sourceCompatibility = 12
targetCompatibility = 12

version = '1.2.1'
version = '1.2.2'
group = 'net.tokensmith'
description = 'framework to publish and consume messages'

ext{
kafkaVersion = '2.5.0'
jacksonVersion = '2.11.1'
log4jVersion = '2.12.1'
junitVersion = '5.0.0'
slf4jVersion = '1.7.26'
log4jVersion = '2.13.3'
slf4jVersion = '1.7.25'
}

repositories {
Expand All @@ -38,14 +38,14 @@ dependencies {
compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:${jacksonVersion}"
compile "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}"
compile "com.fasterxml.jackson.module:jackson-module-afterburner:${jacksonVersion}"

compile "org.slf4j:slf4j-simple:${slf4jVersion}"
compile "org.apache.logging.log4j:log4j-1.2-api:${log4jVersion}"
compile "org.apache.logging.log4j:log4j-api:${log4jVersion}"
compile "org.apache.logging.log4j:log4j-core:${log4jVersion}"
compile "org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}"

compile group: 'org.slf4j', name: 'slf4j-api', version: "${slf4jVersion}"

testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: "${log4jVersion}"
testCompile group: 'org.apache.logging.log4j', name: 'log4j-api', version: "${log4jVersion}"
testCompile group: 'org.apache.logging.log4j', name: 'log4j-core', version: "${log4jVersion}"

deployerJars "org.apache.maven.wagon:wagon-http:2.2"
}

Expand Down
1 change: 1 addition & 0 deletions src/main/java/net/tokensmith/pelican/Publish.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@

public interface Publish {
void send(String topic, Map<String, String> msg);
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import net.tokensmith.pelican.kafka.KafkaPublish;
import net.tokensmith.pelican.kafka.KafkaSubscribe;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

import java.util.Collection;
import java.util.Properties;
Expand Down Expand Up @@ -82,7 +83,11 @@ public Properties propertiesForSubscribe(String clientId, String consumerGroup)
return props;
}
public Publish publish(String clientId) {
return new KafkaPublish(propertiesForPublish(clientId), objectMapper());
Properties props = propertiesForPublish(clientId);
return new KafkaPublish(
objectMapper(),
new KafkaProducer<>(props)
);
}

public KafkaConsumer<String, String> consumer(Collection<String> topics, String clientId, String consumerGroup){
Expand Down
26 changes: 13 additions & 13 deletions src/main/java/net/tokensmith/pelican/kafka/KafkaPublish.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,24 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import net.tokensmith.pelican.Publish;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.util.Map;
import java.util.Properties;

public class KafkaPublish implements Publish {
protected static Logger logger = LogManager.getLogger(KafkaPublish.class);
protected static Logger LOGGER = LoggerFactory.getLogger(KafkaPublish.class);

Properties properties;
ObjectMapper objectMapper;
private Producer<String, byte[]> producer;
private ObjectMapper objectMapper;

public KafkaPublish(Properties properties, ObjectMapper objectMapper) {
this.properties = properties;
public KafkaPublish(ObjectMapper objectMapper, Producer<String, byte[]> producer) {
this.objectMapper = objectMapper;
this.producer = producer;
}

@Override
Expand All @@ -30,14 +30,14 @@ public void send(String topic, Map<String, String> msg) {
try {
payload = objectMapper.writeValueAsBytes(msg);
} catch (JsonProcessingException e) {
logger.error(e.getMessage(), e);
LOGGER.error(e.getMessage(), e);
}

Producer<String, byte[]> producer = new KafkaProducer<>(properties);
producer.send(new ProducerRecord<>(topic, payload));
logger.debug("sent message");
LOGGER.debug("sent message");
}

@Override
public void close() {
producer.close();
logger.debug("closed connection");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.io.IOException;
import java.time.Duration;
Expand All @@ -19,7 +20,7 @@


public class KafkaSubscribe implements Subscribe {
protected static Logger LOGGER = LogManager.getLogger(KafkaSubscribe.class);
protected static Logger LOGGER = LoggerFactory.getLogger(KafkaSubscribe.class);

private KafkaConsumer<String, String> consumer;
private ObjectMapper objectMapper;
Expand Down
22 changes: 22 additions & 0 deletions src/test/java/helper/PubSubConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package helper;

import net.tokensmith.pelican.config.PelicanAppConfig;

import java.util.Properties;

public class PubSubConfig extends PelicanAppConfig {

@Override
public Properties propertiesForSubscribe(String clientId, String consumerGroup) {
Properties properties = super.propertiesForSubscribe(clientId, consumerGroup);
properties.put("my.new.prop.key", "my.new.prop.value");
return properties;
}

@Override
public Properties propertiesForPublish(String clientId) {
Properties properties = super.propertiesForPublish(clientId);
properties.put("my.new.prop.key", "my.new.prop.value");
return properties;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package net.tokensmith.pelican.config;


import helper.PubSubConfig;
import org.junit.Test;

import java.util.Properties;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

public class PelicanAppConfigTest {

public PelicanAppConfig subject() {
return new PelicanAppConfig();
}

@Test
public void setMessageQueueHostShouldAssign() {
PelicanAppConfig subject = subject();
subject.setMessageQueueHost("localhost:1234");
assertThat(subject.messageQueueHost(), is("localhost:1234"));
}

@Test
public void canOverridePropertiesForPublish() {
PubSubConfig subject = new PubSubConfig();
Properties actual = subject.propertiesForPublish("clientId");
assertThat(actual.get("my.new.prop.key"), is("my.new.prop.value"));
}

@Test
public void canOverridePropertiesForSubscribe() {
PubSubConfig subject = new PubSubConfig();
Properties actual = subject.propertiesForSubscribe("clientId", "consumerGroup");
assertThat(actual.get("my.new.prop.key"), is("my.new.prop.value"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -33,7 +34,7 @@ public void pollShouldGetMessage() {

String subscribeClientId = "pelican-subscribe-integration-test-1";
String consumerGroup = "pelican-consumer-group-integration-test-1";
Subscribe subject = appConfig.subscribe(Arrays.asList("test"), subscribeClientId, consumerGroup);
Subscribe subject = appConfig.subscribe(Collections.singletonList("test"), subscribeClientId, consumerGroup);

String publishClientId = "pelican-publisher-integration-test-1";
Publish publish = appConfig.publish(publishClientId);
Expand Down

0 comments on commit 5a51cf4

Please sign in to comment.