experimental type safe eZtreme #2
base: master
Conversation
@@ -84,7 +85,7 @@ lazy val datagen = project

lazy val examples = project
  .in(file("examples"))
-  .dependsOn(core, datagen)
+  .dependsOn(core, datagen, testkit)
Using testkit instead of launching actual Kafka; useful for demonstration purposes.
newUsers.branch((k, _) => k.startsWith("u"), (k, _) => k.startsWith("p"))
// array is 0 based
val USERNAME = 1
val PASSWORD = 2
The stupidest error is to just put the wrong index. In this case we are writing passwords instead of usernames.
This simple use case should currently be covered by span, which is defined similarly to List.span:

def span(p: (K, V) => Boolean): Task[(ZKStream[K, V], ZKStream[K, V])] =
  Task.effect(stream.branch(p, (k, v) => !p(k, v))).map { branches =>
    (ZKStream.newInstance(branches(0)), ZKStream.newInstance(branches(1)))
  }
I believe we should avoid using branch directly, exactly for this reason! Now, if there is more than one condition, probably a bit more thought on the parameter/return types would be necessary.
This is good, but I would frame it as a special case of the more general branching process because, as you said, it needs to work for many arities (let's say up to 10 branches).
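For instance, a fixed-arity wrapper over the plain Kafka Scala DSL could return a tuple instead of an array, so every branch stays individually addressable (branch3 is a hypothetical name; just a sketch):

import org.apache.kafka.streams.scala.kstream.KStream

// A hypothetical 3-ary sibling of span: the tuple return type makes it
// impossible to index a branch that doesn't exist.
def branch3[K, V](stream: KStream[K, V])(
    p1: (K, V) => Boolean,
    p2: (K, V) => Boolean,
    p3: (K, V) => Boolean
): (KStream[K, V], KStream[K, V], KStream[K, V]) = {
  val bs = stream.branch(p1, p2, p3)
  (bs(0), bs(1), bs(2))
}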
branches(USERNAME).mapValues(username => securelyMappingUserForOutput(username))
val maskedCC: KStream[String, String] =
  branches(CREDIT_CARD).mapValues(creditCard => securelyMappingUserForOutput(creditCard))
maskedCC.to("new-users-output")
This error is a bit harder to see because the functions have similar names. In this case we are writing credit card numbers without masking them.
val topologyReader: State[StreamsBuilder, Topology] = for {
  input    <- streamFromSource(newUsersIn)
  branches <- branch(input)(usernameFilter, passwordFilter)
  _        <- streamSinkTo(branches(passwordFilter), newUsersOut)
This is a bit better because I can clearly see the name passwordFilter for my branch. It's quite hard to make a mistake here, but if we do, the compiler will be happy and we still write passwords in clear text.
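For the record, a minimal sketch of how this lookup-by-filter shape could be implemented, reusing the ZPure style seen elsewhere in this PR (this branch helper is my assumption, not the actual implementation):

import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream
import zio.prelude.State
import zio.prelude.fx.ZPure

// Key each resulting branch by the predicate that produced it, so callers
// write branches(passwordFilter) instead of a bare index.
def branch[K, V](stream: KStream[K, V])(
    predicates: ((K, V) => Boolean)*
): State[StreamsBuilder, Map[(K, V) => Boolean, KStream[K, V]]] =
  ZPure.unit[StreamsBuilder].as {
    predicates.zip(stream.branch(predicates: _*)).toMap
  }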
val topologyReader: State[StreamsBuilder, Topology] = for {
  input    <- streamFromSource(newUsersIn)
  branches <- safeBranch(input)(usernameExtractor, passwordExtractor)
  (usernames, passwords) = branches // cannot inline this due to missing `withFilter` in ZPure
This is extremely hard to get wrong: even if you switch username with password while assigning the branches, it will not compile, because the output topic expects the Username type regardless of the variable name.
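For reference, a sketch of how Username and Password could be defined as zio-prelude subtypes (assuming that's the encoding behind these examples):

import zio.prelude.Subtype

// Zero-overhead newtypes over String: at runtime both are plain strings,
// but the compiler keeps the two branches apart.
object Username extends Subtype[String]
type Username = Username.Type

object Password extends Subtype[String]
type Password = Password.Type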
branches <- safeBranch(input)(usernameExtractor, creditCardExtractor)
(usernames, creditCards) = branches // cannot inline this due to missing `withFilter` in ZPure
maskedCreditCards <- streamMapValues(creditCards)(
  securelyMappingCardForOutput // try putting the user one
Putting the similarly named function here will result in a compile error, making it very hard to make a mistake and write credit cards somewhere unintended.
val passwordFilter: (String, String) => Boolean = (k, _) => k.startsWith("p")
val passwordExtractor: Extractor[String, String, Password] =
  Extractor(passwordFilter)(_.asInstanceOf[Password])
val passwordSerde: Serde[Password] = Serdes.String.asInstanceOf[Serde[Password]]
val usernameFilter: (String, String) => Boolean = (k, _) => k.startsWith("u")
val usernameExtractor: Extractor[String, String, Username] =
  Extractor(usernameFilter)(_.asInstanceOf[Username])
val usernameSerde: Serde[Username] = Serdes.String.asInstanceOf[Serde[Username]]

val newUsersIn: SafeTopic[String, String] = SafeTopic("new-users-input", Serdes.String, Serdes.String)
val newUsersOut: SafeTopic[String, Username] = SafeTopic("new-users-output", Serdes.String, usernameSerde)
All this crap can be automatically derived with a few lines of code.
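As a sketch of what that derivation could look like (assuming subtype-style newtypes whose runtime representation is String):

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.Serdes

object DerivedSerdes {
  // Any newtype that is a compile-time subtype of String can reuse the
  // String serde; the cast is sound because the runtime value is a String.
  implicit def stringSubtypeSerde[T <: String]: Serde[T] =
    Serdes.String.asInstanceOf[Serde[T]]
}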
final case class SafeTopic[K, V](topicName: String, keySerde: Serde[K], valueSerde: Serde[V])

final class Extractor[K, Vin, Vout <: Vin] private (
  val predicate: (K, Vin) => Boolean,
  val downcaster: Vin => Vout
The downcaster function could be an implicit that is derived. There are only a handful of possibilities for the implementation. If we use subtypes, like in my examples, it's just a matter of implementing the instances for basic types, and then we can recursively create more complex types.
For example, if Vout is Subtype[T], just return T. Another example: if Vout is String Refined T, just return T.
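Something along these lines, perhaps, with Downcaster as a hypothetical type class (reusing the Username/Password subtypes from the examples):

// Capture the Vin => Vout cast in a type class so Extractor can take it
// implicitly instead of an explicit _.asInstanceOf at every call site.
trait Downcaster[Vin, Vout <: Vin] {
  def down(v: Vin): Vout
}

object Downcaster {
  // For subtype-style newtypes the runtime value is unchanged, so every
  // instance is just a checked identity.
  private def unsafe[Vin, Vout <: Vin]: Downcaster[Vin, Vout] =
    new Downcaster[Vin, Vout] { def down(v: Vin): Vout = v.asInstanceOf[Vout] }

  implicit val usernameDowncaster: Downcaster[String, Username] = unsafe
  implicit val passwordDowncaster: Downcaster[String, Password] = unsafe
}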
@@ -22,7 +22,7 @@ object ToUpperCaseTopology {
   topology <- ZStreamsBuilder { builder =>
     for {
       sourceStream <- builder.stream[String, String](sourceTopic)
-      sinkStream   <- sourceStream.mapValue(_.toUpperCase)
+      sinkStream   <- sourceStream.mapValue(_.length)
I think this is an interesting case...

(sinkStream: ZKStream[String, Int]) <- sourceStream.mapValue(_.length)
_ <- sinkStream.to(sinkTopic)

The output type of the topic is purely determined by the types of the ZKStream[K, V] wrapper; you can't define, for instance,

def to[KO, VO](topic: String): RIO[KafkaStreamsConfig, Unit]

because that's how the Scala/Java KStream API works. So this will always compile as long as there is an instance of Codec[T] in scope, which exists for all basic types.

On the other hand, I believe in most cases you would use Avro, and in that case this shouldn't be a problem at all, because you need to define an instance of AvroCodec for your type T, and that won't compile unless the types are what you expect:
// here dummy.value.length doesn't compile
(sinkStream: ZKStream[DummyKey, DummyValue]) <- sourceStream.mapValue(dummy => DummyValue(dummy.value.toUpperCase))
_ <- sinkStream.toAvro(sinkTopic)
wdyt?
In a greenfield project I agree Avro is the state of the art and should be used. But there are many real-world scenarios in which this is not possible or preferable: smaller projects, brownfield projects that already have topics without Avro (true story), systems that use another schema tool or different binary encodings...
Fair enough, but at this point the issue is that there are default codecs already defined and we are using basic types... it's probably still not ideal and won't solve the issue completely, but if you have your own type, you can build your own codec using Codec[T].cmap, similar to

implicit val uuidCodec: Codec[java.util.UUID] =
  stringCodec.cmap(...)

and catch most of these errors at compile time.
I don't think I totally understood what you are saying here. Are you saying that we could make it type safe by using slightly different codecs (for example UUID instead of String)?
> it's probably still not ideal and won't solve the issue completely

The current implementation by default uses implicit codecs and has the types "dictated" by the ZKStream constructor, and you won't get around it easily.
@@ -0,0 +1,56 @@
package io.laserdisc.kafka
They should probably be moved under examples/src/[main>test]/scala/...
?? 🤔
If you run sbt test these specs are ignored, but it's just an example, so no big deal.
def toTopology: State[StreamsBuilder, Topology] = ZPure.get.map(_.build())
}

final case class SafeTopic[K, V](topicName: String, keySerde: Serde[K], valueSerde: Serde[V])
If it's an Avro topic, how would you retrieve the schemaRegistryUrl from the config?
There are two possibilities in my view.
- You don't use SafeTopic but a reader, type ZTopic = Reader[SchemaRegistry, SafeTopic]; this is the approach I used. But in this scenario I don't know how to use different schema registries for different topics, because you would supply the SchemaRegistry environment only once. Maybe there is a way if you split the topology.
- We don't use serdes but richer structures that contain the schema registry URL (like your codec, I think). I haven't tested this.
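A rough sketch of the first option, with SchemaRegistry as a hypothetical config type and the Reader spelled as a plain function:

import org.apache.kafka.common.serialization.Serde

object AvroTopics {
  final case class SchemaRegistry(url: String)

  // SafeTopic is only materialized once a SchemaRegistry is supplied, so
  // the registry URL never leaks into the topology definition itself.
  type ZTopic[K, V] = SchemaRegistry => SafeTopic[K, V]

  def avroTopic[K, V](name: String)(
      mkKeySerde: SchemaRegistry => Serde[K],
      mkValueSerde: SchemaRegistry => Serde[V]
  ): ZTopic[K, V] =
    registry => SafeTopic(name, mkKeySerde(registry), mkValueSerde(registry))
}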
): State[StreamsBuilder, (KStream[K, Vout1], KStream[K, Vout2])] =
  ZPure.unit[StreamsBuilder].as {
    val branches = stream.branch(extractor1.predicate, extractor2.predicate)
    val stream1  = branches(0).mapValues(extractor1.downcaster)
If you describe the topology you would expect only a branch node; in this case you would see 2 more. What's the cost of a mapValues just to be able to "cast" a type?
I thought about this. I think that the cost of mapping and creating a branchMap is not the issue here. We can always give users the ability to use plain branch.

The real issue is that it's not even necessary to map inside this branch. In theory we should be able to apply the downcaster function without Kafka Streams' mapValues, just as a regular function; moreover, it would be erased at runtime anyway because of type erasure. It may be a bit hacky, but it's guaranteed to work and removes the cost of the two additional nodes.
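A sketch of that hack (only sound when the downcaster is a runtime identity, as it is with subtypes):

import org.apache.kafka.streams.scala.kstream.KStream

// Re-type the branch at compile time only: no mapValues node is added to
// the topology because nothing happens at runtime.
def retype[K, Vin, Vout <: Vin](branch: KStream[K, Vin]): KStream[K, Vout] =
  branch.asInstanceOf[KStream[K, Vout]]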
import zio.prelude.State
import zio.prelude.fx.ZPure

trait TopologySdk {
What are the benefits of always passing (stream: KStream[K, V]) in the signatures and extending TopologySdk vs the "wrapper approach"? Would there be any benefit in wrapping the state instead and keeping everything in the same class?

I'm just thinking aloud; I'm not familiar at all with the zio and zio-prelude codebases, so I might have completely misunderstood the internals... but I've seen an interesting pattern that we might be able to reuse here too!

Please correct me if I'm saying something stupid, but this and this look like a kind of state machine. Maybe we can do something similar to pass between the 3 main phases (builder -> source, kstream/ktable/..., builder -> sink). Would this help with the problem of the signature of the sink? Thoughts?
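To make the state-machine idea concrete, a rough sketch with phantom phase types (all names hypothetical, bodies elided):

import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream

// Phases encoded as phantom types: sinking is only reachable from a
// sourced topology, so illegal orderings don't compile.
sealed trait Empty
sealed trait Sourced

final case class Phase[P](builder: StreamsBuilder)

def source[K, V](p: Phase[Empty], topic: SafeTopic[K, V]): (Phase[Sourced], KStream[K, V]) = ???
def sink[K, V](p: Phase[Sourced], stream: KStream[K, V], topic: SafeTopic[K, V]): Phase[Sourced] = ???
def build(p: Phase[Sourced]): Topology = p.builder.build()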
Anyway, before changing the current impl, there are a few other considerations to keep in mind. Should we move away from the official Kafka Scala lib and wrap the Java API directly? I don't really see a huge benefit in 3 layers Java/Scala/ZIO.
- This might help and give us full control when Scala 3 comes around; otherwise we would stick to the Scala version of the official lib.
- Most of the methods with final Named named are not directly accessible; it might sound silly, but if you wanna use something like eisner you can't rename the nodes easily.
- Get rid of these implicits, org.apache.kafka.streams.scala.ImplicitConversions/Serdes for instance, and probably write more "fp friendly" signatures.

Also, I would like to come up with a nice way of dealing with effects in general. Invoking an endpoint is something that you might not wanna do for other reasons, but logging and metrics probably yes... If we find a nice way to integrate directly with zio.stream.ZStream, for example, we might open the doors to interesting things...
I don't get the first question. You don't pass the stream just because you want to do stream.doStuff; you pass it because there might be many streams and you have to decide which one you want to use. For example, after a branch you have two or more streams; if you want to sink the second one, how do you write that? Did I understand your question? Can you elaborate on "the problem of the signature of the sink"? I don't understand what you are referring to.

All three points are good; I just think they will need a bit of work, and we can expose all the methods to give names to nodes and get rid of the implicits as well.

About the effects, I think we are good. What would be the problem of integrating effects in mapValues, for example?
> What would be the problem of integrating effects in mapValues for example?

Ignore that it's written with cats, but honestly, I didn't find a nice way to do something similar. Have you tried?

// O could be (KO, VO) or Unit depending on the use case
def runMyF[O](f: (K, V) => F[O])(
    implicit F: Sync/Async[F]
): F[???] =
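For comparison, the only obvious ZIO translation I can see is the blocking one (a sketch assuming ZIO 1.x, where Runtime#unsafeRun exists):

import org.apache.kafka.streams.scala.kstream.KStream
import zio.{Runtime, Task}

// Run the effect synchronously inside mapValues: simple, but it blocks the
// Kafka Streams processing thread for the duration of each effect.
def mapValuesM[K, V, VO](stream: KStream[K, V])(f: V => Task[VO])(
    runtime: Runtime[Any]
): KStream[K, VO] =
  stream.mapValues(v => runtime.unsafeRun(f(v)))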
About the first question: in most cases you will have to pass around stream: KStream[K, V] anyway, so what I was suggesting is to differentiate the cases where you are changing the types into different classes, for example. As I said, it was just a thought and it might be over-complicating things; I was suggesting combining the 2 approaches and seeing if there is any benefit.
trait TopologySdk {
  def streamFromSource[K, V](
      sourceSafeTopic: SafeTopic[K, V]
  ): State[StreamsBuilder, KStream[K, V]] = {
If you use only State[StreamsBuilder, KStream[K, V]], how do you access the config or other environments inside the topology, i.e. RIO[KafkaStreamsConfig, Topology]? In more complex use cases with a state store you need, for example, to set the name or other params.
I don't know; what about passing a StoreBuilder and a config (StoreBuilder => StoreBuilder) when you build the store? As normal parameters?
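i.e. something like this sketch (addStateStore is the regular Kafka Streams API; the rest mirrors the ZPure helpers above):

import org.apache.kafka.streams.processor.StateStore
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.state.StoreBuilder
import zio.prelude.State
import zio.prelude.fx.ZPure

// The store and its configuration travel as plain parameters; no
// environment is needed to register the state store.
def addStore[S <: StateStore](
    store: StoreBuilder[S],
    configure: StoreBuilder[S] => StoreBuilder[S]
): State[StreamsBuilder, Unit] =
  ZPure.get[StreamsBuilder].map { builder =>
    builder.addStateStore(configure(store))
    ()
  }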
That was just something off the top of my head; I'm pretty sure there are cases where you might want to access some values from the ZEnv.
@gurghet I have a proposal for you. I think the simpler way to figure out the best approach is just by implementing it! So I would suggest keeping this PR open for now, with all the discussion, and opening a new one starting from this, with the following structure.

I can help in refactoring the modules, but I need you first to find a nice way to hook up, for example

Once this is done, I think we should move the repo, release a snapshot version, and then we can work on unifying the implementations while working on it. WDYT?

btw ...
This is a seminal version to set a target state for this library.

cc: @niqdev

There are 5 example files. The first 2 are plain Kafka, to show the downsides of not being typesafe: they compile but the test fails. The third, SafeGDPR1, is a compromise; it makes it harder to mix up variables. The target state is represented by the Safer version, the last 2 files: it makes it impossible to mix up variables, because otherwise it doesn't compile.

After you review these files we can start planning what the preferred target state is and how to migrate your API to make it compatible.