Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add TraversalBuilder.getValuePresentedSource method for further optimization #1701

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

He-Pin
Copy link
Member

@He-Pin He-Pin commented Jan 10, 2025

Motivation:
Add helper method to extract the value presented graph which will enable future optimization.
which is needed in #1672

Modification:
Add TraversalBuilder.getValuePresentedSource method

Result:
TraversalBuilder.getValuePresentedSource internal method added

@He-Pin He-Pin added the t:stream Pekko Streams label Jan 10, 2025
@He-Pin He-Pin added this to the 1.2.0 milestone Jan 10, 2025
@@ -23,7 +23,7 @@ import java.util.function.Consumer

/** INTERNAL API */
@InternalApi private[stream] final class JavaStreamSource[T, S <: java.util.stream.BaseStream[T, S]](
open: () => java.util.stream.BaseStream[T, S])
val open: () => java.util.stream.BaseStream[T, S])
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needed for optimization.

@@ -22,7 +22,7 @@ import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
/**
* INTERNAL API
*/
@InternalApi private[pekko] final class FailedSource[T](failure: Throwable) extends GraphStage[SourceShape[T]] {
@InternalApi private[pekko] final class FailedSource[T](val failure: Throwable) extends GraphStage[SourceShape[T]] {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needed for optimization.

@He-Pin He-Pin force-pushed the getValuePresentedSource branch from 684155c to 0735893 Compare January 10, 2025 12:29
/**
* Test if a Graph is an empty Source.
*/
def isEmptySource(graph: Graph[SourceShape[_], _]): Boolean = graph match {
case source: scaladsl.Source[_, _] if source eq scaladsl.Source.empty => true
case source: javadsl.Source[_, _] if source eq javadsl.Source.empty() => true
case EmptySource => true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't look related to this change - is this a bug that needs to be fixed in its own PR?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a bug, just a missing optimization case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that the cases were evaluated in order so this would not change performance in that case and it would actually lead to a different result if EmptySource was matched but that neither of the first 2 cases matched.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these methods are internal methods acutually.

}

"find Source.empty via TraversalBuilder with getValuePresentedSource" in {
val emptySource = EmptySource
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pjfanning the tests coverage is here for the EmptySource

@He-Pin He-Pin force-pushed the getValuePresentedSource branch from 0735893 to 5ee21b8 Compare January 10, 2025 12:38
@He-Pin He-Pin requested a review from pjfanning January 10, 2025 12:38
@He-Pin He-Pin added the performance Related to performance label Jan 10, 2025
@He-Pin
Copy link
Member Author

He-Pin commented Jan 10, 2025

After this pr get merged, we can do some optimization for flatmapMerge too.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
performance Related to performance t:stream Pekko Streams
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants