nowave.it

Differential Privacy with Apache Spark

Fri 28 October 2022

The Java implementation of Google's differential-privacy library provides building blocks for computing \((\epsilon, \delta)\)-differentially private statistics over datasets.

The CountVisitsPerDay example shows how to anonymously count restaurant visitors for each day of the week, using a Count object. The relevant logic:

EnumMap<DayOfWeek, Integer> privateCountsPerDay = new EnumMap<>(DayOfWeek.class);
...
Arrays.stream(DayOfWeek.values()).forEach(d -> {
  Count dpCount =
      Count.builder()
          .epsilon(LN_3)
          // The data was pre-processed so that each visitor may visit the restaurant up to
          // MAX_CONTRIBUTED_DAYS days per week.
          // Hence, each user may contribute to up to MAX_CONTRIBUTED_DAYS daily counts.
          // Note: while the library accepts this limit as a configurable parameter,
          // it doesn't pre-process the data to ensure this limit is respected.
          // It is the responsibility of the caller to ensure the data passed to the library
          // is capped, so that the privacy guarantee actually holds.
          .maxPartitionsContributed(MAX_CONTRIBUTED_DAYS)
          .build();
  dpCount.incrementBy(boundedVisits.getVisitsForDay(d).size());
  privateCountsPerDay.put(d, (int) dpCount.computeResult());
});

For each day of the week, a private counter dpCount is incremented by the number of visitors on that day. The noisy daily count is then computed and stored in an EnumMap keyed by day.

Thanks to the algebraic properties of their implementation, Count and the other statistics are suitable for map/reduce-style computations: partial results computed on separate chunks of data can be merged before the final result is produced. In the remainder of this article we'll look at scaling up differential privacy on Apache Spark.
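To make the map/reduce point concrete, here is a minimal, Spark-free sketch. NoisyCount is a hypothetical stand-in, not the library's Count: it assumes a textbook Laplace mechanism and uses a plain floating-point sampler that is not hardened the way a production library's noise generation should be. It only illustrates that partial counts merge by addition and that noise is added once, at the end.

```scala
import scala.util.Random

// Hypothetical stand-in for a private counter; NOT the Google library's Count.
final case class NoisyCount(raw: Long, epsilon: Double, sensitivity: Double) {
  // Record one more contribution.
  def increment: NoisyCount = copy(raw = raw + 1)

  // Merging two partial counts is plain addition, hence associative and commutative.
  def merge(other: NoisyCount): NoisyCount = copy(raw = raw + other.raw)

  // Add Laplace(0, sensitivity / epsilon) noise once, to the fully merged count.
  def result(rng: Random): Long = {
    val scale = sensitivity / epsilon
    // The difference of two i.i.d. exponentials with mean `scale` is Laplace(0, scale).
    def exponential(): Double = -scale * math.log(1.0 - rng.nextDouble())
    math.round(raw + exponential() - exponential())
  }
}

object NoisyCount {
  def zero(epsilon: Double, sensitivity: Double): NoisyCount =
    NoisyCount(0L, epsilon, sensitivity)

  // Count each partition independently (the "map" side),
  // then merge the partial counts (the "reduce" side).
  def countPartitions[T](partitions: Seq[Seq[T]], epsilon: Double, sensitivity: Double): NoisyCount =
    partitions
      .map(_.foldLeft(zero(epsilon, sensitivity))((c, _) => c.increment))
      .foldLeft(zero(epsilon, sensitivity))(_ merge _)
}
```

However the rows are split across partitions, the merged raw count is the same; only the final call to result is randomized.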

User Defined Aggregations with Apache Spark

Since Spark 3, the public Aggregator base class is the recommended way to write efficient User Defined Aggregate Functions (UDAFs); see User Defined Aggregation in Apache Spark: A Love Story for performance considerations. Aggregator borrows from algebird: it operates on a monoid-like accumulator type and adds accumulation and presentation logic. It works with both typed Datasets and untyped DataFrames.

An Aggregator[IN, BUF, OUT] class takes three type parameters: the input type IN, the accumulator type BUF (which stores intermediate results), and the output type OUT.

In Scala it must implement the following methods: zero, reduce, merge, finish, bufferEncoder and outputEncoder.
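A Spark-free sketch may help to see how the pieces of this contract fit together. MiniAggregator below is a hypothetical trait that mirrors the four computational methods of Spark's Aggregator (zero, reduce, merge, finish) and leaves out bufferEncoder and outputEncoder, which only tell Spark how to serialize the buffer and the output. The run method is an assumption about the calling order, written for illustration; it is not Spark's scheduler.

```scala
// Hypothetical, Spark-free mirror of the Aggregator contract (encoders omitted).
trait MiniAggregator[IN, BUF, OUT] {
  def zero: BUF                     // empty buffer, the identity element for merge
  def reduce(b: BUF, a: IN): BUF    // fold one input row into a buffer
  def merge(b1: BUF, b2: BUF): BUF  // combine the buffers of two partitions
  def finish(reduction: BUF): OUT   // turn the final buffer into the output

  // Illustrative calling order: reduce within each partition,
  // merge across partitions, finish exactly once.
  def run(partitions: Seq[Seq[IN]]): OUT =
    finish(partitions.map(_.foldLeft(zero)(reduce)).foldLeft(zero)(merge))
}

// Example: average string length, using a (sum, count) buffer.
object AvgLength extends MiniAggregator[String, (Long, Long), Double] {
  def zero: (Long, Long) = (0L, 0L)
  def reduce(b: (Long, Long), a: String): (Long, Long) = (b._1 + a.length, b._2 + 1)
  def merge(b1: (Long, Long), b2: (Long, Long)): (Long, Long) = (b1._1 + b2._1, b1._2 + b2._2)
  def finish(r: (Long, Long)): Double = if (r._2 == 0) 0.0 else r._1.toDouble / r._2
}
```

The buffer type is what makes the aggregation distributable: the output (an average) cannot be merged directly, but the (sum, count) pair can.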

The Count class is an Aggregator

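A Count maps naturally onto the Aggregator[IN, BUF, OUT] contract: a freshly built Count is the zero element, increment() accumulates a row, mergeWith() combines partial counts, and computeResult() produces the output.

The PrivateCount class implements an Aggregator that uses Count as its buffer type: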

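import com.google.privacy.differentialprivacy.Count
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

class PrivateCount[T](epsilon: Double, contribution: Int) extends Aggregator[T, Count, Long] {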
  override def zero: Count = Count.builder()
    .epsilon(epsilon)
    .maxPartitionsContributed(contribution)
    .build()

  override def reduce(b: Count, a: T): Count = {
    b.increment()
    b
  }

  override def merge(b1: Count, b2: Count): Count = {
    b1.mergeWith(b2.getSerializableSummary)
    b1
  }

  override def finish(reduction: Count): Long = reduction.computeResult().toLong

  override def bufferEncoder: Encoder[Count] = Encoders.kryo[Count]

  override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

Aggregators can be registered as UDFs. A PrivateCount can then be applied to a group of rows like any built-in aggregate function.

First we instantiate the Aggregator:

val privateCount = new PrivateCount[Long](epsilon, maxContributions)

Then we wrap it as a UDF, so that we can use it in DataFrame transformations:

val privateCountUdf = functions.udaf(privateCount)
dataFrame.groupBy("Day").agg(privateCountUdf($"VisitorId") as "private_count").show()

Or with the equivalent SparkSQL API:

spark.udf.register("private_count", privateCountUdf)
dataFrame.createOrReplaceTempView("visitors")
spark.sql("select Day, private_count(VisitorId) from visitors group by Day")

Contribution bounding

Google's differential-privacy comes with an important caveat: [...] Differential privacy requires some bound on maximum number of contributions each user can make to a single aggregation. The DP building block libraries don't perform such bounding: their implementation assumes that each user contributes only a fixed number of rows to each partition. That number can be configured by the user. The library neither verifies nor enforces this limit; it is the caller's responsibility to pre-process data to enforce this. [...]
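To see why the bound matters, consider the textbook Laplace mechanism (an illustration only; the library's exact noise calibration may differ). Suppose each user contributes to at most \(k\) partitions and at most \(m\) rows per partition. Adding or removing one user then changes the vector of counts by at most

\[
\Delta_1 = k \cdot m
\]

in \(L_1\) norm, and adding Laplace noise with scale

\[
b = \frac{\Delta_1}{\epsilon}
\]

to each count gives \(\epsilon\)-differential privacy. With the values used in the examples in this article, \(\epsilon = \ln 3\), \(k = 3\) and \(m = 1\):

\[
b = \frac{3 \cdot 1}{\ln 3} \approx 2.73
\]

If contributions are not bounded, \(\Delta_1\) has no finite value and no finite amount of noise yields the guarantee. If the data violates the configured bound, the noise is calibrated for a smaller \(\Delta_1\) than the real one and the stated \(\epsilon\) no longer holds.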

We can define a BoundContribution helper function to pre-process our data and bound each user's contributions before performing the count. Spark's window functions come in handy for this step.

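import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rand, row_number}

object BoundContribution {
  /**
   * Limit the number of records contributed by `privacyKey`
   * to at most `maxContributions` in total, and at most `maxContributionsPerPartition`
   * per partition `key`.
   *
   * @param key column that identifies the partition (e.g. the day).
   * @param privacyKey column that identifies the contributing user.
   * @param maxContributions max total contributions of `privacyKey`.
   * @param maxContributionsPerPartition max contributions of `privacyKey` per partition `key`.
   * @param dataFrame the data to bound.
   */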
  def apply(key: String, privacyKey: String, maxContributions: Int, maxContributionsPerPartition: Int = 1)(dataFrame: DataFrame): DataFrame =  {
    val byContributionsPerPartition = Window.partitionBy(privacyKey, key)
    val byContributions = Window.partitionBy(privacyKey)
    val contributionsCol = "contributions"
    val contributionsPerPartitionCol = "contributionsPerPartitionCol"

    dataFrame
      .withColumn(contributionsPerPartitionCol, row_number().over(byContributionsPerPartition.orderBy(rand())))
      .where(col(contributionsPerPartitionCol) <= maxContributionsPerPartition)
      .withColumn(contributionsCol, row_number().over(byContributions.orderBy(rand()))) // TODO(gmodena): how secure is this?
      .where(col(contributionsCol) <= maxContributions)
      .drop(contributionsPerPartitionCol, contributionsCol)
  }
}
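The same two-step bounding can be sketched on an in-memory collection, which makes the logic easy to check without a Spark session. The Visit record type and the bound function are hypothetical names introduced for this illustration; random shuffling plays the role of row_number over a window ordered by rand.

```scala
import scala.util.Random

object BoundContributionSketch {
  // Hypothetical record type: one visit by `visitorId` on `day`.
  final case class Visit(visitorId: String, day: String)

  def bound(
      visits: Seq[Visit],
      maxContributions: Int,
      maxContributionsPerPartition: Int,
      rng: Random): Seq[Visit] = {
    // Step 1: keep at most maxContributionsPerPartition random rows per (visitor, day).
    val perPartition = visits
      .groupBy(v => (v.visitorId, v.day))
      .values
      .flatMap(rows => rng.shuffle(rows).take(maxContributionsPerPartition))
      .toSeq

    // Step 2: of the surviving rows, keep at most maxContributions random rows per visitor.
    perPartition
      .groupBy(_.visitorId)
      .values
      .flatMap(rows => rng.shuffle(rows).take(maxContributions))
      .toSeq
  }
}
```

As in the Spark version, which rows survive is random, but the two bounds always hold on the output. With maxContributionsPerPartition = 1, the total bound is also a bound on the number of partitions a visitor contributes to.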

Count visits by day of week

In spark-privacy I implemented some basic private aggregations using the Laplace mechanism. As an example, the code below reproduces Google's Count visits by day of week on Spark.

// A distributed implementation of
// https://github.com/google/differential-privacy/tree/main/examples/java#count-visits-by-day-of-week

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.{col, count}
import privacy.spark.differential.{BoundContribution, PrivateCount}

object DayCount extends App {
  // Source dataset at https://raw.githubusercontent.com/google/differential-privacy/main/examples/java/week_data.csv
  final private val DayDataCsv = "week_data.csv"
  final private val DayDataDelimiter = ","

  val spark:SparkSession = SparkSession.builder()
    .appName("DayCount")
    .getOrCreate()
  import spark.implicits._

  val epsilon = Math.log(3)
  val maxContributions = 3
  val maxContributionsPerPartition = 1

  val dataFrame = spark.read.option("delimiter", DayDataDelimiter).option("header", "true").csv(DayDataCsv)

  val privateCount = new PrivateCount[Long](epsilon, maxContributions)
  val privateCountUdf = functions.udaf(privateCount)

  // Restrict `maxContributionsPerPartition` of `VisitorId` per `Day`, and `maxContributions` in total
  dataFrame.transform(BoundContribution("Day", "VisitorId", maxContributions, maxContributionsPerPartition))
    .groupBy("Day").agg(privateCountUdf($"VisitorId") as "private_count", count($"VisitorId") as "non_private_count")
    .show()
}

Privacy pipelines

https://gitlab.wikimedia.org/gmodena/privacy-spark-pipeline demonstrates how to use spark-privacy in a Spark pipeline and in a Jupyter (Scala) notebook.

References

  1. User Defined Aggregation in Apache Spark: A Love Story.
  2. Course notes on differential privacy.