Skip to content

Commit

Permalink
allow to use kafka properties (#31)
Browse files Browse the repository at this point in the history
* allow to use kafka properties
  • Loading branch information
davideicardi authored Oct 15, 2020
1 parent 990bd3a commit 54fa8a1
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 44 deletions.
12 changes: 6 additions & 6 deletions kaa-registry-server/src/main/scala/kaa/EntryPoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
object EntryPoint extends App {
println("Kaa Schema Registry Server")

val brokers = "localhost:9092"
val host = "localhost"
val appName = "kaa-registry-server"
val port = 8888
val brokers = sys.env.getOrElse("KAFKA_BROKERS", "localhost:9092")
val interface = sys.env.getOrElse("INTERFACE", "localhost")
val port = sys.env.getOrElse("PORT", "8888").toInt
val appName = sys.env.getOrElse("CLIENT_ID", "kaa-registry-server")

val admin = new KaaSchemaRegistryAdmin(brokers)
if (!admin.topicExists()) admin.createTopic()
Expand All @@ -23,12 +23,12 @@ object EntryPoint extends App {

val doneSignal = new CountDownLatch(1)

val schemaRegistry = new KaaSchemaRegistry(brokers)
val schemaRegistry = KaaSchemaRegistry.create(brokers, appName)
try {
val controller = new KaaController(schemaRegistry)

val httpServer = new KaaHttpServer(
host,
interface,
port,
Seq(controller)
)
Expand Down
6 changes: 3 additions & 3 deletions kaa-registry-server/src/main/scala/kaa/KaaHttpService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ trait RouteController {
}

class KaaHttpServer(
host: String,
interface: String,
port: Int,
controllers: Seq[RouteController],
)
Expand All @@ -23,10 +23,10 @@ class KaaHttpServer(
val route = concat(controllers.map(_.createRoute()):_*)

bindingFuture = Some {
Http().newServerAt(host, port)
Http().newServerAt(interface, port)
.bindFlow(route)
}
println(s"Server online at http://${host}:${port}/\n")
println(s"Server online at http://$interface:$port/\n")

Runtime.getRuntime.addShutdownHook(new Thread(() => {
stop()
Expand Down
4 changes: 2 additions & 2 deletions kaa/src/it/scala/kaa/darwin/KaaSchemaRegistrySpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class KaaSchemaRegistrySpec extends AnyFlatSpec with should.Matchers with Before
}

"KaaSchemaRegistry" should "put and retrieve a schema" in {
val target = new KaaSchemaRegistry(BROKERS, TOPIC_NAME)
val target = KaaSchemaRegistry.create(BROKERS, TOPIC_NAME)

try {
val schema = AvroSchema[Foo]
Expand All @@ -39,7 +39,7 @@ class KaaSchemaRegistrySpec extends AnyFlatSpec with should.Matchers with Before
}

it should "return None for not existing schema" in {
val target = new KaaSchemaRegistry(BROKERS, TOPIC_NAME)
val target = KaaSchemaRegistry.create(BROKERS, TOPIC_NAME)

try {
target.get(SchemaId(999L)) match {
Expand Down
92 changes: 60 additions & 32 deletions kaa/src/main/scala/com/davideicardi/kaa/KaaSchemaRegistry.scala
Original file line number Diff line number Diff line change
@@ -1,37 +1,67 @@
package com.davideicardi.kaa

import java.lang

import org.apache.avro.Schema
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.avro.SchemaNormalization
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.{Collections, Properties, UUID}
import java.time.{Duration => JavaDuration}
import com.github.blemale.scaffeine.{ Cache, Scaffeine }

import com.github.blemale.scaffeine.{Cache, Scaffeine}
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{LongDeserializer, StringDeserializer}
import org.apache.kafka.common.serialization.{LongSerializer, StringSerializer}

import scala.concurrent.duration._
import com.davideicardi.kaa.utils.Retry
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import scala.concurrent.Await

import com.davideicardi.kaa.KaaSchemaRegistry._

import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
import com.davideicardi.kaa.utils.RetryConfig

object KaaSchemaRegistry {
val DEFAULT_TOPIC_NAME = "schemas-v1"
val DEFAULT_CLIENT_ID = "KaaSchemaRegistry"
val DEFAULT_POLL_INTERVAL: FiniteDuration = 5.second
val DEFAULT_RETRY_CONFIG: RetryConfig = RetryConfig(5, 2.second)

def create(
brokers: String,
clientId: String,
topic: String = DEFAULT_TOPIC_NAME,
pollInterval: Duration = DEFAULT_POLL_INTERVAL,
getRetry: RetryConfig = DEFAULT_RETRY_CONFIG
): KaaSchemaRegistry = {
val consumerProps = new Properties()
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId)

val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId)

new KaaSchemaRegistry(
producerProps = producerProps,
consumerProps = consumerProps,
topic = topic,
pollInterval,
getRetry)
}
}

class KaaSchemaRegistry(
brokers: String,
topic: String = KaaSchemaRegistry.DEFAULT_TOPIC_NAME,
cliendId: String = "KaaSchemaRegistry",
pollInterval: Duration = 5.second,
getRetry: RetryConfig = RetryConfig(5, 2.second)
producerProps: Properties,
consumerProps: Properties,
topic: String = DEFAULT_TOPIC_NAME,
pollInterval: Duration = DEFAULT_POLL_INTERVAL,
getRetry: RetryConfig = DEFAULT_RETRY_CONFIG
) extends SchemaRegistry {

implicit private val ec = ExecutionContext.global
implicit private val ec: ExecutionContextExecutor = ExecutionContext.global

private val producer = createProducer()
private val consumer = createConsumer()
Expand All @@ -45,12 +75,12 @@ class KaaSchemaRegistry(
while (!stopping.get()) {
val records = consumer.poll(jPollInterval)

records.forEach((record) => {
records.forEach(record => {
cache.put(record.key(), record.value())
})
}

consumer.close();
consumer.close()
}

def shutdown(): Unit = {
Expand Down Expand Up @@ -78,30 +108,28 @@ class KaaSchemaRegistry(
}
}

protected def createConsumer() = {
new KafkaConsumer(consumerProps(), new LongDeserializer(), new StringDeserializer())
protected def createConsumer(): KafkaConsumer[lang.Long, String] = {
new KafkaConsumer(fillConsumerProps(), new LongDeserializer(), new StringDeserializer())
}

protected def consumerProps(): Properties = {
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ConsumerConfig.CLIENT_ID_CONFIG, cliendId)
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
protected def fillConsumerProps(): Properties = {

consumerProps.putIfAbsent(ConsumerConfig.CLIENT_ID_CONFIG, KaaSchemaRegistry.DEFAULT_CLIENT_ID)

consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString)
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

props
consumerProps
}

protected def createProducer() = {
new KafkaProducer(producerProps(), new LongSerializer(), new StringSerializer())
protected def createProducer(): KafkaProducer[lang.Long, String] = {
new KafkaProducer(fillProducerProps(), new LongSerializer(), new StringSerializer())
}

protected def producerProps(): Properties = {
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.CLIENT_ID_CONFIG, cliendId)

props
protected def fillProducerProps(): Properties = {
producerProps.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, KaaSchemaRegistry.DEFAULT_CLIENT_ID)

producerProps
}
}
}
2 changes: 1 addition & 1 deletion sample/src/main/scala/sampleApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ object SampleApp {
val admin = new KaaSchemaRegistryAdmin(brokers)
if (!admin.topicExists()) admin.createTopic()

val schemaRegistry = new KaaSchemaRegistry(brokers)
val schemaRegistry = KaaSchemaRegistry.create(brokers, "sample")
try {
val serializerV1 = new AvroSingleObjectSerializer[SuperheroV1](schemaRegistry)
val serializerV2 = new AvroSingleObjectSerializer[SuperheroV2](schemaRegistry)
Expand Down

0 comments on commit 54fa8a1

Please sign in to comment.