Fri 28 October 2022

The Java implementation of Google's differential-privacy library provides building blocks for computing \((\epsilon, \delta)\) differentially private statistics over datasets.

The CountVisitsPerDay example shows how to anonymously count restaurant visitors on each day of the week (using a `Count` object). The relevant logic:

```
EnumMap<DayOfWeek, Integer> privateCountsPerDay = new EnumMap<>(DayOfWeek.class);
...
Arrays.stream(DayOfWeek.values()).forEach(d -> {
  Count dpCount =
      Count.builder()
          .epsilon(LN_3)
          // The data was pre-processed so that each visitor may visit the restaurant up to
          // MAX_VISIT_DAYS days per week.
          // Hence, each user may contribute to up to MAX_VISIT_DAYS daily counts.
          // Note: while the library accepts this limit as a configurable parameter,
          // it doesn't pre-process the data to ensure this limit is respected.
          // It is the responsibility of the caller to ensure the data passed to the library
          // is capped, in order to get the correct privacy guarantee.
          .maxPartitionsContributed(MAX_CONTRIBUTED_DAYS)
          .build();
  dpCount.incrementBy(boundedVisits.getVisitsForDay(d).size());
  privateCountsPerDay.put(d, (int) dpCount.computeResult());
});
```

For each day of the week, a `dpCount` private counter is incremented by the number of visitors on that day. The daily statistic is then computed and stored in an `EnumMap`.

Thanks to the algebraic properties of their implementation, `Count` and other statistics are suitable for map/reduce style computations: partial counts can be computed independently and combined into a single result, as sketched below. In the remainder of this article we'll look at scaling up differential privacy on Apache Spark.
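
A minimal sketch of this property, calling the library's Java API from Scala (the epsilon and contribution bound here are illustrative, not prescribed):

```
import com.google.privacy.differentialprivacy.Count

// Two partial counts, e.g. computed on different workers.
val c1 = Count.builder().epsilon(math.log(3)).maxPartitionsContributed(1).build()
val c2 = Count.builder().epsilon(math.log(3)).maxPartitionsContributed(1).build()
c1.increment()
c2.increment()

// Fold c2 into c1 via its serializable summary; c1 now accounts for both partials.
c1.mergeWith(c2.getSerializableSummary)
val total = c1.computeResult() // a single noisy count over both partial counts
```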

Spark 3 introduced a public `Aggregator` base class for efficient User Defined Aggregate Functions (UDAFs); see User Defined Aggregation in Apache Spark: A Love Story for performance considerations. `Aggregator` borrows from algebird: it operates on an aggregating monoid type, and adds some accumulation and presentation logic. It supports both typed Datasets and untyped DataFrames.

An `Aggregator[IN, BUF, OUT]` class requires three type parameters: an input type `IN`, an aggregating accumulator type `BUF` (to store intermediate results), and an output type `OUT`.

In Scala it must implement the following methods:

- `zero()`: the identity element of the aggregating monoid.
- `merge(b1: BUF, b2: BUF)`: combines two accumulators into a new accumulator.
- `reduce(b: BUF, a: IN)`: updates the accumulator with an `IN` value.
- `finish(reduction: BUF)`: presents the output `OUT` of a reduced result.
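
To make these methods concrete, here is a toy `Aggregator` (not part of any library) that sums `Long` values; the two encoders tell Spark how to serialize the accumulator and the output:

```
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// A toy Aggregator over the (Long, +, 0) monoid.
object SumAggregator extends Aggregator[Long, Long, Long] {
  override def zero: Long = 0L                            // identity element
  override def reduce(b: Long, a: Long): Long = b + a     // fold an input into the accumulator
  override def merge(b1: Long, b2: Long): Long = b1 + b2  // combine two accumulators
  override def finish(reduction: Long): Long = reduction  // present the result
  override def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
```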

`Count`s are `Aggregator[IN, BUF, OUT]`s:

- The `IN` type is any `Numeric`.
- The `OUT` type can be a `Long`.
- The `BUF` type is a `Count` itself.
- The identity element for `Count` is a newly instantiated, empty `Count` object.
- Two `Count`s can be merged via their `mergeWith()` method.
- A `Count` can be updated via its `increment()` method.
- The result of a `Count` can be presented by appropriately converting the output of `computeResult()`.

The `PrivateCount` class implements an `Aggregator` of `Count`s:

```
import com.google.privacy.differentialprivacy.Count
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

class PrivateCount[T](epsilon: Double, contribution: Int) extends Aggregator[T, Count, Long] {
  // The identity element: a newly instantiated, empty Count.
  override def zero: Count = Count.builder()
    .epsilon(epsilon)
    .maxPartitionsContributed(contribution)
    .build()
  // Each input value increments the accumulator by one.
  override def reduce(b: Count, a: T): Count = {
    b.increment()
    b
  }
  // Partial counts are combined via Count's serializable summaries.
  override def merge(b1: Count, b2: Count): Count = {
    b1.mergeWith(b2.getSerializableSummary)
    b1
  }
  // Compute the differentially private result.
  override def finish(reduction: Count): Long = reduction.computeResult().toLong
  // Count is not a Spark SQL type, so we fall back to Kryo serialization.
  override def bufferEncoder: Encoder[Count] = Encoders.kryo[Count]
  override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
```

`Aggregator`s can be registered as udfs. A `PrivateCount` can then be applied to a group of rows like any other built-in aggregate function.

First, we instantiate the `Aggregator`:

```
val privateCount = new PrivateCount[Long](epsilon, maxContributions)
```

Then we turn it into a `udf` with `functions.udaf`, so that we can use it with DataFrame transformations:

```
val privateCountUdf = functions.udaf(privateCount)
dataFrame.groupBy("Day").agg(privateCountUdf($"VisitorId") as "private_count").show
```

Or with the equivalent SparkSQL API:

```
spark.udf.register("private_count", privateCountUdf)
dataFrame.createOrReplaceTempView("visitors")
spark.sql("select Day, private_count(VisitorId) from visitors group by Day")
```
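
The same `Aggregator` can also be used with the typed Dataset API through its `toColumn` method. A minimal sketch, on hypothetical data (assumes `spark.implicits._` is in scope):

```
import org.apache.spark.sql.Dataset

// A private count over a whole Dataset, without grouping.
val visits: Dataset[Long] = Seq(1L, 2L, 3L).toDS()
val dpTotal: Long = visits.select(privateCount.toColumn).first()
```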

Google's differential-privacy comes with an important caveat:
*[...] Differential privacy requires some bound on maximum number of contributions each user can make to a single
aggregation. The DP building block libraries don't perform such bounding: their implementation assumes that each user
contributes only a fixed number of rows to each partition. That number can be configured by the user. The library
neither verifies nor enforces this limit; it is the caller's responsibility to pre-process data to enforce this. [...]*

We can define a `BoundContribution` helper function to preprocess our data and bound per-partition and total contributions before performing the count. Spark window functions will come in handy for this step.

```
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rand, row_number}

object BoundContribution {
  /**
   * Limit the number of records contributed by `privacyKey`
   * to at most `maxContributions` in total, and at most `maxContributionsPerPartition`
   * per partition `key`.
   *
   * @param key the partition key; governs max contributions per partition.
   * @param privacyKey the privacy unit; governs max total contributions.
   * @param maxContributions max contributions of `privacyKey` in total.
   * @param maxContributionsPerPartition max contributions of `privacyKey` per partition `key`.
   */
  def apply(key: String, privacyKey: String, maxContributions: Int, maxContributionsPerPartition: Int = 1)(dataFrame: DataFrame): DataFrame = {
    val byContributionsPerPartition = Window.partitionBy(privacyKey, key)
    val byContributions = Window.partitionBy(privacyKey)
    val contributionsCol = "contributions"
    val contributionsPerPartitionCol = "contributionsPerPartitionCol"
    dataFrame
      // Randomly rank records within each (privacyKey, key) group and keep at
      // most maxContributionsPerPartition of them.
      .withColumn(contributionsPerPartitionCol, row_number over byContributionsPerPartition.orderBy(rand))
      .where(col(contributionsPerPartitionCol) <= maxContributionsPerPartition)
      // Then randomly rank the surviving records per privacyKey and keep at
      // most maxContributions of them.
      .withColumn(contributionsCol, row_number over byContributions.orderBy(rand)) // TODO(gmodena): how secure is this?
      .where(col(contributionsCol) <= maxContributions)
      .drop(contributionsPerPartitionCol, contributionsCol)
  }
}
```
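
A quick sanity check of the bounding behaviour, on hypothetical data (assumes `spark.implicits._` is in scope):

```
// Visitor "a" appears three times on Monday. With the default
// maxContributionsPerPartition = 1, at most one row per (VisitorId, Day)
// survives, and at most maxContributions = 3 rows survive in total.
val visits = Seq(
  ("Monday", "a"), ("Monday", "a"), ("Monday", "a"),
  ("Tuesday", "a"), ("Wednesday", "a"), ("Thursday", "a")
).toDF("Day", "VisitorId")
visits.transform(BoundContribution("Day", "VisitorId", maxContributions = 3)).show()
```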

In `spark-privacy` I implemented some basic private aggregations using a Laplace mechanism. As an example, the code below reproduces Google's count of visits by day of week on Spark.

```
// A distributed implementation of
// https://github.com/google/differential-privacy/tree/main/examples/java#count-visits-by-day-of-week
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.count
import privacy.spark.differential.{BoundContribution, PrivateCount}

object DayCount extends App {
  // Source dataset at https://raw.githubusercontent.com/google/differential-privacy/main/examples/java/week_data.csv
  final private val DayDataCsv = "week_data.csv"
  final private val DayDataDelimiter = ","

  val spark: SparkSession = SparkSession.builder()
    .appName("DayCount")
    .getOrCreate()
  import spark.implicits._

  val epsilon = Math.log(3)
  val maxContributions = 3
  val maxContributionsPerPartition = 1

  val dataFrame = spark.read.option("delimiter", DayDataDelimiter).option("header", "true").csv(DayDataCsv)

  val privateCount = new PrivateCount[Long](epsilon, maxContributions)
  val privateCountUdf = functions.udaf(privateCount)

  // Restrict to `maxContributionsPerPartition` contributions of `VisitorId` per `Day`,
  // and `maxContributions` in total.
  dataFrame.transform(BoundContribution("Day", "VisitorId", maxContributions, maxContributionsPerPartition))
    .groupBy("Day").agg(privateCountUdf($"VisitorId") as "private_count", count($"VisitorId") as "non_private_count")
    .show
}
```

https://gitlab.wikimedia.org/gmodena/privacy-spark-pipeline demos using `spark-privacy` in a Spark pipeline, and in a Jupyter (Scala) notebook.