nowave.it

Async I/O with PyFlink

Tue 07 May 2024

This article contains notes on the implementation of asynchronous callbacks in PyFlink. The work is derived from a collaboration with Wikimedia's Platform team and TNG, and further developed in a hosted Flink solution.

Code snippets in this article have not been tested. They should be treated as pseudocode rather than as a reference.

Introduction

When developing streaming applications, we sometimes need asynchronous access to external data sources. For instance, we might want to perform a lookup join on a database or query a microservice.

At the time of writing, Async I/O in Apache Flink is only supported by the Java API. Making an external call in a Python application results in a synchronous, blocking operation.

This article describes an approach to asynchronous computation in PyFlink, based on mini-batching. Events are processed concurrently with a thread pool local to a KeyedProcessFunction operator.

Mini batching with count + time window triggers

Mini batching can be implemented using a window function and an operator that consumes all elements of the window and yields back an arbitrary number of records. An approach that worked well in practice is to use a window governed by a "count with time" trigger. A batch is considered complete when either of these conditions is met:

  1. The window has processed a number of records equal to a batch_size parameter.

  2. A time interval of length batch_duration has elapsed.

Adding a time bound prevents a batch from staying open indefinitely, which could otherwise happen when the record count is low (e.g. if the data distribution in a partitioned DataStream is skewed).
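
In plain Python terms, the completion condition described above boils down to a check like the following (the names here are illustrative and not part of any Flink API):

def batch_is_complete(batch, batch_size, batch_start_ms, now_ms, batch_duration_ms):
    # a batch closes when it is full, or when it has been open longer than batch_duration_ms
    return len(batch) >= batch_size or (now_ms - batch_start_ms) >= batch_duration_ms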

Timers

Flink provides a timer mechanism: a ProcessFunction can register timers to trigger actions at specific times. Timers are either processing time timers or event time timers; the latter are triggered by watermarks.

When the specified processing time is reached, or when the event-time watermark passes the specified event time, ProcessFunction.on_timer is invoked.

KeyedProcessFunction is the function most commonly used to work with timers, since timers are only available on keyed streams.
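
As a minimal, illustrative sketch (independent of the implementation discussed below), a KeyedProcessFunction can register a processing-time timer in process_element and react to it in on_timer:

from pyflink.datastream.functions import KeyedProcessFunction


class TimerSketch(KeyedProcessFunction):
    def process_element(self, value, ctx: "KeyedProcessFunction.Context"):
        # register a processing-time timer that fires one second from now
        now = ctx.timer_service().current_processing_time()
        ctx.timer_service().register_processing_time_timer(now + 1000)
        yield value

    def on_timer(self, timestamp: int, ctx: "KeyedProcessFunction.OnTimerContext"):
        # invoked by Flink when the registered timer fires
        yield {"timer_fired_at": timestamp}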

Implementation

A straightforward implementation can subclass KeyedProcessFunction, initialize a thread pool, and implement the window triggering logic in the on_timer and process_element methods. In the remainder of this article we'll do the following:

  1. We use a ThreadPoolExecutor from concurrent.futures to launch parallel tasks.
  2. A KeyedProcessFunction.process_element implementation consumes incoming records and submits futures to the executor thread pool. Once batch_size records have been consumed, futures are collected and the current window is closed. When process_element is invoked and no future has been scheduled yet, a timer is registered with Flink's timer service. The timer will trigger at current_time + batch_duration (expressed in milliseconds).
  3. KeyedProcessFunction.on_timer is triggered when the window timeout is reached, i.e. when the timer registered in process_element fires. Any pending future is collected and the current window is closed. [1]

A basic implementation would look like the following:

import logging
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Dict, Generator, List, Optional, Tuple

from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext

logger = logging.getLogger(__name__)


class AsyncBatchFunction(KeyedProcessFunction):
    def __init__(self, func, batch_size, batch_duration_ms, max_workers=1):
        # func is the blocking callable executed for each event (e.g. an external lookup).
        self.func = func
        self.batch_size = batch_size
        self.batch_duration_ms = batch_duration_ms
        self.max_workers = max_workers
        self.executor: Optional[ThreadPoolExecutor] = None
        # timestamp that determines when a timer will be triggered, ending the window.
        self.trigger_timestamp = 0

        # batch_futures keeps track of (future, input event) tuples scheduled
        # during a window lifetime.
        self.batch_futures: List[Tuple[Future, Dict[Any, Any]]] = []

    def open(self, runtime_context: RuntimeContext):
        self.executor = ThreadPoolExecutor(max_workers=self.max_workers)

    def on_timer(
        self,
        timestamp: int,
        ctx: "KeyedProcessFunction.OnTimerContext",
    ) -> Generator[Dict[Any, Any], None, None]:
        # the window timeout was reached: collect whatever has been scheduled so far.
        yield from self.collect_batch()

        # when the timer fires, reset the batch.
        self.batch_futures = []

    def process_element(
        self,
        event: Dict[Any, Any],
        ctx: "KeyedProcessFunction.Context",
    ) -> Generator[Dict[Any, Any], None, None]:
        # batch_futures is empty, meaning we are starting a new batch here:
        # start a timer for this batch.
        if not self.batch_futures:
            self.trigger_timestamp = (
                ctx.timer_service().current_processing_time() + self.batch_duration_ms
            )
            ctx.timer_service().register_processing_time_timer(self.trigger_timestamp)

        # Run func in an executor thread, and add the (future, input event) tuple to the batch.
        self.batch_futures.append((self.executor.submit(self.func, event), event))

        if len(self.batch_futures) >= self.batch_size:
            # The batch is full. Cancel the outstanding batch timer,
            # collect and emit the results.
            ctx.timer_service().delete_processing_time_timer(self.trigger_timestamp)
            yield from self.collect_batch()
            # close the count window
            self.batch_futures = []

    def collect_batch(self) -> Generator[Dict[Any, Any], None, None]:
        # block on each future in insertion order and emit its result.
        for future, event in self.batch_futures:
            try:
                yield future.result()
            except Exception as e:
                logger.error(e)
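
Wiring the operator into a pipeline could look roughly like the sketch below; events is assumed to be an existing DataStream of dictionaries, and call_external_service is a placeholder for the blocking lookup:

def call_external_service(event):
    # placeholder for a blocking call, e.g. an HTTP request or a database query
    ...

enriched = (
    events
    .key_by(lambda event: event["user"])  # "user" is an illustrative key field
    .process(
        AsyncBatchFunction(
            func=call_external_service,
            batch_size=100,           # illustrative values
            batch_duration_ms=5000,
        )
    )
)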

Delivery guarantees

This implementation would not be able to satisfy EXACTLY_ONCE delivery guarantees. Upon application restarts, events processed since the last checkpoint would be re-processed.

To ensure EXACTLY_ONCE (for the Flink side of a streaming application), we can keep a list of events that have already been processed in the current window in a ListState. The ListState is checkpointed consistently by Flink as part of its distributed snapshots implementation. While this mitigates duplicate event processing (e.g. duplicate API calls), the guarantee holds within a single window firing and not between window firings: when resuming from a checkpoint (e.g. recovery), source offsets (and list state) will rewind to the latest known checkpoint.

A basic implementation would look like the following:

from pyflink.common.typeinfo import Types
from pyflink.datastream.state import ListState, ListStateDescriptor


class AsyncBatchFunction(KeyedProcessFunction):
    # __init__ and on_timer are unchanged from the previous version.

    def open(self, runtime_context: RuntimeContext):
        self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
        # Keep a list of events that have already been processed in the current window,
        # to mitigate duplicate event processing (e.g. duplicate API calls) in case of
        # application restarts. This is guaranteed within a single window firing
        # and not between window firings: when resuming from a checkpoint (e.g. recovery),
        # source offsets (and list state) will rewind to the latest known checkpoint.
        # The list state is checkpointed consistently by the system as part of the
        # distributed snapshots.
        # The ListState could grow large. For book-keeping, we clear() it once all
        # events in a batch have been processed.
        event_type_info = Types.PICKLED_BYTE_ARRAY()
        self.processed_events_state_descriptor = ListStateDescriptor(
            name="list", elem_type_info=event_type_info
        )
        self.processed_events_state: ListState = runtime_context.get_list_state(
            self.processed_events_state_descriptor
        )

    def process_element(
        self,
        event: Dict[Any, Any],
        ctx: "KeyedProcessFunction.Context",
    ) -> Generator[Dict[Any, Any], None, None]:
        # batch_futures is empty, meaning we are starting a new batch here:
        # start a timer for this batch.
        if not self.batch_futures:
            self.trigger_timestamp = (
                ctx.timer_service().current_processing_time() + self.batch_duration_ms
            )
            ctx.timer_service().register_processing_time_timer(self.trigger_timestamp)

        # Run func in an executor thread, and add the (future, input event) tuple
        # to the batch, unless the event was already processed in this window.
        processed_events = self.processed_events_state.get()
        if not processed_events or event not in processed_events:
            self.batch_futures.append((self.executor.submit(self.func, event), event))

        if len(self.batch_futures) >= self.batch_size:
            # The batch is full. Cancel the outstanding batch timer,
            # collect and emit the results.
            ctx.timer_service().delete_processing_time_timer(self.trigger_timestamp)

            yield from self.collect_batch()
            # close the count window
            self.batch_futures = []
            # All events in the batch have been processed: reset the state.
            self.processed_events_state.clear()

    def collect_batch(self) -> Generator[Dict[Any, Any], None, None]:
        # Iterate over the futures according to their insertion order,
        # blocking on each result.
        for future, event in self.batch_futures:
            try:
                yield future.result()
            except Exception as e:
                logger.error(e)
            finally:
                self.processed_events_state.add(event)  # mark the event as processed

Conclusion

This article showed an approach to executing asynchronous functions on a Flink DataStream by introducing ad hoc windowing logic and an operator-local thread pool.

References


  1. We should consider extracting this logic out of the ProcessFunction and moving it to a custom Trigger.