Protos with nested extensions fail with Task not serializable #284

Closed

jckegelman opened this issue Oct 23, 2022 · 9 comments


jckegelman commented Oct 23, 2022

Hi, I am running into the following error:

org.apache.spark.SparkException: Task not serializable

when running this code:

// assumes a SparkSession `spark` is in scope and the sparksql-scalapb
// encoder imports (e.g. import scalapb.spark.Implicits._) are present
val data = Seq(
  Baz(id = Some(1)),
  Baz(id = Some(2)),
  Baz(id = Some(3)),
)

val binaryDS = spark.createDataset(data.map(_.toByteArray))
binaryDS.show()

val protosDS = binaryDS.map(Baz.parseFrom(_))
protosDS.show()

with these proto definitions:

message Foo {
  optional string name = 1;
  extensions 100 to 199;
}

message Baz {
  extend Foo {
    optional int32 bar = 126;
  }
  optional int32 id = 1;
}

Full stack trace:

[info]   org.apache.spark.SparkException: Task not serializable
[info]   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
[info]   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
[info]   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
[info]   at org.apache.spark.SparkContext.clean(SparkContext.scala:2377)
[info]   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:886)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
[info]   at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
[info]   at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:885)
[info]   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:723)
[info]   ...
[info]   Cause: java.io.NotSerializableException: scalapb.lenses.Lens$$anon$1
[info] Serialization stack:
[info]  - object not serializable (class: scalapb.lenses.Lens$$anon$1, value: scalapb.lenses.Lens$$anon$1@2efd8702)
[info]  - field (class: scalapb.GeneratedExtension, name: lens, type: interface scalapb.lenses.Lens)
[info]  - object (class scalapb.GeneratedExtension, GeneratedExtension(scalapb.lenses.Lens$$anon$1@2efd8702))
[info]  - field (class: com.example.protos.extensions.Baz$, name: bar, type: class scalapb.GeneratedExtension)
[info]  - object (class com.example.protos.extensions.Baz$, com.example.protos.extensions.Baz$@628905ca)
[info]  - element of array (index: 2)
[info]  - array (class [Ljava.lang.Object;, size 4)
[info]  - element of array (index: 1)
[info]  - array (class [Ljava.lang.Object;, size 3)
[info]  - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
[info]  - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
[info]  - writeReplace data (class: java.lang.invoke.SerializedLambda)
[info]  - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$9541/568370002, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$9541/568370002@2ba57ea)
[info]   at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
[info]   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
[info]   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
[info]   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
[info]   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
[info]   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
[info]   at org.apache.spark.SparkContext.clean(SparkContext.scala:2377)
[info]   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:886)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

It looks like adding a nested extension adds that field to the "outer" object (Baz in the example above), so somewhere in the schema encoding it ends up trying to serialize Baz.bar, which is a scalapb.GeneratedExtension, and GeneratedExtension extends scalapb.lenses.Lens:

https://github.com/scalapb/ScalaPB/blob/aadabd3ab4c49b9f0dee14a42b1153300eaccc97/scalapb-runtime/src/main/scala/scalapb/GeneratedExtension.scala#L6

which is not Serializable.
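For reference, a minimal sketch that reproduces the root cause without Spark, assuming the classes generated from the .proto above and ScalaPB 0.11.11: plain Java serialization of the Baz companion object walks into Baz.bar and hits the anonymous Lens.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

import com.example.protos.extensions.Baz

object LensSerializationRepro {
  def main(args: Array[String]): Unit = {
    // Serializing the companion object serializes its fields, including the
    // nested-extension field `bar` (a GeneratedExtension holding a Lens).
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(Baz)
    // expected: java.io.NotSerializableException: scalapb.lenses.Lens$$anon$1
  }
}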

I tried various incantations to avoid serialization, e.g. by wrapping Baz in a static object, but couldn't get it to work. I've submitted a PR with a reproducible example: #283

Do you have any suggested workarounds, or would it be possible to make Lens Serializable?


thesamet commented Oct 24, 2022

Thanks for reporting and for providing a reproducible example. The fix indeed makes Lens Serializable and will be part of the next ScalaPB release.
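For context, a minimal sketch of the shape of the fix (simplified, not the actual scalapb.lenses source): the Lens trait mixes in Serializable, so the anonymous Lens instances held by GeneratedExtension can be Java-serialized when Spark ships a closure.

// Simplified sketch, not the real scalapb.lenses.Lens definition:
// the essential change is that Lens (and therefore its anonymous
// subclasses) extends Serializable.
trait Lens[Container, A] extends Serializable {
  def get(c: Container): A
  def set(a: A): Container => Container
}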

@jckegelman (Author)

That's great, thanks for the quick fix!

@jckegelman (Author)

Hi, do you have a rough timeline for the next ScalaPB release? In the meantime, do you publish snapshots?

@thesamet (Contributor)

Hi, v0.11.12 is being published right now. If all goes well, it should be available on Maven within the hour.

@thesamet (Contributor)

And yes - there are snapshots here: https://oss.sonatype.org/content/repositories/snapshots/com/thesamet/scalapb/
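A build.sbt sketch for pulling a snapshot from that repository (the version string below is only a placeholder; use whatever snapshot is actually published):

// Sketch: add the Sonatype snapshots resolver and depend on a snapshot build.
// "0.11.12-SNAPSHOT" is a placeholder version, not a confirmed artifact.
resolvers += "Sonatype OSS Snapshots" at
  "https://oss.sonatype.org/content/repositories/snapshots"

libraryDependencies += "com.thesamet.scalapb" %% "scalapb-runtime" % "0.11.12-SNAPSHOT"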

@jckegelman (Author)

That's great, thanks! One more question: do you have an approximate timeline for the next release of the sparksql-scalapb package? In the meantime we can use the snapshot.

Thanks again!


thesamet commented Oct 30, 2022

Hi @jckegelman, there are no changes in sparksql-scalapb relevant to this issue. Doesn't 1.0.1 work for you?

@jckegelman (Author)

Oh, right, I think that should work. Sorry, I just went through dependency hell trying to upgrade other packages, but I believe you're correct: we can depend on the latest ScalaPB v0.11.12 and get it into the über JAR we use, even though this package depends on v0.11.11. I'll give it a try, but I think we should be good, thanks!
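A build.sbt sketch of that approach (the sparksql-scalapb artifact name below is illustrative; pick the variant that matches your Spark version):

// Sketch: keep sparksql-scalapb 1.0.1 but force the fixed scalapb-runtime.
// The sparksql-scalapb artifact name is illustrative; adjust for your Spark version.
libraryDependencies ++= Seq(
  "com.thesamet.scalapb" %% "sparksql-scalapb" % "1.0.1",
  "com.thesamet.scalapb" %% "scalapb-runtime"  % "0.11.12"
)
dependencyOverrides += "com.thesamet.scalapb" %% "scalapb-runtime" % "0.11.12"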

@jckegelman (Author)

Confirmed, that works, thanks again!
