Skip to content

Commit

Permalink
handle null in genericserde for kafka tombstones handling (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
davideicardi authored Nov 4, 2020
1 parent 2a06c79 commit 4e08819
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 7 deletions.
17 changes: 10 additions & 7 deletions kaa/src/main/scala/com/davideicardi/kaa/kafka/GenericSerde.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,22 @@ class GenericSerde[T >: Null : SchemaFor : Encoder : Decoder]
def deserializer: Deserializer[T] = this

override def deserialize(topic: String, data: Array[Byte]): T = {
avroSerializer.deserialize(data)
if (data == null || data.length == 0) {
null
} else {
avroSerializer.deserialize(data)
}
}

override def serialize(topic: String, data: T): Array[Byte] = {
avroSerializer.serialize(data)
if (data == null) {
Array()
} else {
avroSerializer.serialize(data)
}
}

override def close(): Unit = ()

override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
}





53 changes: 53 additions & 0 deletions kaa/src/test/scala/kaa/kafka/GenericSerdeSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.davideicardi.kaa.darwin

import org.scalatest._
import flatspec._
import matchers._
import com.davideicardi.kaa.kafka.GenericSerde
import com.davideicardi.kaa.test.TestSchemaRegistry

class GenericSerdeSpec extends AnyFlatSpec with should.Matchers {

val registry = new TestSchemaRegistry

"GenericSerde" should "serialize and deserialize a case class" in {
val target = new GenericSerde[FooUser](registry)

val expected = FooUser("foo")
val bytes = target.serialize("topic", expected)
val result = target.deserialize("topic", bytes)

result should equal (expected)
}

it should "serialize and deserialize an Option Some" in {
val target = new GenericSerde[Option[FooUser]](registry)

val expected = Some(FooUser("foo"))
val bytes = target.serialize("topic", expected)
val result = target.deserialize("topic", bytes)
result should equal (expected)
}

it should "serialize and deserialize an Option None" in {
val target = new GenericSerde[Option[FooUser]](registry)

val expected: Option[FooUser] = None
val bytes = target.serialize("topic", expected)
val result = target.deserialize("topic", bytes)
result should equal (expected)
}

it should "serialize and deserialize a null" in {
val target = new GenericSerde[FooUser](registry)

val expected: FooUser = null
val bytes = target.serialize("topic", expected)
val result = target.deserialize("topic", bytes)

result should equal (expected)
}

case class FooUser (name: String) {
}
}

0 comments on commit 4e08819

Please sign in to comment.