Add Spark to PipelineDP4j, sharded key filtering to PBeam
Export of internal changes.
--
5478b1957ce80be498abac882a75dd958e1b966e by Differential Privacy Team <noreply@google.com>:

Introduce AboveThresholdSelector to StreamingPartitionSelector

PiperOrigin-RevId: 700987846
Change-Id: Ia6b8f8b2d5da936a3d6229c180d59dc176a89970

--
050bc0203c0cb5d07e67bd7e0dab20b348ddd564 by Differential Privacy Team <noreply@google.com>:

n/a

PiperOrigin-RevId: 700310388
Change-Id: Ib0867257ab65acc7bfc37b20aac49211ca3bfec4
GitOrigin-RevId: 11fd989e0ed941bca0f189400d54ff464c4f4238
Differential Privacy Team authored and RamSaw committed Dec 9, 2024
1 parent 1dfe8f9 commit 5a84fbf
Showing 31 changed files with 592 additions and 113 deletions.
9 changes: 0 additions & 9 deletions .gitignore
@@ -2,12 +2,3 @@
**/bazel-java
**/bazel-out
**/bazel-testlogs

**/bazel-differential-privacy
**/.ijwb/
**/pipelinedp4j/.ijwb/
**/pipelinedp4j/bazel-pipelinedp4j
**/pipelinedp4j/MODULE**
**/examples/.idea/
**/examples/pipelinedp4j/bazel-pipelinedp4j
**/examples/pipelinedp4j/MODULE**
11 changes: 6 additions & 5 deletions cc/algorithms/BUILD
@@ -139,7 +139,6 @@ cc_library(
deps = [
":algorithm",
":approx-bounds",
":bounded-algorithm",
":numerical-mechanisms",
":util",
"//proto:util-lib",
@@ -245,7 +244,6 @@ cc_library(
deps = [
":algorithm",
":approx-bounds",
":bounded-algorithm",
":numerical-mechanisms",
":util",
"//proto:util-lib",
@@ -291,10 +289,8 @@ cc_library(
deps = [
":algorithm",
":approx-bounds",
":bounded-algorithm",
":bounded-variance",
":numerical-mechanisms",
":util",
"//proto:util-lib",
"@com_google_absl//absl/memory",
"@com_google_absl//absl/status",
@@ -534,6 +530,7 @@ cc_test(
cc_library(
name = "bounded-algorithm",
hdrs = ["bounded-algorithm.h"],
visibility = ["//visibility:private"],
deps = [
":algorithm",
":approx-bounds",
@@ -668,10 +665,14 @@ cc_library(
visibility = ["//visibility:public"],
deps = [
":algorithm",
":bounded-algorithm",
":numerical-mechanisms",
":quantile-tree",
":util",
"@com_google_absl//absl/log",
"@com_google_absl//absl/status",
"@com_google_absl//absl/status:statusor",
"@com_google_absl//absl/strings",
"@com_google_cc_differential_privacy//base:status_macros",
],
)

2 changes: 0 additions & 2 deletions cc/algorithms/bounded-standard-deviation.h
@@ -28,10 +28,8 @@
#include "absl/status/statusor.h"
#include "algorithms/algorithm.h"
#include "algorithms/approx-bounds.h"
#include "algorithms/bounded-algorithm.h"
#include "algorithms/bounded-variance.h"
#include "algorithms/numerical-mechanisms.h"
#include "algorithms/util.h"
#include "proto/util.h"
#include "proto/data.pb.h"
#include "proto/summary.pb.h"
2 changes: 0 additions & 2 deletions cc/algorithms/bounded-sum.h
@@ -31,13 +31,11 @@

#include "google/protobuf/any.pb.h"
#include "absl/log/log.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "algorithms/algorithm.h"
#include "algorithms/approx-bounds.h"
#include "algorithms/bounded-algorithm.h"
#include "algorithms/numerical-mechanisms.h"
#include "algorithms/util.h"
#include "proto/util.h"
1 change: 0 additions & 1 deletion cc/algorithms/bounded-variance.h
@@ -37,7 +37,6 @@
#include "absl/strings/str_cat.h"
#include "algorithms/algorithm.h"
#include "algorithms/approx-bounds.h"
#include "algorithms/bounded-algorithm.h"
#include "algorithms/numerical-mechanisms.h"
#include "algorithms/util.h"
#include "proto/util.h"
12 changes: 10 additions & 2 deletions cc/algorithms/quantiles.h
@@ -18,13 +18,21 @@
#define DIFFERENTIAL_PRIVACY_CPP_ALGORITHMS_QUANTILES_H_

#include <cstdint>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>
#include <vector>

#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "algorithms/algorithm.h"
#include "algorithms/bounded-algorithm.h"
#include "algorithms/numerical-mechanisms.h"
#include "algorithms/quantile-tree.h"
#include "algorithms/util.h"
#include "base/status_macros.h"

namespace differential_privacy {

@@ -239,7 +247,7 @@ class Quantiles<T>::Builder {
int max_partitions_contributed_ = 1;
int max_contributions_per_partition_ = 1;
std::unique_ptr<NumericalMechanismBuilder> mechanism_builder_ =
absl::make_unique<LaplaceMechanism::Builder>();
std::make_unique<LaplaceMechanism::Builder>();
std::vector<double> quantiles_;

static absl::Status ValidateQuantiles(std::vector<double>& quantiles) {
4 changes: 4 additions & 0 deletions cc/algorithms/util.cc
@@ -303,6 +303,10 @@ absl::Status ValidateMaxPartitionsContributed(
"contributed to (i.e., L0 sensitivity)");
}

absl::Status ValidateMaxWindows(std::optional<int> max_windows) {
return ValidateIsPositive(max_windows, "Maximum number of windows");
}

absl::Status ValidateMaxContributionsPerPartition(
std::optional<double> max_contributions_per_partition) {
return ValidateIsPositive(max_contributions_per_partition,
1 change: 1 addition & 0 deletions cc/algorithms/util.h
@@ -522,6 +522,7 @@ absl::Status ValidateEpsilon(std::optional<double> epsilon);
absl::Status ValidateDelta(std::optional<double> delta);
absl::Status ValidateMaxPartitionsContributed(
std::optional<double> max_partitions_contributed);
absl::Status ValidateMaxWindows(std::optional<int> max_windows);
absl::Status ValidateMaxContributionsPerPartition(
std::optional<double> max_contributions_per_partition);
absl::Status ValidateMaxContributions(std::optional<int> max_contributions);
2 changes: 1 addition & 1 deletion examples/pipelinedp4j/README.md
@@ -90,7 +90,7 @@ Here are the steps to build and run the example assuming you are in the
1. Run the program:
```shell
bazel-bin/BeamExample --inputFilePath=netflix_data.csv --outputFilePath=output.txt
bazel-bin/src/main/java/com/google/privacy/differentialprivacy/pipelinedp4j/examples/BeamExample --inputFilePath=netflix_data.csv --outputFilePath=output.txt
```
1. View the results:
4 changes: 4 additions & 0 deletions examples/pipelinedp4j/pom.xml
@@ -46,6 +46,10 @@
<configuration>
<source>11</source>
<target>11</target>
<excludes>
<!-- SparkExample is not supported yet. -->
<exclude>com/google/privacy/differentialprivacy/pipelinedp4j/examples/SparkExample.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
@@ -37,23 +37,22 @@ java_binary(
java_binary(
name = "SparkExample",
srcs = [
"SparkExample.java",
"MovieMetrics.java",
"MovieView.java",
"SparkExample.java",
],
main_class = "com.google.privacy.differentialprivacy.pipelinedp4j.examples.SparkExample",
deps = [
"@com_google_privacy_differentialprivacy_pipielinedp4j//main/com/google/privacy/differentialprivacy/pipelinedp4j/api",
"@maven//:com_fasterxml_jackson_core_jackson_databind",
"@maven//:com_fasterxml_jackson_module_jackson_module_paranamer",
"@maven//:com_google_guava_guava",
"@maven//:info_picocli_picocli",
"@maven//:org_jetbrains_kotlin_kotlin_stdlib",

"@maven//:org_apache_spark_spark_catalyst_2_13",
"@maven//:org_apache_spark_spark_core_2_13",
"@maven//:org_apache_spark_spark_sql_2_13",
"@maven//:org_apache_spark_spark_mllib_2_13",
"@maven//:org_apache_spark_spark_catalyst_2_13",
"@maven//:com_fasterxml_jackson_core_jackson_databind",
"@maven//:com_fasterxml_jackson_module_jackson_module_paranamer",
"@maven//:org_apache_spark_spark_sql_2_13",
"@maven//:org_jetbrains_kotlin_kotlin_stdlib",
"@maven//:org_scala_lang_scala_library",
],
)
@@ -185,7 +185,7 @@ private static PCollection<MovieView> readData(Pipeline pipeline, String inputFi

/**
* Movie ids (which are group keys for this dataset) are integers from 1 to ~17000. Set public
* groups 4500-4509.
* groups to a subset of them.
*/
private static PCollection<String> publiclyKnownMovieIds(Pipeline pipeline) {
var publicGroupsAsJavaList =
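
The updated comment generalizes the earlier note that the example publishes groups 4500-4509. A minimal Kotlin sketch of that idea using Beam's Create transform (the shipped example is Java; the id range mirrors the old comment and the names here are purely illustrative):

```kotlin
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.values.PCollection

// Turn a small, publicly known subset of movie ids into group keys for the DP query.
fun publiclyKnownMovieIds(pipeline: Pipeline): PCollection<String> {
  val publicGroups: List<String> = (4500..4509).map { it.toString() }
  return pipeline.apply("CreatePublicGroups", Create.of(publicGroups))
}

fun main() {
  val pipeline = Pipeline.create(PipelineOptionsFactory.create())
  val publicMovieIds = publiclyKnownMovieIds(pipeline)
  // publicMovieIds would then be handed to the DP query builder as the set of public groups.
}
```
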
1 change: 0 additions & 1 deletion pipelinedp4j/BUILD.bazel
@@ -63,7 +63,6 @@ pom_file(
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/spark:spark_collections",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/spark:spark_dp_engine_factory",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/spark:spark_encoders",

],
template_file = "pom.template",
)
@@ -41,6 +41,5 @@ kt_jvm_library(
"@maven//:com_google_guava_guava",
"@maven//:org_apache_beam_beam_sdks_java_core",
"@maven//:org_apache_beam_beam_sdks_java_extensions_avro",

],
)
@@ -16,16 +16,16 @@

package com.google.privacy.differentialprivacy.pipelinedp4j.api

import com.google.privacy.differentialprivacy.pipelinedp4j.spark.SparkCollection
import com.google.privacy.differentialprivacy.pipelinedp4j.spark.SparkEncoderFactory
import org.apache.spark.sql.Dataset
import com.google.privacy.differentialprivacy.pipelinedp4j.beam.BeamCollection
import com.google.privacy.differentialprivacy.pipelinedp4j.beam.BeamEncoderFactory
import com.google.privacy.differentialprivacy.pipelinedp4j.core.EncoderFactory
import com.google.privacy.differentialprivacy.pipelinedp4j.core.FrameworkCollection
import com.google.privacy.differentialprivacy.pipelinedp4j.local.LocalCollection
import com.google.privacy.differentialprivacy.pipelinedp4j.local.LocalEncoderFactory
import com.google.privacy.differentialprivacy.pipelinedp4j.spark.SparkCollection
import com.google.privacy.differentialprivacy.pipelinedp4j.spark.SparkEncoderFactory
import org.apache.beam.sdk.values.PCollection as BeamPCollection
import org.apache.spark.sql.Dataset

/**
* An internal interface to represent an arbitrary collection that is supported by PipelineDP4j.
@@ -57,7 +57,8 @@ internal data class LocalPipelineDpCollection<T>(val data: Sequence<T>) : Pipeli
}

/** Spark Collection represented as a Spark Dataset. */
internal data class SparkPipelineDpCollection<T>(val data: Dataset<T>) : PipelineDpCollection<T> {
internal data class SparkPipelineDpCollection<T>(val data: Dataset<T>) : PipelineDpCollection<T>
{
override val encoderFactory = SparkEncoderFactory()

override fun toFrameworkCollection() = SparkCollection<T>(data)
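
SparkPipelineDpCollection is what makes a Spark Dataset usable by the engine: it exposes a SparkEncoderFactory and converts to a SparkCollection. A minimal sketch of that wiring, assuming a local SparkSession; the class is internal, so this illustrates the mechanics rather than the public entry point:

```kotlin
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession

fun sparkCollectionSketch() {
  val spark = SparkSession.builder().master("local[*]").appName("pipelinedp4j-sketch").getOrCreate()

  // A toy Dataset of group keys; any Dataset<T> with a Spark Encoder works.
  val movieIds: Dataset<String> =
    spark.createDataset(listOf("4500", "4501", "4502"), Encoders.STRING())

  // Wrap the Dataset so the DP engine can treat it as a framework collection.
  val dpCollection = SparkPipelineDpCollection(movieIds)
  val frameworkCollection = dpCollection.toFrameworkCollection() // SparkCollection<String>
  val encoders = dpCollection.encoderFactory                     // SparkEncoderFactory

  spark.stop()
}
```
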
@@ -17,6 +17,7 @@
package com.google.privacy.differentialprivacy.pipelinedp4j.api

import org.apache.beam.sdk.values.PCollection as BeamPCollection

import org.apache.spark.sql.Dataset

/**
@@ -21,13 +21,13 @@ import com.google.privacy.differentialprivacy.pipelinedp4j.core.EncoderFactory
import com.google.protobuf.Message
import java.io.InputStream
import java.io.OutputStream
import kotlin.reflect.KClass
import org.apache.beam.sdk.coders.Coder
import org.apache.beam.sdk.coders.CustomCoder
import org.apache.beam.sdk.coders.DoubleCoder
import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.coders.VarIntCoder
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder

import org.apache.beam.sdk.extensions.protobuf.ProtoCoder

class BeamEncoder<T>(val coder: Coder<T>) : Encoder<T>
@@ -39,11 +39,10 @@ class BeamEncoderFactory() : EncoderFactory {

override fun ints() = BeamEncoder<Int>(VarIntCoder.of())

override fun <T : Any> records(recordClass: KClass<T>) =
BeamEncoder<T>(AvroCoder.of(recordClass.java))
override fun <T : Any> records(recordClass: Class<T>) = BeamEncoder<T>(AvroCoder.of(recordClass))

override fun <T : Message> protos(protoClass: KClass<T>) =
BeamEncoder<T>(ProtoCoder.of(protoClass.java))
override fun <T : Message> protos(protoClass: Class<T>) =
BeamEncoder<T>(ProtoCoder.of(protoClass))

override fun <T1 : Any, T2 : Any> tuple2sOf(first: Encoder<T1>, second: Encoder<T2>) =
BeamEncoder(
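
With the switch from KClass to Class parameters, BeamEncoderFactory wraps AvroCoder.of for records and ProtoCoder.of for protos directly. A minimal sketch; MovieView is a hypothetical Avro-reflectable record type, not part of this change:

```kotlin
import com.google.privacy.differentialprivacy.pipelinedp4j.beam.BeamEncoderFactory

// Hypothetical record type; AvroCoder reflects over the class, so default
// values are used to give it a no-arg constructor.
data class MovieView(val userId: String = "", val movieId: String = "", val rating: Double = 0.0)

fun beamEncoderSketch() {
  val factory = BeamEncoderFactory()

  // Class-based overload introduced in this change; wraps AvroCoder.of(MovieView::class.java).
  val movieViewEncoder = factory.records(MovieView::class.java)

  // Primitive encoders are unchanged.
  val ratingEncoder = factory.doubles()   // backed by DoubleCoder
  val movieIdEncoder = factory.strings()  // backed by StringUtf8Coder

  // BeamEncoder exposes the underlying Beam Coder for direct use with PCollections.
  val beamCoder = movieViewEncoder.coder
}
```
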
@@ -40,16 +40,56 @@ interface EncoderFactory {
/** Returns an [Encoder] for an integer value, which can be stored in a [FrameworkCollection]. */
fun ints(): Encoder<Int>

/** Encoder for data classes. */
fun <T : Any> records(recordClass: KClass<T>): Encoder<T>
/** Encoder for classes. */
fun <T : Any> records(recordClass: Class<T>): Encoder<T>

/** Same as [records(Class)] but accepts Kotlin class. */
fun <T : Any> records(recordClass: KClass<T>) = records(recordClass.java)

/** Returns an [Encoder] for a protobuf value, which can be stored in a [FrameworkCollection]. */
fun <T : Message> protos(protoClass: KClass<T>): Encoder<T>
fun <T : Message> protos(protoClass: Class<T>): Encoder<T>

/** Same as [protos(Class)] but accepts Kotlin class. */
fun <T : Message> protos(protoClass: KClass<T>) = protos(protoClass.java)

/** Returns an [Encoder] for a pair of tuples, which can be stored in a [FrameworkCollection]. */
fun <T1 : Any, T2 : Any> tuple2sOf(first: Encoder<T1>, second: Encoder<T2>): Encoder<Pair<T1, T2>>

/**
* Returns the most specific [Encoder] for any record type given its [KClass], including primitive
* types and proto but except pairs.
*
* Use it when the record type is not known at compile time and it can be a primitive type or a
* proto. This method will return the most appropriate (and efficient) [Encoder] for the given
* type.
*
* Note that this method does not work for pairs ([tuple2sOf]) and for any other classes that are
* parameterized by generic types.
*/
fun <T : Any> recordsOfUnknownClass(recordClass: Class<T>) =
when {
recordClass == String::class.java -> strings()
recordClass == Double::class.java -> doubles()
recordClass == Int::class.java -> ints()
Message::class.java.isAssignableFrom(recordClass) -> {
@Suppress("UNCHECKED_CAST") protos(recordClass as Class<out Message>)
}
else -> records(recordClass)
}

/** Same as [recordsOfUnknownClass(Class)] but accepts Kotlin class. */
fun <T : Any> recordsOfUnknownClass(recordClass: KClass<T>) =
recordsOfUnknownClass(recordClass.java)
}

/**
* Inlines the function and the type parameter which allows to use [EncoderFactory.records] without
* specifying the class.
*/
inline fun <reified T : Any> EncoderFactory.records() = this.records(T::class)

/**
* Inlines the function and the type parameter which allows to use [EncoderFactory.protos] without
* specifying the class.
*/
inline fun <reified T : Message> EncoderFactory.protos() = this.protos(T::class)
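
Taken together, the interface now offers three equivalent ways to obtain an encoder: the Java-friendly Class overloads, the KClass overloads that forward to them, and the reified extensions, plus recordsOfUnknownClass for classes only known at runtime. A minimal usage sketch against the local backend (MovieView is again a hypothetical record type):

```kotlin
import com.google.privacy.differentialprivacy.pipelinedp4j.core.EncoderFactory
// Assumes the reified extensions live alongside EncoderFactory in the core package.
import com.google.privacy.differentialprivacy.pipelinedp4j.core.records
import com.google.privacy.differentialprivacy.pipelinedp4j.local.LocalEncoderFactory

data class MovieView(val userId: String, val movieId: String, val rating: Double)

fun encoderFactorySketch() {
  val factory: EncoderFactory = LocalEncoderFactory()

  // Java-friendly Class overload and the KClass overload produce the same encoder.
  val fromClass = factory.records(MovieView::class.java)
  val fromKClass = factory.records(MovieView::class)

  // Reified extension: the class is inferred from the type parameter.
  val inferred = factory.records<MovieView>()

  // Runtime dispatch picks the most specific encoder for a class known only at runtime.
  val stringEncoder = factory.recordsOfUnknownClass(String::class.java)    // -> strings()
  val recordEncoder = factory.recordsOfUnknownClass(MovieView::class.java) // -> records(...)
}
```
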
@@ -19,7 +19,6 @@ package com.google.privacy.differentialprivacy.pipelinedp4j.local
import com.google.privacy.differentialprivacy.pipelinedp4j.core.Encoder
import com.google.privacy.differentialprivacy.pipelinedp4j.core.EncoderFactory
import com.google.protobuf.Message
import kotlin.reflect.KClass

class LocalEncoderFactory() : EncoderFactory {
// The implementation of local encoders is empty because when the data is being processed
@@ -36,9 +35,9 @@ class LocalEncoderFactory() : EncoderFactory {
return object : Encoder<Int> {}
}

override fun <T : Any> records(recordClass: KClass<T>): Encoder<T> = object : Encoder<T> {}
override fun <T : Any> records(recordClass: Class<T>): Encoder<T> = object : Encoder<T> {}

override fun <T : Message> protos(protoClass: KClass<T>): Encoder<T> = object : Encoder<T> {}
override fun <T : Message> protos(protoClass: Class<T>): Encoder<T> = object : Encoder<T> {}

override fun <T1 : Any, T2 : Any> tuple2sOf(first: Encoder<T1>, second: Encoder<T2>) =
object : Encoder<Pair<T1, T2>> {}