Mar 3

Apache Spark Structured Streaming

Mindli Team

AI-Generated Content

In a world where data is generated continuously, from financial transactions and IoT sensors to user clicks and server logs, the ability to process information in real time is a critical competitive advantage. Apache Spark Structured Streaming provides a unified, scalable, and fault-tolerant engine for building continuous data processing applications, allowing you to use the familiar DataFrame API for both batch and streaming workloads. This simplifies development, because you can apply the same transformations and analyses to static datasets and unbounded data streams.

Foundational Concepts: The Streaming DataFrame

At its core, Structured Streaming treats a live data stream as an unbounded, continuously appended table. You begin by defining a streaming DataFrame, which represents the input data stream from a source like Kafka, Kinesis, or a file system. This abstraction is powerful because you can apply the same operations (select, filter, groupBy, agg) that you use on static DataFrames. Spark SQL's Catalyst optimizer generates an execution plan, and the engine runs that plan incrementally, updating the result as new data arrives.

For example, reading from a Kafka topic is as straightforward as defining a source:

streamingDF = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()

The resulting streamingDF has the standard Kafka schema (key, value, topic, partition, offset, timestamp, timestampType). The key and value columns are binary, so you then parse the value column into structured data, creating a new DataFrame ready for analysis.
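
The parsing step could look like the sketch below. It assumes the Kafka value holds UTF-8 JSON with two hypothetical fields, userId and eventTime; those field names and the helper parse_events are illustrative and do not come from the text above. It relies on Spark SQL's from_json function, which accepts a DDL-style schema string.

```python
# Hypothetical schema for the JSON payload; adjust it to your own messages.
EVENT_SCHEMA = "userId STRING, eventTime TIMESTAMP"


def parse_events(streaming_df):
    """Turn Kafka's binary `value` column into typed top-level columns.

    `streaming_df` is expected to be the DataFrame returned by
    spark.readStream.format("kafka")...load().
    """
    parsed = streaming_df.selectExpr(
        # Kafka delivers bytes, so cast to a string before parsing the JSON.
        f"from_json(CAST(value AS STRING), '{EVENT_SCHEMA}') AS event"
    )
    # Flatten the struct so userId and eventTime become regular columns.
    return parsed.select("event.*")
```

Calling parse_events(streamingDF) would give a streaming DataFrame with userId and eventTime columns, which later steps can filter or aggregate.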

Controlling Output: Sinks and Modes

After transforming your streaming data, you write the results to a destination, or sink, such as a console for debugging, a file store like Delta Lake, or a database. The output mode dictates how the result table is written to the sink each time it is updated, and your choice depends entirely on the type of query you are running.

  • Append Mode: Only rows added to the result table since the last trigger are written to the sink. This is the default output mode, and it suits queries whose result rows never change once written, such as select and filter. Aggregations can use it only when a watermark is defined, because a row is emitted only after its window is final. Think of it as "write only the new answers."
  • Complete Mode: The entire updated result table is written to the sink on every trigger. It is supported only for aggregation queries and fits cases where you need the full, recalculated result (e.g., a running count of events).
  • Update Mode: Only the rows in the result table that were updated since the last trigger are written. If a row from a previous trigger is unchanged, it is not rewritten. This mode is efficient for aggregations where results can change incrementally.

For instance, a simple filter query can use outputMode("append"), while a query calculating a running count() without a watermark must use outputMode("complete") or outputMode("update").
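
To make the difference between complete and update mode concrete, here is a small pure-Python simulation of a running count per key across two triggers. It models the output semantics only and is not how Spark is implemented; the function name simulate_running_count and the sample keys are made up for illustration.

```python
from collections import Counter


def simulate_running_count(batches, mode):
    """Return what each trigger would write for a count-per-key query.

    `batches` is a list of micro-batches, each a list of keys.
    """
    if mode not in ("complete", "update"):
        raise ValueError("this sketch models only 'complete' and 'update'")
    result_table = Counter()  # the full, continuously updated result table
    written = []
    for batch in batches:
        previous = dict(result_table)
        result_table.update(batch)  # fold the new rows into the running counts
        if mode == "complete":
            # Complete: rewrite every row of the result table.
            written.append(dict(result_table))
        else:
            # Update: write only rows whose count changed in this trigger.
            written.append(
                {k: v for k, v in result_table.items() if previous.get(k) != v}
            )
    return written


batches = [["a", "b", "a"], ["b", "c"]]
print(simulate_running_count(batches, "complete"))
# [{'a': 2, 'b': 1}, {'a': 2, 'b': 2, 'c': 1}]
print(simulate_running_count(batches, "update"))
# [{'a': 2, 'b': 1}, {'b': 2, 'c': 1}]
```

In the second trigger, update mode skips key "a" because its count did not change, while complete mode rewrites it anyway.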

Handling Late Data with Event-Time and Watermarks

Real-world data often arrives out-of-order. Event-time is the concept of processing data based on the timestamp when the event actually occurred, not when it arrives at Spark. When you perform aggregations on event-time (e.g., count events per 10-minute tumbling window), you need a mechanism to decide when to finalize a window's result and stop waiting for straggling data.

This mechanism is the watermark. You define a watermark delay threshold, such as "10 minutes," on your event-time column. Spark tracks the maximum event time it has seen and sets the watermark to (max event time - watermark delay). Windows that end after the watermark stay in state and can still be updated by late data. Once a window falls behind the watermark, its state is dropped, its result is considered final, and later records for it may be discarded. This is a trade-off: it tolerates late data within a reasonable bound while preventing unbounded growth of in-memory state. You declare it like this:

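from pyspark.sql.functions import col, window

# Count events per 5-minute tumbling window, tolerating 10 minutes of lateness
windowedCountsDF = eventsDF \
  .withWatermark("eventTime", "10 minutes") \
  .groupBy(window(col("eventTime"), "5 minutes")) \
  .count()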

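Here, each 5-minute tumbling window stays open for updates until the watermark passes its end time, that is, until Spark has seen an event at least 10 minutes later than the window's end. The delay is measured in event time, not wall-clock time.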
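
The watermark rule can be traced by hand with a simplified pure-Python model. The sketch assumes event times are integer minutes and recomputes the watermark once after each micro-batch; the function name simulate_watermark and the sample data are illustrative, and real Spark state handling is considerably more involved.

```python
def simulate_watermark(batches, window=5, delay=10):
    """Count events per tumbling window while applying a watermark.

    `batches` is a list of micro-batches; each holds event times in minutes.
    Returns (finalized windows, still-open windows, dropped events).
    """
    open_windows = {}  # window start -> count, still held in state
    finalized = {}     # window start -> final count, state released
    dropped = []       # events that arrived after their window was finalized
    max_event_time = None
    watermark = None
    for batch in batches:
        for t in batch:
            start = (t // window) * window
            if watermark is not None and start + window <= watermark:
                dropped.append(t)  # its window is already behind the watermark
                continue
            open_windows[start] = open_windows.get(start, 0) + 1
            if max_event_time is None or t > max_event_time:
                max_event_time = t
        if max_event_time is not None:
            # Advance the watermark, then finalize every window behind it.
            watermark = max_event_time - delay
            for start in sorted(open_windows):
                if start + window <= watermark:
                    finalized[start] = open_windows.pop(start)
    return finalized, open_windows, dropped


print(simulate_watermark([[1, 3, 7], [22, 4], [2, 24]]))
# ({0: 3, 5: 1}, {20: 2}, [2])
```

The event at minute 4 arrives out of order but is still counted, because the watermark had not yet passed the end of its window. The event at minute 2 arrives after the watermark reached 12, so its window [0, 5) is already final and the event is dropped.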

Advanced Operations: Stream-Stream Joins and Triggers

Two powerful advanced features are stream-stream joins and customizable triggers. A stream-stream join allows you to join two real-time data streams, such as correlating ad clicks with ad impressions. For inner joins, watermarks and time constraints are optional, but without them Spark must keep every past row in state, so you should define watermarks on both input streams to let it clean up old state. For outer joins, watermarks and a time range condition (e.g., events.time BETWEEN otherEvents.time AND otherEvents.time + INTERVAL 1 HOUR) are mandatory, because Spark needs them to decide when an unmatched row can be emitted with nulls and its state released.
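
A left outer join between impressions and clicks might be sketched as follows. The column names (impressionAdId, impressionTime, clickAdId, clickTime), the watermark delays, and the helper name are all assumptions made for illustration. The sketch registers both streams as temporary views and expresses the join in SQL; calling DataFrame.join with an equivalent condition is another common way to write it.

```python
def join_impressions_with_clicks(spark, impressions, clicks):
    """Left-outer join two streaming DataFrames with bounded state.

    Assumed (hypothetical) columns:
      impressions: impressionAdId, impressionTime
      clicks:      clickAdId, clickTime
    """
    # Watermarks tell Spark how late each stream is allowed to be.
    (impressions
        .withWatermark("impressionTime", "2 hours")
        .createOrReplaceTempView("impressions"))
    (clicks
        .withWatermark("clickTime", "3 hours")
        .createOrReplaceTempView("clicks"))

    # The time-range condition bounds how long an impression waits for a click.
    return spark.sql("""
        SELECT *
        FROM impressions
        LEFT OUTER JOIN clicks
          ON clickAdId = impressionAdId
         AND clickTime BETWEEN impressionTime
                           AND impressionTime + INTERVAL 1 HOUR
    """)
```

With this condition, an impression that has no click within one hour is eventually emitted with null click columns once the watermark shows that no match can still arrive.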

Triggers control the timing of stream processing. While the default trigger runs a micro-batch as soon as the previous one finishes, you can configure fixed intervals (e.g., Trigger.ProcessingTime("30 seconds"), or trigger(processingTime="30 seconds") in PySpark) to achieve predictable latency. For very low latency, you can use the experimental continuous processing mode (Trigger.Continuous("1 second"), where the interval is how often progress is checkpointed), though it supports only a limited set of operations and provides at-least-once rather than exactly-once guarantees.
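
In PySpark the trigger is set on the stream writer. The sketch below starts a query on the console sink with a fixed processing-time interval; the helper name and the choice of sink are illustrative assumptions.

```python
def start_console_query(result_df, interval="30 seconds"):
    """Start a micro-batch query that fires on a fixed processing-time interval."""
    return (
        result_df.writeStream
        .format("console")                 # debugging sink, prints each batch
        .outputMode("append")
        .trigger(processingTime=interval)  # PySpark form of Trigger.ProcessingTime
        .start()
    )
```

Replacing the trigger call with trigger(continuous="1 second") would request continuous processing instead, subject to the restrictions on supported operations.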

Ensuring Reliability: Checkpointing and State Management

For a production streaming application that must run 24/7, fault tolerance is non-negotiable. Checkpointing is the cornerstone of this reliability. When you enable checkpointing by specifying a directory in your writeStream, Spark persists two types of critical information: 1) The source offsets that track progress in the input data, and 2) The intermediate state of aggregations and joins.
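
Enabling checkpointing comes down to one option on the stream writer. The sketch below writes to Parquet files; the helper name and both path parameters are hypothetical and should point at fault-tolerant storage in practice.

```python
def start_file_query(result_df, output_path, checkpoint_path):
    """Write a streaming DataFrame to Parquet files with checkpointing enabled."""
    return (
        result_df.writeStream
        .format("parquet")
        .outputMode("append")  # the file sink accepts append mode only
        .option("path", output_path)
        # Source offsets and operator state are persisted under this directory.
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
```

Restarting the same query with the same checkpoint_path lets it resume from the recorded offsets instead of starting over.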

If the application fails and is restarted, it reads the checkpoint data to pick up exactly where it left off. Combined with a replayable source (such as Kafka) and an idempotent or transactional sink, this yields end-to-end exactly-once semantics: each record affects the output once and only once, even amid failures. For stateful operations (aggregations, joins, mapGroupsWithState), Spark manages state automatically, but you must still pair checkpointing with a reliable sink (like Delta Lake) for end-to-end fault tolerance. Delta Lake itself, as a sink, provides ACID transactions and unified batch/stream processing, making it an excellent choice for building robust pipelines.
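
Why recovery does not duplicate output can be seen in a toy pure-Python model. It assumes a replayable source (a list of batches), a checkpoint that records how many batches are fully committed, and an idempotent sink keyed by batch id. All names are illustrative, and Spark's actual offset and commit logs are more detailed than this.

```python
def run_query(source, checkpoint, sink, crash_before_commit=None):
    """Process `source` (a list of batches), resuming from `checkpoint`.

    checkpoint: dict whose "committed" entry counts fully processed batches
    sink: dict keyed by batch id, so rewriting a batch overwrites itself
    crash_before_commit: batch id at which to simulate a failure, or None
    """
    for batch_id in range(checkpoint["committed"], len(source)):
        sink[batch_id] = list(source[batch_id])  # idempotent write
        if batch_id == crash_before_commit:
            raise RuntimeError(f"simulated crash in batch {batch_id}")
        checkpoint["committed"] = batch_id + 1   # record progress last


source = [["a", "b"], ["c"], ["d", "e"]]
checkpoint, sink = {"committed": 0}, {}
try:
    run_query(source, checkpoint, sink, crash_before_commit=1)
except RuntimeError:
    pass  # batch 1 reached the sink, but its commit was never recorded

run_query(source, checkpoint, sink)  # restart: batch 1 is replayed
records = [r for batch_id in sorted(sink) for r in sink[batch_id]]
print(records)
# ['a', 'b', 'c', 'd', 'e']
```

Batch 1 is written twice, yet "c" appears once, because the replayed write replaces the earlier one. With a sink that simply appended, the same replay would produce a duplicate.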

Common Pitfalls

  1. Misapplying Output Modes: Attempting to run an aggregation query (e.g., count()) in append mode without a watermark throws an AnalysisException. Correction: Use complete or update mode for aggregating queries, or add a watermark if you need append. Use append without a watermark only for simple select/filter/map-like operations where rows are immutable after being written.
  2. Ignoring Watermarks in Stateful Operations: Running a stateful operation like a stream-stream join without defining watermarks will cause the application's internal state to grow indefinitely, leading to memory errors. Correction: Always define a watermark on event-time columns for stateful operations to allow Spark to prune old state. Choose a watermark delay that balances latency tolerance with state size.
  3. Using Unreliable Storage for Checkpoints: Writing checkpoints to a non-reliable or local file system (like file:// on a single node) defeats the purpose of fault tolerance. Correction: Always use a distributed, fault-tolerant storage system like HDFS or cloud storage (S3, ADLS) for your checkpoint and write-ahead log directories. Ensure your sink (e.g., Delta Lake table) is also on resilient storage.
  4. Confusing Processing-Time with Event-Time: Aggregating based on the time the data is processed, rather than the event's timestamp, leads to incorrect and non-reproducible results, especially when processing backlogs or after a failure. Correction: Structure your data to include an event-time timestamp column and explicitly use this column in windowing operations, for example window(col("eventTime"), "10 minutes").

Summary

  • Spark Structured Streaming extends the batch-oriented DataFrame API to infinite data streams, treating a stream as a continuously growing table.
  • Output modes (append, complete, update) govern how query results are written to a sink; which ones are valid depends on the query, chiefly whether it aggregates and whether it defines a watermark.
  • Event-time processing combined with watermarks is essential for handling late-arriving data and bounding the internal state of aggregations and stream-stream joins.
  • Triggers allow you to control the latency of your processing, from micro-batch to continuous, while checkpointing is mandatory for achieving fault tolerance and exactly-once processing guarantees in stateful applications.
  • Always design your streaming pipelines with reliability in mind, using watermarks to manage state growth and checkpointing on resilient storage to ensure recovery from failures.
