Fri 28 October 2022
The Java implementation of Google's differential-privacy provides building blocks for computing \((\epsilon, \delta)\) differentially private statistics over datasets.
CountVisitsPerDay gives an example of how to anonymously count restaurant visitors for each day of the week (using a Count object). The relevant logic:
EnumMap<DayOfWeek, Integer> privateCountsPerDay = new EnumMap<>(DayOfWeek.class);
...
Arrays.stream(DayOfWeek.values()).forEach(d -> {
  Count dpCount =
      Count.builder()
          .epsilon(LN_3)
          // The data was pre-processed so that each visitor may visit the restaurant up to
          // MAX_VISIT_DAYS days per week.
          // Hence, each user may contribute to up to MAX_VISIT_DAYS daily counts.
          // Note: while the library accepts this limit as a configurable parameter,
          // it doesn't pre-process the data to ensure this limit is respected.
          // It is the responsibility of the caller to ensure the data passed to the library
          // is capped, in order to get the correct privacy guarantee.
          .maxPartitionsContributed(MAX_CONTRIBUTED_DAYS)
          .build();
  dpCount.incrementBy(boundedVisits.getVisitsForDay(d).size());
  privateCountsPerDay.put(d, (int) dpCount.computeResult());
});
For each day of the week, a dpCount private counter is incremented by the number of visitors on that day. The daily statistic is then computed and stored in an EnumMap.
Thanks to the algebraic properties of their implementation, Count and other statistics are well suited to map/reduce-style computations. In the remainder of this article we'll look at scaling up differential privacy on Apache Spark.
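To see what these algebraic properties buy us in practice, here is a minimal Scala sketch (the epsilon and contribution-bound values are placeholders): two partial counts, computed independently, are combined through the library's getSerializableSummary() and mergeWith() methods, and the noisy result is only produced when computeResult() is called.
import com.google.privacy.differentialprivacy.Count

// Two partial counts, e.g. accumulated on different workers.
val c1 = Count.builder().epsilon(math.log(3)).maxPartitionsContributed(3).build()
val c2 = Count.builder().epsilon(math.log(3)).maxPartitionsContributed(3).build()
c1.increment()
c2.incrementBy(2)

// Fold c2 into c1; noise is only added when computeResult() is called.
c1.mergeWith(c2.getSerializableSummary)
val noisyTotal = c1.computeResult() // differentially private count over 3 raw visits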
Spark 3 introduced a public Aggregator base class for efficient User Defined Aggregate Functions (UDAFs); see User Defined Aggregation in Apache Spark: A Love Story for performance considerations. Aggregator borrows from algebird: it operates on an aggregating monoid type and adds accumulation and presentation logic on top. It can be used with both typed Datasets and untyped DataFrames.
An Aggregator[IN, BUF, OUT] class takes three type parameters: the input type IN, the aggregating accumulator type BUF (which stores intermediate results), and the output type OUT. In Scala it must implement the following methods:
- zero(): the identity element of the aggregating monoid.
- merge(BUF b1, BUF b2): combines two accumulators into a new accumulator.
- reduce(BUF b, IN a): updates the accumulator with an IN value.
- finish(BUF reduction): presents the output OUT of a reduced result.
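To make this contract concrete, here is a minimal, non-private Aggregator (not part of spark-privacy, just an illustration): a plain sum over Longs, whose accumulator forms a monoid under addition with identity 0.
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

object SumLong extends Aggregator[Long, Long, Long] {
  override def zero: Long = 0L                            // identity element of the monoid
  override def reduce(b: Long, a: Long): Long = b + a     // fold one input into the accumulator
  override def merge(b1: Long, b2: Long): Long = b1 + b2  // combine two accumulators
  override def finish(reduction: Long): Long = reduction  // present the reduced result
  override def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}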
Counts are Aggregator[IN, BUF, OUT]s:
- the IN type is any Numeric;
- the OUT type can be a Long;
- the BUF type is a Count itself;
- zero is a newly instantiated, empty Count object;
- Counts can be merged via their mergeWith() method;
- a Count can be updated via its increment() method;
- a Count can be presented by appropriately converting the output of computeResult().
The PrivateCount class implements an Aggregator of Counts.
import com.google.privacy.differentialprivacy.Count
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

class PrivateCount[T](epsilon: Double, contribution: Int) extends Aggregator[T, Count, Long] {
  // The accumulator is a freshly instantiated, empty, differentially private Count.
  override def zero: Count = Count.builder()
    .epsilon(epsilon)
    .maxPartitionsContributed(contribution)
    .build()

  // Each input row increments the count by one.
  override def reduce(b: Count, a: T): Count = {
    b.increment()
    b
  }

  // Partial counts are combined via the library's serializable summaries.
  override def merge(b1: Count, b2: Count): Count = {
    b1.mergeWith(b2.getSerializableSummary)
    b1
  }

  // computeResult() adds noise and returns the anonymized count.
  override def finish(reduction: Count): Long = reduction.computeResult().toLong

  // Count is a plain Java object, so a generic Kryo encoder is used for the buffer.
  override def bufferEncoder: Encoder[Count] = Encoders.kryo[Count]
  override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
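Besides being wrapped as a UDAF (shown below), an Aggregator can also be used directly with the typed Dataset API via toColumn. A minimal sketch, assuming a SparkSession named spark and some made-up visitor ids:
import org.apache.spark.sql.Dataset
import spark.implicits._

val privateCount = new PrivateCount[Long](math.log(3), 3)
val visits: Dataset[Long] = Seq(1L, 2L, 2L, 3L).toDS() // hypothetical visitor ids
val noisyCount: Long = visits.select(privateCount.toColumn).first()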
Aggregators can be registered as udfs. A PrivateCount can then be applied to a group of rows like any other built-in aggregate function.
First we instantiate the Aggregator:
val privateCount = new PrivateCount[Long](epsilon, maxContributions)
Then we wrap it as a udf
, so that we can use it with DataFrame transformations:
val privateCountUdf = functions.udaf(privateCount)
dataFrame.groupBy("Day").agg(privateCountUdf($"VisitorId") as "private_count").show
Or with the equivalent Spark SQL API:
spark.udf.register("private_count", privateCountUdf)
dataFrame.createOrReplaceTempView("visitors")
spark.sql("select Day, private_count(VisitorId) from visitors group by Day").show
Google's differential-privacy comes with an important caveat: [...] Differential privacy requires some bound on maximum number of contributions each user can make to a single aggregation. The DP building block libraries don't perform such bounding: their implementation assumes that each user contributes only a fixed number of rows to each partition. That number can be configured by the user. The library neither verifies nor enforces this limit; it is the caller's responsibility to pre-process data to enforce this. [...]
We can define a BoundContribution helper function to preprocess our data and bound partition contributions before performing the count. Spark window functions come in handy for this step.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rand, row_number}

object BoundContribution {
  /**
   * Limit the number of records contributed by `privacyKey`
   * to at most `maxContributions` in total, and at most `maxContributionsPerPartition`
   * per partition `key`.
   *
   * @param key the column that identifies a partition.
   * @param privacyKey the column that identifies a privacy unit (e.g. a visitor).
   * @param maxContributions max total contributions of `privacyKey`.
   * @param maxContributionsPerPartition max contributions of `privacyKey` per partition `key`.
   */
  def apply(key: String, privacyKey: String, maxContributions: Int, maxContributionsPerPartition: Int = 1)(dataFrame: DataFrame): DataFrame = {
    val byContributionsPerPartition = Window.partitionBy(privacyKey, key)
    val byContributions = Window.partitionBy(privacyKey)
    val contributionsCol = "contributions"
    val contributionsPerPartitionCol = "contributionsPerPartitionCol"
    dataFrame
      // Randomly rank records within each (privacyKey, key) group and keep at most
      // maxContributionsPerPartition of them.
      .withColumn(contributionsPerPartitionCol, row_number over byContributionsPerPartition.orderBy(rand))
      .where(col(contributionsPerPartitionCol) <= maxContributionsPerPartition)
      // Then keep at most maxContributions records per privacyKey overall.
      .withColumn(contributionsCol, row_number over byContributions.orderBy(rand)) // TODO(gmodena): how secure is this?
      .where(col(contributionsCol) <= maxContributions)
      .drop(contributionsPerPartitionCol, contributionsCol)
  }
}
In spark-privacy I implemented some basic private aggregations using a Laplace mechanism. As an example, the code below reproduces Google's Count visits by day of week on Spark.
// A distributed implementation of
// https://github.com/google/differential-privacy/tree/main/examples/java#count-visits-by-day-of-week
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.{col, count}
import privacy.spark.differential.{BoundContribution, PrivateCount}

object DayCount extends App {
  // Source dataset at https://raw.githubusercontent.com/google/differential-privacy/main/examples/java/week_data.csv
  final private val DayDataCsv = "week_data.csv"
  final private val DayDataDelimiter = ","

  val spark: SparkSession = SparkSession.builder()
    .appName("DayCount")
    .getOrCreate()
  import spark.implicits._

  val epsilon = Math.log(3)
  val maxContributions = 3
  val maxContributionsPerPartition = 1

  val dataFrame = spark.read.option("delimiter", DayDataDelimiter).option("header", "true").csv(DayDataCsv)

  val privateCount = new PrivateCount[Long](epsilon, maxContributions)
  val privateCountUdf = functions.udaf(privateCount)

  // Restrict `VisitorId` to `maxContributionsPerPartition` contributions per `Day`, and `maxContributions` in total
  dataFrame.transform(BoundContribution("Day", "VisitorId", maxContributions, maxContributionsPerPartition))
    .groupBy("Day").agg(privateCountUdf($"VisitorId") as "private_count", count($"VisitorId") as "non_private_count")
    .show
}
https://gitlab.wikimedia.org/gmodena/privacy-spark-pipeline demonstrates the use of spark-privacy in a Spark pipeline and in a Jupyter (Scala) notebook.