In the world beyond batch, streaming data processing is the future of big data. Regardless of the framework used for stream processing, tight integration with a replayable data source like Apache Kafka is often required. Streaming applications often use Apache Kafka as a data source, or as a destination for processing results.
The Apache Spark distribution has built-in support for reading from Kafka, but surprisingly does not offer any integration for sending processing results back to Kafka. This blog post aims to fill this gap in the Spark ecosystem.
In the first part of the series you learned how to manage a Kafka producer using the Scala lazy evaluation feature and how to reuse a single Kafka producer instance on a Spark executor.
In this blog post you will learn how to publish stream processing results to Apache Kafka in a reliable way. First you will learn how the Kafka producer works, how to configure it, and how to set up a Kafka cluster to achieve the desired reliability. In the second part of the post, I will show how to implement a convenient library for sending a continuous sequence of RDDs (a DStream) to an Apache Kafka topic, as easily as in the code snippet below.
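The original snippet did not survive in this copy; below is a minimal sketch of the intended usage, assuming the sendToKafka extension method and a producer configuration map from the accompanying example-spark-kafka project (broker addresses and topic name are hypothetical):

```scala
// "dstream" is a DStream of payload records to be published
val producerConfig = Map(
  "bootstrap.servers" -> "broker1:9092,broker2:9092,broker3:9092",
  "acks" -> "all"
)

dstream.sendToKafka(producerConfig, "output-topic")
```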
Quick introduction to Kafka
Kafka is a distributed, partitioned, replicated message broker. Basic architecture knowledge is a prerequisite to understanding the Spark and Kafka integration challenges. You can safely skip this section if you are already familiar with Kafka concepts.
For convenience, I copied the essential terminology definitions directly from the Kafka documentation:
- Kafka maintains feeds of messages in categories called topics.
- We’ll call processes that publish messages to a Kafka topic producers.
- We’ll call processes that subscribe to topics and process the feed of published messages consumers.
- Kafka is run as a cluster comprised of one or more servers each of which is called a broker.
So, at a high level, producers send messages over the network to the Kafka cluster, which in turn serves them up to consumers.
This is the bare minimum you have to know, but I really encourage you to read the Kafka reference manual thoroughly.
Kafka producer API
First we need to know how the Kafka producer works. The Kafka producer exposes a very simple API for sending messages to Kafka topics. The most important methods of the KafkaProducer class are listed below:
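The original listing is missing here; for reference, these are the relevant method signatures from the Java client, shown in Scala-style notation:

```scala
// org.apache.kafka.clients.producer.KafkaProducer[K, V]
def send(record: ProducerRecord[K, V]): java.util.concurrent.Future[RecordMetadata]
def send(record: ProducerRecord[K, V], callback: Callback): java.util.concurrent.Future[RecordMetadata]
def flush(): Unit
def close(): Unit
```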
The send() methods asynchronously send a key-value record to a topic, and return immediately once the record has been stored in the buffer of records waiting to be sent.
This kind of API is not very convenient for developers, but it is crucial for achieving high throughput and low latency.
If you want to ensure that a request has completed, you can invoke the blocking get() on the future returned by the send() method. The main drawback of calling get() is a huge performance penalty, because it effectively disables batching. You cannot expect high throughput and low latency if execution blocks on every message and every single message is sent separately.
Fully non-blocking usage requires the use of a callback, which is invoked when the request completes. Note that the callback is executed in the Kafka producer I/O thread, so it must not block the caller and must be as lightweight as possible. The callback must also be properly synchronized, due to the Java memory model.
If the Kafka producer caller does not check the result of the send() method using the future or a callback, then all messages in the internal Kafka producer buffer will be lost if the producer crashes. This is the first, very important element of any Kafka integration library: callback handling is required to avoid data loss and achieve good performance.
The flush() method makes all buffered messages ready to send, and blocks until the requests associated with these messages complete. The close() method is like the flush() method, but it also closes the producer. The flush() method can be very handy if the streaming framework wants to ensure that all messages have been sent before processing the next part of the stream. With the flush() method, the streaming framework is able to push buffered messages to the Kafka brokers to simulate commit behaviour. flush() was added in the Kafka 0.9 release (KIP-8). Before Kafka 0.9, the only safe and straightforward way to flush messages from the Kafka producer's internal buffer was to close the producer.
If messages must be reliably published to the Kafka cluster, the Kafka producer and the Kafka cluster need to be configured with care, and this needs to be done independently of the chosen streaming framework.

The Kafka producer buffers messages in memory before sending them. When the memory buffer is exhausted, the Kafka producer must either stop accepting new records (block) or throw errors. By default the Kafka producer blocks, and this behavior is legitimate for stream processing: processing should be delayed if the Kafka producer's memory buffer is full and cannot accept new messages. This behavior is controlled by the block.on.buffer.full Kafka producer configuration property.
With the default configuration, when the Kafka broker (the leader of the partition) receives a message, it stores the message in memory and immediately sends an acknowledgment to the Kafka producer. To avoid data loss, the message should be replicated to at least one replica (follower). Only when the follower acknowledges the leader does the leader acknowledge the producer.
You get this guarantee with the acks=all property in the Kafka producer configuration. It ensures that the record will not be lost as long as at least one in-sync replica remains alive.
But this is not enough. The minimum number of in-sync replicas must also be defined: you should configure the min.insync.replicas property for every topic. I recommend configuring at least 2 in-sync replicas (the leader and one follower). If you have a datacenter with two zones, I also recommend keeping the leader in the first zone and 2 followers in the second zone. This configuration guarantees that every message will be stored in both zones.
We are almost done with Kafka cluster configuration.
When you set the min.insync.replicas=2 property, the topic should be replicated with factor 2 + N, where N is the number of brokers that could fail with the Kafka producer still able to publish messages to the cluster. I recommend configuring a replication factor of 3 for the topic (or more). With replication factor 3, the number of brokers in the cluster should be at least 3 + M, where M is again the number of brokers that could fail. When one or more brokers are unavailable, you will get under-replicated partitions for the topics. With more brokers in the cluster than the replication factor, you can reassign under-replicated partitions and achieve a fully replicated cluster again. I recommend building a cluster of at least 4 nodes for topics with replication factor 3.
The last important Kafka cluster configuration property is unclean.leader.election.enable. It should be disabled (by default it is enabled) to avoid unrecoverable exceptions on the Kafka consumer side.
Consider the situation where the latest committed offset is N, but after a leader failure the latest offset on the new leader is M < N. M < N because the new leader was elected from a lagging follower (not an in-sync replica). When the streaming engine asks for data from offset N using the Kafka consumer, it will get an exception, because offset N does not exist yet. Someone will have to fix the offsets manually.
So the minimal recommended Kafka setup for reliable message processing is:
- 4 nodes in the cluster
- unclean.leader.election.enable=false in the broker configuration
- replication factor of 3 for the topics
- min.insync.replicas=2 property in the topic configuration
- acks=all property in the producer configuration
- block.on.buffer.full=true property in the producer configuration
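The producer side of this checklist can be sketched as plain producer properties (broker addresses are hypothetical; block.on.buffer.full applies to the pre-0.9 producer discussed in this post):

```scala
import java.util.Properties

// producer configuration for reliable publishing
val producerProps = new Properties()
producerProps.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092")
producerProps.put("acks", "all")                  // wait for all in-sync replicas
producerProps.put("block.on.buffer.full", "true") // block instead of throwing when the buffer is full

// topic and broker side settings (set on the cluster, listed here for reference):
//   replication factor 3, min.insync.replicas=2 (topic),
//   unclean.leader.election.enable=false (broker)
```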
With the above setup, your configuration should be resistant to a single broker failure, and Kafka consumers will survive a new leader election.
You could also take a look at the settings that control when a follower is removed from the ISR by the leader, but that tuning is out of the scope of this blog post.
How to expand Spark API?
After this not so short introduction, we are ready to disassemble the integration library for Spark Streaming and Apache Kafka. DStream needs to be somehow expanded to support a new method, sendToKafka(). In Scala, the only way to add methods to an existing API is to use the implicit conversion feature.
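The conversion itself did not survive in this copy; a sketch, assuming the KafkaDStreamSink wrapper class and KafkaPayload record type from the accompanying project:

```scala
import org.apache.spark.streaming.dstream.DStream

import scala.language.implicitConversions

object KafkaDStreamSink {

  // wraps any DStream[KafkaPayload] so that sendToKafka() becomes available
  implicit def createKafkaDStreamSink(dstream: DStream[KafkaPayload]): KafkaDStreamSink =
    new KafkaDStreamSink(dstream)
}
```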
Whenever the Scala compiler finds a call to the non-existing method sendToKafka() on a DStream, the stream will be implicitly wrapped into the class where sendToKafka() is finally defined. To enable the implicit conversion for DStream, add the import statement to your code, and that's all.
How to send to Kafka in a reliable way?
Let’s check how the sendToKafka() method is defined, step by step. This is the core part of the integration library.
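A sketch of the method's outer structure (the partition-level sending logic is filled in later):

```scala
def sendToKafka(config: Map[String, String], topic: String): Unit =
  dstream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // send the records of this partition to the Kafka topic
    }
  }
```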
There are two loops: the first over the wrapped dstream and the second over every partition of each rdd. This is a quite standard pattern for the Spark programming model.
Records from every partition are ready to be sent to the Kafka topic by the Spark executors. The destination topic name is given explicitly as the last parameter of the sendToKafka() method.
The first step in the inner loop is getting a Kafka producer instance from the producer factory.
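Assuming the caching factory described in the first part of this series:

```scala
// returns a cached producer for this configuration, creating it on first use
val producer = KafkaProducerFactory.getOrCreateProducer(config)
```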
The factory creates only a single producer instance for any given producer configuration. If the producer instance has already been created, the existing instance is returned and reused. Caching the Kafka producer is crucial for performance, because establishing a connection to the cluster takes time. It is a much more time-consuming operation than opening a plain socket connection, as the Kafka producer needs to discover the leaders of all partitions.
Please refer to the first part of this blog post series for more details about the factory implementation.
For debugging purposes, a logger and the Spark task context are needed.
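A sketch of this setup (the logging framework is an assumption; any one will do):

```scala
import org.apache.spark.TaskContext
import org.slf4j.LoggerFactory

// created inside foreachPartition, on the executor, to avoid serialization issues
val logger = LoggerFactory.getLogger(getClass)
val context = TaskContext.get()

logger.debug(s"Send Spark partition: ${context.partitionId()} to Kafka topic: $topic")
```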
You could use any logging framework, but the logger itself has to be defined in the foreachPartition loop to avoid weird serialization issues. The Spark task context will be used to get the current partition identifier. I don’t like the static call for getting the task context, but this is the official way to do it. See pull request SPARK-5927 for more details.
Before we go further, the Kafka producer callback for error handling needs to be introduced.
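The callback listing did not survive in this copy; below is a sketch consistent with the description that follows:

```scala
import java.util.concurrent.atomic.AtomicReference

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

class KafkaDStreamSinkExceptionHandler extends Callback {

  // last registered exception, kept in a thread-safe holder
  private val lastException = new AtomicReference[Option[Exception]](None)

  // called from the producer I/O thread; exactly one argument is non-null
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
    lastException.set(Option(exception))

  // called from the Spark task thread; rethrows and clears the last error
  def throwExceptionIfAny(): Unit =
    lastException.getAndSet(None).foreach(throw _)
}
```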
The onCompletion() method of the callback is called when the message sent to the Kafka cluster has been acknowledged. Exactly one of the callback arguments will be non-null: the record metadata on success, or the exception on failure. The KafkaDStreamSinkExceptionHandler class keeps the last exception registered by the callback (if any). The client of the callback is able to rethrow the registered exception using the throwExceptionIfAny() method. Because onCompletion() and throwExceptionIfAny() are called from different threads, the last exception has to be kept in a thread-safe data structure.
Finally, we are ready to send records to Kafka using the created callback.
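A sketch of the send loop (the KafkaPayload record type and its value field follow the accompanying project and are assumptions here):

```scala
import org.apache.kafka.clients.producer.ProducerRecord

val callback = new KafkaDStreamSinkExceptionHandler

// map over the partition iterator is lazy; toList materializes it
val metadata = records.map { record =>
  callback.throwExceptionIfAny() // fail fast if an earlier record failed
  producer.send(new ProducerRecord(topic, record.value), callback)
}.toList
```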
First the callback is examined for a registered exception. If one of the previous records could not be sent, the exception is propagated to the Spark framework. If any redelivery policy is needed, it should be configured at the Kafka producer level; look at the retry-related Kafka producer configuration properties.
Finally, the Kafka producer metadata are collected and materialized into a list. At this moment, the Kafka producer starts sending records in a background I/O thread. To achieve high throughput, the Kafka producer sends records in batches. Because we want natural back pressure for our stream processing, the next batch needs to be blocked until the records from the current batch have really been acknowledged by the Kafka brokers. So for each collected metadata entry (a Java Future), get() is called to ensure that the record has been sent to the brokers.
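A sketch of the wait step, assuming metadata is the list of futures collected from the send loop:

```scala
// block until every record of this partition has been acknowledged
metadata.foreach { future =>
  future.get()
}
```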
Because sending started only a moment ago, it is likely that the records have already been sent and the get() method does not block. However, if a get() call does block, it means that there are unsent messages in the internal Kafka producer buffer, so the processing should be blocked as well. At the very end, the sendToKafka() method should propagate the exception recorded by the callback (if any).
The complete method is presented below for reference.
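The original listing did not survive in this copy; the sketch below reassembles the steps described above (KafkaPayload, KafkaProducerFactory and KafkaDStreamSinkExceptionHandler follow the accompanying example-spark-kafka project and should be treated as assumptions):

```scala
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.spark.TaskContext
import org.apache.spark.streaming.dstream.DStream
import org.slf4j.LoggerFactory

class KafkaDStreamSink(dstream: DStream[KafkaPayload]) {

  def sendToKafka(config: Map[String, String], topic: String): Unit =
    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // reuse a cached producer instance on this executor
        val producer = KafkaProducerFactory.getOrCreateProducer(config)

        val context = TaskContext.get()
        val logger = LoggerFactory.getLogger(getClass)
        val callback = new KafkaDStreamSinkExceptionHandler

        logger.debug(s"Send Spark partition: ${context.partitionId()} to Kafka topic: $topic")
        val metadata = records.map { record =>
          callback.throwExceptionIfAny() // fail fast on earlier errors
          producer.send(new ProducerRecord(topic, record.value), callback)
        }.toList // materialize to start the background sends

        logger.debug(s"Flush Spark partition: ${context.partitionId()} to Kafka topic: $topic")
        metadata.foreach(_.get()) // back pressure: wait for acknowledgments

        callback.throwExceptionIfAny() // propagate the last error, if any
      }
    }
}
```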
The method is not very complex, but there are a few important elements if you don’t want to lose processing results and if you need a back pressure mechanism:
- sendToKafka() should fail fast if a record could not be sent to Kafka. Don’t worry, Spark will execute the failed task again.
- sendToKafka() should block Spark processing if the Kafka producer slows down.
- sendToKafka() should explicitly flush the records buffered by the Kafka producer, to avoid data loss.
- The Kafka producer needs to be reused by the Spark executor to avoid the overhead of connecting to Kafka.
- The Kafka producer needs to be explicitly closed when Spark shuts down executors, to avoid data loss.
The complete, working project is published on https://github.com/mkuthan/example-spark-kafka. You can clone/fork the project and do some experiments by yourself.
There is also an alternative library, spark-kafka-writer, developed by Cloudera, which emerged from the closed pull request SPARK-2994. Unfortunately, at the time of this writing, that library used the obsolete Scala Kafka producer API and did not send processing results in a reliable way.
I hope that some day we will find a reliable, mature library for sending processing results to Apache Kafka in the official Spark distribution.