Skip to content

Commit

Permalink
add props inside admin and refactoring (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
davideicardi authored Oct 15, 2020
1 parent 54fa8a1 commit 8104735
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 47 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ try {
case class SuperheroV1(name: String)
```

## Http schema server

For a simple server implementation see `./kaa-registry-server`.

Use `GET /schemas/ids/{schemaId}` method to retrieve schemas.

## See also

- Avro: https://avro.apache.org/
Expand Down Expand Up @@ -112,4 +118,10 @@ Run example application:
docker-compose up -d
sbt sample/run
docker-compose down
```

Run http server:

```
sbt kaaRegistryServer/run
```
15 changes: 10 additions & 5 deletions kaa-registry-server/src/main/scala/kaa/EntryPoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,22 @@ object EntryPoint extends App {
val brokers = sys.env.getOrElse("KAFKA_BROKERS", "localhost:9092")
val interface = sys.env.getOrElse("INTERFACE", "localhost")
val port = sys.env.getOrElse("PORT", "8888").toInt
val appName = sys.env.getOrElse("CLIENT_ID", "kaa-registry-server")
val appName = sys.env.getOrElse("APP_NAME", "kaa-registry-server")
val props = KaaSchemaRegistry.createProps(brokers, appName)

val admin = new KaaSchemaRegistryAdmin(brokers)
if (!admin.topicExists()) admin.createTopic()
val admin = new KaaSchemaRegistryAdmin(props)
try {
if (!admin.topicExists()) admin.createTopic()
} finally {
admin.close()
}

implicit val system: ActorSystem = ActorSystem(appName)
implicit val ec: ExecutionContextExecutor = ExecutionContext.global

val doneSignal = new CountDownLatch(1)

val schemaRegistry = KaaSchemaRegistry.create(brokers, appName)
val schemaRegistry = new KaaSchemaRegistry(brokers)
try {
val controller = new KaaController(schemaRegistry)

Expand All @@ -42,6 +47,6 @@ object EntryPoint extends App {

doneSignal.await()
} finally {
schemaRegistry.shutdown()
schemaRegistry.close()
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
package com.davideicardi.kaa.darwin
import java.util.UUID

import com.davideicardi.kaa.{KaaSchemaRegistry, KaaSchemaRegistryAdmin, SchemaId}
import com.sksamuel.avro4s.AvroSchema
import org.scalatest._
import flatspec._
import matchers._
import com.davideicardi.kaa.{KaaSchemaRegistry, KaaSchemaRegistryAdmin, SchemaId}
import java.util.UUID
import org.scalatest.flatspec._
import org.scalatest.matchers._

class KaaSchemaRegistrySpec extends AnyFlatSpec with should.Matchers with BeforeAndAfterAll {

val BROKERS = "localhost:9092"
val TOPIC_NAME = "schema-registry-test" + UUID.randomUUID().toString()
val admin = new KaaSchemaRegistryAdmin(BROKERS, TOPIC_NAME)
private val BROKERS = "localhost:9092"
private val TOPIC_NAME = "schema-registry-test" + UUID.randomUUID().toString
private val props = KaaSchemaRegistry.createProps(BROKERS, "KaaSchemaRegistrySpec")
private val admin = new KaaSchemaRegistryAdmin(
props,
TOPIC_NAME,
)

private def createTarget() = {
new KaaSchemaRegistry(props, props, topic = TOPIC_NAME)
}

override protected def beforeAll(): Unit = {
if (!admin.topicExists())
Expand All @@ -23,7 +30,7 @@ class KaaSchemaRegistrySpec extends AnyFlatSpec with should.Matchers with Before
}

"KaaSchemaRegistry" should "put and retrieve a schema" in {
val target = KaaSchemaRegistry.create(BROKERS, TOPIC_NAME)
val target = createTarget()

try {
val schema = AvroSchema[Foo]
Expand All @@ -34,20 +41,20 @@ class KaaSchemaRegistrySpec extends AnyFlatSpec with should.Matchers with Before
case Some(schemaRetrieved) => schemaRetrieved should be (schema)
}
} finally {
target.shutdown()
target.close()
}
}

it should "return None for not existing schema" in {
val target = KaaSchemaRegistry.create(BROKERS, TOPIC_NAME)
val target = createTarget()

try {
target.get(SchemaId(999L)) match {
case None => succeed
case Some(_) => fail("Schema should not be retrieved")
}
} finally {
target.shutdown()
target.close()
}
}

Expand Down
46 changes: 26 additions & 20 deletions kaa/src/main/scala/com/davideicardi/kaa/KaaSchemaRegistry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,22 @@ import com.davideicardi.kaa.KaaSchemaRegistry._

import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
import com.davideicardi.kaa.utils.RetryConfig
import org.apache.kafka.clients.CommonClientConfigs

object KaaSchemaRegistry {
val DEFAULT_TOPIC_NAME = "schemas-v1"
val DEFAULT_CLIENT_ID = "KaaSchemaRegistry"
val DEFAULT_POLL_INTERVAL: FiniteDuration = 5.second
val DEFAULT_RETRY_CONFIG: RetryConfig = RetryConfig(5, 2.second)

def create(
def createProps(
brokers: String,
clientId: String,
topic: String = DEFAULT_TOPIC_NAME,
pollInterval: Duration = DEFAULT_POLL_INTERVAL,
getRetry: RetryConfig = DEFAULT_RETRY_CONFIG
): KaaSchemaRegistry = {
clientId: String = DEFAULT_CLIENT_ID,
): Properties = {
val consumerProps = new Properties()
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId)

val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId)

new KaaSchemaRegistry(
producerProps = producerProps,
consumerProps = consumerProps,
topic = topic,
pollInterval,
getRetry)
consumerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers)
consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId)
consumerProps
}
}

Expand All @@ -61,6 +49,24 @@ class KaaSchemaRegistry(
pollInterval: Duration = DEFAULT_POLL_INTERVAL,
getRetry: RetryConfig = DEFAULT_RETRY_CONFIG
) extends SchemaRegistry {

def this(producerProps: Properties,
consumerProps: Properties) = {
this(
producerProps = producerProps,
consumerProps = consumerProps,
topic = DEFAULT_TOPIC_NAME,
pollInterval = DEFAULT_POLL_INTERVAL,
getRetry = DEFAULT_RETRY_CONFIG,
)
}
def this(brokers: String) = {
this(
producerProps = createProps(brokers),
consumerProps = createProps(brokers),
)
}

implicit private val ec: ExecutionContextExecutor = ExecutionContext.global

private val producer = createProducer()
Expand All @@ -83,7 +89,7 @@ class KaaSchemaRegistry(
consumer.close()
}

def shutdown(): Unit = {
def close(): Unit = {
stopping.set(true)
Await.result(startConsumerFuture, 10.seconds)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
package com.davideicardi.kaa

import java.util.{Collections, Properties, Optional}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
import org.apache.kafka.common.config.TopicConfig

import com.davideicardi.kaa.utils.CollectionConverters

class KaaSchemaRegistryAdmin(
brokers: String,
adminProps: Properties,
topic: String = KaaSchemaRegistry.DEFAULT_TOPIC_NAME
) {
val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put("delete.enable.topic", "true")
private val adminClient = AdminClient.create(props)
def this(brokers: String) = {
this(
KaaSchemaRegistry.createProps(brokers, KaaSchemaRegistry.DEFAULT_CLIENT_ID),
topic = KaaSchemaRegistry.DEFAULT_TOPIC_NAME,
)
}

adminProps.putIfAbsent("delete.enable.topic", "true")
private val adminClient = AdminClient.create(adminProps)

def createTopic(): Unit = {
val NUMBER_OF_PARTITIONS = 1
Expand All @@ -38,4 +43,8 @@ class KaaSchemaRegistryAdmin(
def deleteTopic(): Unit = {
val _ = adminClient.deleteTopics(Collections.singletonList(topic)).all().get()
}

def close(): Unit = {
adminClient.close()
}
}
12 changes: 8 additions & 4 deletions sample/src/main/scala/sampleApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ object SampleApp {
def main(args: Array[String]): Unit = {
println("KaaSchemaRegistry SampleApp")

val brokers = "localhost:9092"
val brokers = sys.env.getOrElse("KAFKA_BROKERS", "localhost:9092")

val admin = new KaaSchemaRegistryAdmin(brokers)
if (!admin.topicExists()) admin.createTopic()
try {
if (!admin.topicExists()) admin.createTopic()
} finally {
admin.close()
}

val schemaRegistry = KaaSchemaRegistry.create(brokers, "sample")
val schemaRegistry = new KaaSchemaRegistry(brokers)
try {
val serializerV1 = new AvroSingleObjectSerializer[SuperheroV1](schemaRegistry)
val serializerV2 = new AvroSingleObjectSerializer[SuperheroV2](schemaRegistry)
Expand All @@ -39,7 +43,7 @@ object SampleApp {
println(s"V1 -> V2 $resultV2V1")

} finally {
schemaRegistry.shutdown()
schemaRegistry.close()
}
}
}
Expand Down

0 comments on commit 8104735

Please sign in to comment.