experimental type safe eZtreme #2

Open · wants to merge 2 commits into master · changes from 1 commit
5 changes: 3 additions & 2 deletions build.sbt
@@ -53,7 +53,8 @@ lazy val core = project
.settings(
name := "zio-kafka-streams",
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % V.zio
"dev.zio" %% "zio" % V.zio,
"dev.zio" %% "zio-prelude" % "latest.integration"
)
)

@@ -84,7 +85,7 @@ lazy val datagen = project

lazy val examples = project
.in(file("examples"))
.dependsOn(core, datagen)
.dependsOn(core, datagen, testkit)
Collaborator Author: Using testkit instead of launching actual Kafka, useful for demonstration purposes.

.settings(commonSettings)
.settings(
name := "examples",
46 changes: 45 additions & 1 deletion examples/src/main/scala/com/github/niqdev/ToUpperCaseApp.scala
@@ -22,7 +22,7 @@ object ToUpperCaseTopology
topology <- ZStreamsBuilder { builder =>
for {
sourceStream <- builder.stream[String, String](sourceTopic)
sinkStream <- sourceStream.mapValue(_.toUpperCase)
sinkStream <- sourceStream.mapValue(_.length)
Owner: I think this is an interesting case...

(sinkStream: ZKStream[String, Int])   <- sourceStream.mapValue(_.length)
_                                     <- sinkStream.to(sinkTopic)

The output type of the topic is determined purely by the types of the ZKStream[K, V] wrapper; you can't define, for instance,

def to[KO, VO](topic: String): RIO[KafkaStreamsConfig, Unit]

because that's how the scala/java KStream API works

So this will always compile as long as there is an instance of Codec[T] in scope, which exists for all basic types.

On the other hand, I believe in most cases you would use Avro, and in that case this shouldn't be a problem at all, because you need to define an instance of AvroCodec for your type T, and that won't compile unless the types are what you expect:

// here dummy.value.length doesn't compile
(sinkStream: ZKStream[DummyKey, DummyValue])   <- sourceStream.mapValue(dummy => DummyValue(dummy.value.toUpperCase))
_                                              <- sinkStream.toAvro(sinkTopic)

wdyt?

Collaborator Author: In a greenfield project I agree Avro is the state of the art and should be used. But there are many real-world scenarios in which this is not possible or preferable: smaller projects, brownfield projects that already have topics without Avro (true story), systems that use other schema software or different binary encodings...

Owner: Fair enough, but at this point the issue is that there are default codecs already defined and we are using basic types... it's probably still not ideal and won't solve the issue completely, but if you have your own type, you can build your own codec using Codec[T].cmap, similar to

implicit val uuidCodec: Codec[java.util.UUID] =
  stringCodec.cmap(...)

and catch most of these errors at compile time
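
A hedged sketch of the idea (UserId is illustrative and not part of the codebase; the exact cmap arguments are elided above, so this assumes it lifts a plain conversion function as in the uuidCodec example):

// Sketch only: derive Codec[UserId] by routing it through the String codec.
final case class UserId(value: String)

implicit val userIdCodec: Codec[UserId] =
  stringCodec.cmap(_.value)

// With only this instance in scope, sinking a stream whose value type has
// no Codec fails to compile, instead of silently reusing a basic codec.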

Collaborator Author: I don't think I totally understood what you are saying here. Are you saying that we could make it type safe by using slightly different codecs (for example UUID instead of String)?

Owner:

> it's probably still not ideal and won't solve the issue completely

The current implementation by default uses implicit codecs and has the types "dictated" by the ZKStream constructor, and you won't get around it easily.

_ <- sinkStream.to(sinkTopic)
} yield ()
}
@@ -98,3 +98,47 @@ object ToUpperCaseConfig {
lazy val layer: ULayer[KafkaStreamsConfig with CustomConfig] =
configLayer ++ customConfigLayer
}

final case class ToCountCharsConfig(
applicationId: String,
bootstrapServers: String,
sourceTopic: String,
sinkTopic: String
)
object ToCountCharsConfig {
type CustomConfig = Has[CustomConfig.Service]

object CustomConfig {
trait Service {
def sourceTopic: Task[String]
def sinkTopic: Task[String]
}

def sourceTopic: RIO[CustomConfig, String] =
ZIO.accessM[CustomConfig](_.get.sourceTopic)
def sinkTopic: RIO[CustomConfig, String] =
ZIO.accessM[CustomConfig](_.get.sinkTopic)
}

private[this] lazy val configLayer: ULayer[KafkaStreamsConfig] =
KafkaStreamsConfig.make(
UIO.succeed(
AppConfig(
applicationId = "to-count-chars",
bootstrapServers = "localhost:9092",
debug = true
)
)
)

lazy val customConfigLayer: ULayer[CustomConfig] =
ZLayer.succeed(new CustomConfig.Service {
override def sourceTopic: Task[String] =
UIO.succeed("example.source.v1")
override def sinkTopic: Task[String] =
UIO.succeed("example.sink.v1")
})

lazy val layer: ULayer[KafkaStreamsConfig with CustomConfig] =
configLayer ++ customConfigLayer
}
56 changes: 56 additions & 0 deletions examples/src/main/scala/io/laserdisc/kafka/DangerousGDPR1.scala
@@ -0,0 +1,56 @@
package io.laserdisc.kafka
Owner: They should probably be moved under

examples/src/[main>test]/scala/...

Collaborator Author: ?? 🤔

Owner: If you run sbt test these specs are ignored, but it's just an example, so no big deal.


import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{ StreamsConfig, Topology }
import zio.UIO
import zio.kafka.streams.KafkaStreamsTopology
import zio.kafka.streams.testkit.ZTestTopology
import zio.test.Assertion.equalTo
import zio.test._
import zio.test.environment.TestEnvironment

import java.util.Properties

object DangerousGDPR1 {
val config: Properties = {
val jProperties = new Properties()
jProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
jProperties
}

val builder: StreamsBuilder = new StreamsBuilder()
val newUsers: KStream[String, String] = builder.stream[String, String]("new-users-input")
val branches: Array[KStream[String, String]] =
newUsers.branch((k, _) => k.startsWith("u"), (k, _) => k.startsWith("p"))
// array is 0 based
val USERNAME = 1
val PASSWORD = 2
Collaborator Author: The stupidest error is simply using the wrong index. In this case we are writing passwords instead of usernames.

Owner (niqdev, Dec 13, 2020): This simple use case should currently be covered by span, which is defined similarly to List.span:

def span(p: (K, V) => Boolean): Task[(ZKStream[K, V], ZKStream[K, V])] =
  Task.effect(stream.branch(p, !p(_, _))).map { branches =>
    (ZKStream.newInstance(branches(0)), ZKStream.newInstance(branches(1)))
  }

I believe we should avoid using branch directly, exactly for this reason!

Now, if there is more than one condition, a bit more thought on the parameter and return types would probably be necessary.
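
A speculative sketch of one fixed-arity shape, reusing the span pattern above (branch3 is not part of the current API):

def branch3(p1: (K, V) => Boolean, p2: (K, V) => Boolean)
  : Task[(ZKStream[K, V], ZKStream[K, V], ZKStream[K, V])] =
  // the last predicate catches whatever the first two reject
  Task.effect(stream.branch(p1, p2, (k, v) => !p1(k, v) && !p2(k, v))).map { bs =>
    (ZKStream.newInstance(bs(0)), ZKStream.newInstance(bs(1)), ZKStream.newInstance(bs(2)))
  }

Callers then destructure a tuple instead of indexing into an Array, so there is no index to get wrong.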

Collaborator Author: This is good, but I would frame it as a special case of the more general branching process because, as you said, it needs to work for many arities (let's say up to 10 branches).

branches(USERNAME).to("new-users-output")

val topology: Topology = builder.build()
}

object DangerousGDPR1Spec extends DefaultRunnableSpec {
private[this] val testLayer =
ZTestTopology.testConfigLayer(true) >+> KafkaStreamsTopology.make(UIO(DangerousGDPR1.topology))

override def spec: ZSpec[TestEnvironment, Any] =
suite("newUsersSpec")(
testM("topology") {
for {
outputValue <- ZTestTopology.driver.use { driver =>
for {
input <- driver.createInput[String, String]("new-users-input")
output <- driver.createOutput[String, String]("new-users-output")
_ <- input.produce("p001", "Giovanni")
_ <- input.produce("u001", "password1")
value <- output.consumeValue
} yield value
}
} yield assert(outputValue)(equalTo("Giovanni"))
}.provideSomeLayerShared(testLayer.mapError(TestFailure.fail))
)
}
61 changes: 61 additions & 0 deletions examples/src/main/scala/io/laserdisc/kafka/DangerousGDPR2.scala
@@ -0,0 +1,61 @@
package io.laserdisc.kafka

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{ StreamsConfig, Topology }
import zio.UIO
import zio.kafka.streams.KafkaStreamsTopology
import zio.kafka.streams.testkit.ZTestTopology
import zio.test.Assertion.equalTo
import zio.test._
import zio.test.environment.TestEnvironment

import java.util.Properties

object DangerousGDPR2 {
val config: Properties = {
val jProperties = new Properties()
jProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
jProperties
}

val builder: StreamsBuilder = new StreamsBuilder()
val newUsers: KStream[String, String] = builder.stream[String, String]("new-users-input")
val branches: Array[KStream[String, String]] =
newUsers.branch((k, _) => k.startsWith("u"), (k, _) => k.startsWith("c"))
val USERNAME = 0
def securelyMappingUserForOutput(user: String): String = identity(user)
val CREDIT_CARD = 1
def securelyMappingCardForOutput(ccard: String): String = "************" + ccard.substring(12)
val processUser: KStream[String, String] =
branches(USERNAME).mapValues(username => securelyMappingUserForOutput(username))
val maskedCC: KStream[String, String] =
branches(CREDIT_CARD).mapValues(creditCard => securelyMappingUserForOutput(creditCard))
maskedCC.to("new-users-output")
Collaborator Author (on lines +33 to +36): This error is a bit harder to see because the functions have similar names. In this case we are writing credit card numbers without masking them.
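
One way to surface this at compile time is the newtype approach that SaferGDPR1 below uses: make the masking function the only producer of the type the sink accepts. A hedged sketch (Masked is illustrative, not part of this PR):

// Illustrative only: a zio.prelude Subtype, so that only the masking
// function can produce values of the sink's type.
object Masked extends Subtype[String]
type Masked = Masked.Type

def securelyMappingCardForOutput(ccard: String): Masked =
  Masked("************" + ccard.substring(12))

// securelyMappingUserForOutput still returns a plain String, so passing it
// where a Masked is required would no longer compile.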


val topology: Topology = builder.build()
}

object DangerousGDPR2Spec extends DefaultRunnableSpec {
private[this] val testLayer =
ZTestTopology.testConfigLayer(true) >+> KafkaStreamsTopology.make(UIO(DangerousGDPR2.topology))

override def spec: ZSpec[TestEnvironment, Any] =
suite("newUsersSpec")(
testM("topology") {
for {
outputValue <- ZTestTopology.driver.use { driver =>
for {
input <- driver.createInput[String, String]("new-users-input")
output <- driver.createOutput[String, String]("new-users-output")
_ <- input.produce("u001", "Giovanni")
_ <- input.produce("c001", "5483987343872038")
value <- output.consumeValue
} yield value
}
} yield assert(outputValue)(equalTo("************2038"))
}.provideSomeLayerShared(testLayer.mapError(TestFailure.fail))
)
}
58 changes: 58 additions & 0 deletions examples/src/main/scala/io/laserdisc/kafka/SafeGDPR1.scala
@@ -0,0 +1,58 @@
package io.laserdisc.kafka

import org.apache.kafka.streams.scala.{ Serdes, StreamsBuilder }
import org.apache.kafka.streams.{ StreamsConfig, Topology }
import zio.UIO
import zio.kafka.streams.testkit.ZTestTopology
import zio.kafka.streams.{ KafkaStreamsTopology, SafeTopic, TopologySdk }
import zio.prelude.State
import zio.test.Assertion.equalTo
import zio.test.environment.TestEnvironment
import zio.test.{ DefaultRunnableSpec, TestFailure, ZSpec, suite, testM, _ }

import java.util.Properties

object SafeGDPR1 extends TopologySdk {
val config: Properties = {
val jProperties = new Properties()
jProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
jProperties
}

val passwordFilter: (String, String) => Boolean = (k, _) => k.startsWith("p")
val usernameFilter: (String, String) => Boolean = (k, _) => k.startsWith("u")

val newUsersIn: SafeTopic[String, String] = SafeTopic("new-users-input", Serdes.String, Serdes.String)
val newUsersOut: SafeTopic[String, String] = SafeTopic("new-users-output", Serdes.String, Serdes.String)

val topologyReader: State[StreamsBuilder, Topology] = for {
input <- streamFromSource(newUsersIn)
branches <- branch(input)(usernameFilter, passwordFilter)
_ <- streamSinkTo(branches(passwordFilter), newUsersOut)
Collaborator Author: This is a bit better because I can clearly see the name passwordFilter for my branch. It's quite hard to make a mistake here, but if we do, the compiler will be happy and we still write passwords in clear text.
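
A tiny illustration of the remaining hazard: every branch carries the same (String, String) types, so both of these type-check and only the filter name hints at the mistake:

// _ <- streamSinkTo(branches(usernameFilter), newUsersOut)  // intended
// _ <- streamSinkTo(branches(passwordFilter), newUsersOut)  // leaks passwords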

topology <- toTopology
} yield topology

val topology: Topology = topologyReader.runResult(new StreamsBuilder())
}

object SafeGDPR1Spec extends DefaultRunnableSpec {
private[this] val testLayer =
ZTestTopology.testConfigLayer(true) >+> KafkaStreamsTopology.make(UIO(SafeGDPR1.topology))

override def spec: ZSpec[TestEnvironment, Any] =
suite("newUsersSpec")(
testM("topology") {
for {
outputValue <- ZTestTopology.driver.use { driver =>
for {
input <- driver.createInput[String, String]("new-users-input")
output <- driver.createOutput[String, String]("new-users-output")
_ <- input.produce("u001", "Giovanni")
_ <- input.produce("p001", "password1")
value <- output.consumeValue
} yield value
}
} yield assert(outputValue)(equalTo("Giovanni"))
}.provideSomeLayerShared(testLayer.mapError(TestFailure.fail))
)
}
74 changes: 74 additions & 0 deletions examples/src/main/scala/io/laserdisc/kafka/SaferGDPR1.scala
@@ -0,0 +1,74 @@
package io.laserdisc.kafka

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.{ Serdes, StreamsBuilder }
import org.apache.kafka.streams.{ StreamsConfig, Topology }
import zio.UIO
import zio.kafka.streams.testkit.ZTestTopology
import zio.kafka.streams.{ KafkaStreamsTopology, SafeTopic, TopologySdk }
import zio.prelude.State
import zio.test.Assertion.equalTo
import zio.test.environment.TestEnvironment
import zio.test.{ DefaultRunnableSpec, TestFailure, ZSpec, suite, testM, _ }

import java.util.Properties
import zio.prelude.Subtype
import zio.kafka.streams.Extractor

object SaferGDPR1 extends TopologySdk {
object Username extends Subtype[String]
type Username = Username.Type
object Password extends Subtype[String]
type Password = Password.Type

val config: Properties = {
val jProperties = new Properties()
jProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
jProperties
}

val passwordFilter: (String, String) => Boolean = (k, _) => k.startsWith("p")
val passwordExtractor: Extractor[String, String, Password] =
Extractor(passwordFilter)(_.asInstanceOf[Password])
val passwordSerde: Serde[Password] = Serdes.String.asInstanceOf[Serde[Password]]
val usernameFilter: (String, String) => Boolean = (k, _) => k.startsWith("u")
val usernameExtractor: Extractor[String, String, Username] =
Extractor(usernameFilter)(_.asInstanceOf[Username])
val usernameSerde: Serde[Username] = Serdes.String.asInstanceOf[Serde[Username]]

val newUsersIn: SafeTopic[String, String] = SafeTopic("new-users-input", Serdes.String, Serdes.String)
val newUsersOut: SafeTopic[String, Username] = SafeTopic("new-users-output", Serdes.String, usernameSerde)
Collaborator Author (on lines +30 to +40): All this crap can be automatically derived with a few lines of code.
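
A hedged sketch of such derivation, reusing only the shapes above (stringSubtype is hypothetical and not in this PR):

// Hypothetical helper: derive the Extractor and Serde for any Subtype of
// String in one place, instead of hand-writing the casts per type.
def stringSubtype[T <: String](filter: (String, String) => Boolean)
  : (Extractor[String, String, T], Serde[T]) =
  (Extractor(filter)(_.asInstanceOf[T]), Serdes.String.asInstanceOf[Serde[T]])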


val topologyReader: State[StreamsBuilder, Topology] = for {
input <- streamFromSource(newUsersIn)
branches <- safeBranch(input)(usernameExtractor, passwordExtractor)
(usernames, passwords) = branches // cannot inline this due to missing `withFilter` in ZPure
Collaborator Author: This is extremely hard to get wrong: even if you switch username with password while assigning the branches, it will not compile, because the output topic expects the Username type regardless of the variable name.
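
For instance, swapping the streams fails to type-check (error paraphrased):

// _ <- streamSinkTo(passwords, newUsersOut)
// type mismatch: the stream carries Password values,
// but newUsersOut is a SafeTopic[String, Username]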

// ^-- try and switch these
_ <- streamSinkTo(usernames, newUsersOut)
topology <- toTopology
} yield topology

val topology: Topology = topologyReader.runResult(new StreamsBuilder())
}

object SaferGDPR1Spec extends DefaultRunnableSpec {
private[this] val testLayer =
ZTestTopology.testConfigLayer(true) >+> KafkaStreamsTopology.make(UIO(SaferGDPR1.topology))

override def spec: ZSpec[TestEnvironment, Any] =
suite("newUsersSpec")(
testM("topology") {
for {
outputValue <- ZTestTopology.driver.use { driver =>
for {
input <- driver.createInput[String, String]("new-users-input")
output <- driver.createOutput[String, String]("new-users-output")
_ <- input.produce("u001", "Giovanni")
_ <- input.produce("p001", "password1")
value <- output.consumeValue
} yield value
}
} yield assert(outputValue)(equalTo("Giovanni"))
}.provideSomeLayerShared(testLayer.mapError(TestFailure.fail))
)
}