Passionate Developer

Memory is unreliable, like software, so let's make my thoughts more lasting and my software more reliable

Long-running Spark Streaming Jobs on YARN Cluster

A long-running Spark Streaming job, once submitted to the YARN cluster, should run forever until it is intentionally stopped. Any interruption introduces substantial processing delays and could lead to data loss or duplicates. Neither YARN nor Apache Spark was designed for executing long-running services, but they have been successfully adapted to the growing needs of near-real-time processing implemented as long-running jobs. Successfully does not necessarily mean without technological challenges.

This blog post summarizes my experience in running mission-critical, long-running Spark Streaming jobs on a secured YARN cluster. You will learn how to submit a Spark Streaming application to a YARN cluster and avoid sleepless nights during on-call hours.

Fault tolerance

In YARN cluster mode, the Spark driver runs in the Application Master, the first container allocated by the application. This process is responsible for driving the application and requesting resources (Spark executors) from YARN. Importantly, the Application Master eliminates the need for any other process to run during the application lifecycle. Even if the Hadoop edge node from which the Spark Streaming job was submitted fails, the application stays unaffected.

To run a Spark Streaming application in cluster mode, ensure that the following parameters are given to the spark-submit command:

spark-submit --master yarn --deploy-mode cluster

Because the Spark driver and the Application Master share a single JVM, any error in the Spark driver stops our long-running job. Fortunately, it is possible to configure the maximum number of attempts that will be made to re-run the application. It is reasonable to set a value higher than the default of 2 (derived from the YARN cluster property yarn.resourcemanager.am.max-attempts). For me, 4 works quite well; a higher value may cause unnecessary restarts even if the reason for the failure is permanent.

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4

If the application runs for days or weeks without restart or redeployment on a highly utilized cluster, 4 attempts could be exhausted in a few hours. To avoid this situation, the attempt counter should be reset every hour or so.

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h

Another important setting is the maximum number of executor failures before the application fails. By default it is max(2 * num executors, 3), which is well suited for batch jobs but not for long-running ones. The property comes with a corresponding validity interval, which should also be set.

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h

For long-running jobs you could also consider boosting the maximum number of task failures before giving up on the job. By default, a task is retried 4 times and then the job fails.

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8

Performance

When a Spark Streaming application is submitted to the cluster, the YARN queue where the job runs must be defined. I strongly recommend using the YARN Capacity Scheduler and submitting long-running jobs to a separate queue. Without a separate YARN queue, your long-running job will sooner or later be preempted by a massive Hive query.

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8 \
    --queue realtime_queue

Another important issue for a Spark Streaming job is keeping the processing time stable and highly predictable. Processing time should stay below the batch duration to avoid delays. I’ve found that Spark speculative execution helps a lot, especially on a busy cluster: batch processing times are much more stable than with speculative execution disabled. Unfortunately, speculative mode can be enabled only if the Spark actions are idempotent.

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8 \
    --queue realtime_queue \
    --conf spark.speculation=true

Security

On a secured HDFS cluster, long-running Spark Streaming jobs fail due to Kerberos ticket expiration. Without additional settings, a Kerberos ticket is issued when the Spark Streaming job is submitted to the cluster. When the ticket expires, the Spark Streaming job is no longer able to read or write data on HDFS.

In theory (based on the documentation) it should be enough to pass a Kerberos principal and keytab to the spark-submit command:

spark-submit --master yarn --deploy-mode cluster \
     --conf spark.yarn.maxAppAttempts=4 \
     --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
     --conf spark.yarn.max.executor.failures={8 * num_executors} \
     --conf spark.yarn.executor.failuresValidityInterval=1h \
     --conf spark.task.maxFailures=8 \
     --queue realtime_queue \
     --conf spark.speculation=true \
     --principal user/hostname@domain \
     --keytab /path/to/foo.keytab

In practice, due to several bugs (HDFS-9276, SPARK-11182), the HDFS client cache must be disabled. Otherwise, Spark will not be able to read the updated token from the file on HDFS.

spark-submit --master yarn --deploy-mode cluster \
     --conf spark.yarn.maxAppAttempts=4 \
     --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
     --conf spark.yarn.max.executor.failures={8 * num_executors} \
     --conf spark.yarn.executor.failuresValidityInterval=1h \
     --conf spark.task.maxFailures=8 \
     --queue realtime_queue \
     --conf spark.speculation=true \
     --principal user/hostname@domain \
     --keytab /path/to/foo.keytab \
     --conf spark.hadoop.fs.hdfs.impl.disable.cache=true

Logging

The easiest way to access Spark application logs is to configure the Log4j console appender, wait for application termination and use the yarn logs -applicationId [applicationId] command. Unfortunately, it is not feasible to terminate long-running Spark Streaming jobs just to access the logs.

I recommend installing and configuring Elasticsearch, Logstash and Kibana (the ELK stack). ELK installation and configuration are out of the scope of this blog post, but remember to log the following context fields:

  • YARN application id
  • YARN container hostname
  • Executor id (Spark driver is always 000001, Spark executors start from 000002)
  • YARN attempt (to check how many times the Spark driver has been restarted; the attempt counter is reset during the application lifecycle according to spark.yarn.am.attemptFailuresValidityInterval)

A Log4j configuration with the Logstash-specific appender and layout definition should be passed to the spark-submit command:

spark-submit --master yarn --deploy-mode cluster \
     --conf spark.yarn.maxAppAttempts=4 \
     --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
     --conf spark.yarn.max.executor.failures={8 * num_executors} \
     --conf spark.yarn.executor.failuresValidityInterval=1h \
     --conf spark.task.maxFailures=8 \
     --queue realtime_queue \
     --conf spark.speculation=true \
     --principal user/hostname@domain \
     --keytab /path/to/foo.keytab \
     --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
     --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties \
     --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties \
     --files /path/to/log4j.properties

Finally, the Kibana dashboard for the Spark job might look like this:

Monitoring

A long-running job runs 24/7, so it is important to have insight into historical metrics. Again, external tools are needed. I recommend installing Graphite for collecting metrics and Grafana for building dashboards.

First, Spark needs to be configured to report metrics to Graphite. Prepare the metrics.properties file:

*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=[hostname]
*.sink.graphite.port=[port]
# this prefix will be referenced later in the Graphite queries
*.sink.graphite.prefix=stats.analytics

driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource

And configure spark-submit command:

spark-submit --master yarn --deploy-mode cluster \
     --conf spark.yarn.maxAppAttempts=4 \
     --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
     --conf spark.yarn.max.executor.failures={8 * num_executors} \
     --conf spark.yarn.executor.failuresValidityInterval=1h \
     --conf spark.task.maxFailures=8 \
     --queue realtime_queue \
     --conf spark.speculation=true \
     --principal user/hostname@domain \
     --keytab /path/to/foo.keytab \
     --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
     --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties \
     --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties \
     --files /path/to/log4j.properties,/path/to/metrics.properties

Spark publishes tons of metrics from the driver and executors. If I were to choose the most important one, it would be the number of records in the last received batch. When StreamingMetrics.streaming.lastReceivedBatch_records == 0, it probably means that the Spark Streaming job has been stopped or has failed.
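
If you also want to detect this situation from inside the application (in addition to alerting on the Graphite metric), a Spark StreamingListener is one option. Below is a minimal, hypothetical sketch, assuming Spark 1.6-era APIs:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
import org.slf4j.LoggerFactory

// Logs a warning whenever a completed batch contains no records, which usually
// indicates that the upstream source went silent or the job is misbehaving.
class EmptyBatchListener extends StreamingListener {

  private val logger = LoggerFactory.getLogger(classOf[EmptyBatchListener])

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    if (batchCompleted.batchInfo.numRecords == 0) {
      logger.warn(s"Empty batch at ${batchCompleted.batchInfo.batchTime}")
    }
  }
}

// registered once, right after the StreamingContext is created:
// ssc.addStreamingListener(new EmptyBatchListener)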

Other important metrics are listed below:

  • When the total delay is greater than the batch interval, the latency of the processing pipeline increases.

driver.StreamingMetrics.streaming.lastCompletedBatch_totalDelay

  • When the number of active tasks is lower than number of executors * number of cores, the allocated resources are not fully utilized.

executor.threadpool.activeTasks

  • How much RAM is used for the RDD cache.

driver.BlockManager.memory.memUsed_MB

  • When there is not enough RAM for the RDD cache, how much data has been spilled to disk. You should increase the executor memory or change the spark.memory.fraction Spark property to avoid performance degradation.

driver.BlockManager.disk.diskSpaceUsed_MB

  • JVM memory utilization on the Spark driver.

driver.jvm.heap.used
driver.jvm.non-heap.used
driver.jvm.pools.G1-Old-Gen.used
driver.jvm.pools.G1-Eden-Space.used
driver.jvm.pools.G1-Survivor-Space.used

  • How much time is spent on GC on the Spark driver.

driver.jvm.G1-Old-Generation.time
driver.jvm.G1-Young-Generation.time

  • JVM memory utilization on Spark executors.

[0-9]*.jvm.heap.used
[0-9]*.jvm.non-heap.used
[0-9]*.jvm.pools.G1-Old-Gen.used
[0-9]*.jvm.pools.G1-Survivor-Space.used
[0-9]*.jvm.pools.G1-Eden-Space.used

  • How much time is spent on GC on Spark executors.

[0-9]*.jvm.G1-Old-Generation.time
[0-9]*.jvm.G1-Young-Generation.time

While you configure the first Grafana dashboard for a Spark application, the first problem pops up:

How to configure a Graphite query when the metrics for every Spark application run are reported under its own application id?

For driver metrics, use the .*(application_[0-9]+).* wildcard and the aliasSub Graphite function to present the application id in the graph legend:

aliasSub(stats.analytics.$job_name.*.prod.$dc.*.driver.jvm.heap.used, ".*(application_[0-9]+).*", "heap: \1")

For executor metrics, again use the .*(application_[0-9]+).* wildcard, the groupByNode Graphite function to sum metrics from all Spark executors, and the aliasSub Graphite function to present the application id in the graph legend:

aliasSub(groupByNode(stats.analytics.$job_name.*.prod.$dc.*.[0-9]*.jvm.heap.used, 6, "sumSeries"), "(.*)", "heap: \1")

Finally, the Grafana dashboard for the Spark job might look like this:

If the Spark application is restarted frequently, metrics for old, already finished runs should be deleted from Graphite. Because Graphite does not compact inactive metrics, old metrics slow down both Graphite itself and Grafana queries.

Graceful stop

The last puzzle element is how to stop a Spark Streaming application deployed on YARN in a graceful way. The standard method for stopping (or rather killing) a YARN application is the yarn application -kill [applicationId] command. And this command does stop the Spark Streaming application, but it could happen in the middle of a batch. So if the job reads data from Kafka, saves the processing results on HDFS and finally commits the Kafka offsets, you should expect duplicated data on HDFS when the job is stopped just before committing the offsets.

The first attempt to solve the graceful shutdown issue was to call the Spark streaming context stop method in a shutdown hook.

sys.addShutdownHook {
    streamingContext.stop(stopSparkContext = true, stopGracefully = true)
}

Disappointingly, a shutdown hook is called too late to finish the started batch and the Spark application is killed almost immediately. Moreover, there is no guarantee that a shutdown hook will be called by the JVM at all.

At the time of writing this blog post, the only confirmed way to gracefully shut down a Spark Streaming application on YARN is to somehow notify the application about the planned shutdown, and then stop the streaming context programmatically (but not from a shutdown hook). The yarn application -kill command should be used only as a last resort, if the notified application did not stop within a defined timeout.

The application can be notified about the planned shutdown using a marker file on HDFS (the easiest way), or using a simple Socket/HTTP endpoint exposed on the driver (the sophisticated way).

Because I like the KISS principle, below you can find shell script pseudo-code for starting and stopping a Spark Streaming application using a marker file:

start() {
    hdfs dfs -touchz /path/to/marker/file
    spark-submit ...
}

stop() {
    hdfs dfs -rm /path/to/marker/file
    force_kill=true
    application_id=$(yarn application -list | grep -oe "application_[0-9]*_[0-9]*")
    for i in `seq 1 10`; do
        application_status=$(yarn application -status ${application_id} 2>&1 | grep "State : \(RUNNING\|ACCEPTED\)")
        if [ -n "$application_status" ]; then
            sleep 60s
        else
            force_kill=false
            break
        fi
    done
    $force_kill && yarn application -kill ${application_id}
}

In the Spark Streaming application, a background thread should monitor the /path/to/marker/file file, and when the file disappears stop the context by calling streamingContext.stop(stopSparkContext = true, stopGracefully = true).
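
A minimal sketch of such a watcher thread is shown below; the marker path and the poll interval are assumptions to be adjusted to your deployment.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.StreamingContext

// Polls HDFS for the marker file and triggers a graceful stop once it disappears.
class GracefulStopWatcher(ssc: StreamingContext, markerFile: String, pollMillis: Long = 10000) extends Runnable {

  override def run(): Unit = {
    val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
    val marker = new Path(markerFile)
    while (fs.exists(marker)) {
      Thread.sleep(pollMillis)
    }
    // the marker was removed by the stop() shell function, finish the current batch and exit
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

// started right after ssc.start():
// new Thread(new GracefulStopWatcher(ssc, "/path/to/marker/file")).start()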

Summary

As you can see, the configuration of a mission-critical Spark Streaming application deployed on YARN is quite complex. Learning all the presented techniques has been a long, tedious and iterative process for a few very smart devs. But in the end, long-running Spark Streaming applications deployed on a highly utilized YARN cluster are extraordinarily stable.

Spark Application Assembly for Cluster Deployments

When I tried to deploy my first Spark application on a YARN cluster, I realized that there was no clear and concise instruction on how to prepare the application for deployment. This blog post can be treated as the missing manual on how to build a Spark application written in Scala into a deployable binary.

This blog post assumes that your Spark application is built with SBT. Since SBT is the mainstream tool for building Scala applications, the assumption seems legit. Please ensure that your project is configured with at least SBT 0.13.6. Open the project/build.properties file, verify the version and update SBT if needed:

sbt.version=0.13.11

SBT Assembly Plugin

The spark-submit script is a convenient way to launch a Spark application on a YARN or Mesos cluster. However, due to the distributed nature of the cluster, the application has to be prepared as a single Java ARchive (JAR). This archive includes all classes from your project together with all of their dependencies. The application assembly can be prepared using the SBT Assembly plugin.

To enable SBT Assembly Plugin, add the plugin dependency to the project/plugins.sbt file:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

This basic setup can be verified by calling the sbt assembly command. The final assembly location depends on the Scala version, application name and application version; the build result could be assembled into a target/scala-2.11/myapp-assembly-1.0.jar file.
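
For reference, a minimal build.sbt sketch; the name, version and Scala version below are assumptions that would map to exactly that target/scala-2.11/myapp-assembly-1.0.jar path.

name := "myapp"

version := "1.0"

scalaVersion := "2.11.7"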

You can configure many aspects of the SBT Assembly plugin, like a custom merge strategy, but I found that it is much easier to keep the defaults and follow the plugin conventions. And what is even more important, you don’t have to change the defaults to get a correct, deployable application binary assembled by the plugin.
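
If you ever do need to override the defaults, the plugin exposes the assemblyMergeStrategy setting. A hedged build.sbt sketch is shown below; the application.conf rule is only an illustrative assumption.

assemblyMergeStrategy in assembly := {
  case "application.conf" => MergeStrategy.concat
  case x =>
    // delegate everything else to the plugin defaults
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}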

Provided dependencies scope

Because the cluster provides Spark classes at runtime, Spark dependencies must be excluded from the assembled JAR. If not, you should expect weird errors from the Java classloader during application startup. An additional benefit of an assembly without Spark dependencies is faster deployment. Please remember that the application assembly must be copied over the network to a location accessible by all cluster nodes (e.g. HDFS or S3).

Look at the dependency section in your build file; it should look similar to the code snippet below:

val sparkVersion = "1.6.0"

"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-hive" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % sparkVersion,
"org.apache.spark" %% "spark-graphx" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.apache.spark" %% "spark-streaming-kafka" % sparkVersion,
(...)

The list of Spark dependencies is always project specific. The SQL, Hive, MLlib, GraphX and Streaming extensions are defined only for reference. All defined dependencies are required by the local build to compile the code and run the tests, so they cannot simply be removed from the build definition because that would break the build.

The SBT Assembly plugin comes with an additional dependency scope: "provided". The scope is very similar to the Maven provided scope. A provided dependency is part of compilation and tests, but excluded from the application assembly.

To configure the provided scope for Spark dependencies, change the definition as follows:

val sparkVersion = "1.6.0"

"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
"org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
"org.apache.spark" %% "spark-graphx" % sparkVersion % "provided",
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
"org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
  exclude("log4j", "log4j")
  exclude("org.spark-project.spark", "unused"),
(...)

Careful readers should notice that the "spark-streaming-kafka" dependency has not been marked as "provided". This was done on purpose, because the integration with Kafka is not part of the Spark distribution assembly and has to be assembled into the application JAR. The exclusion rules for the "spark-streaming-kafka" dependency will be discussed later.

Ok, but how do you recognize which libraries are part of the Spark distribution assembly? There is no simple answer to this question. Look for the spark-assembly-*-1.6.0.jar file on the cluster classpath, list the assembly content and verify what is included and what is not. In the assembly on my cluster I found that core, sql, hive, mllib, graphx and streaming classes are embedded, but not the integration with Kafka.

$ jar tf spark-assembly-1.6.0.jar
META-INF/
META-INF/MANIFEST.MF
org/
org/apache/
org/apache/spark/
org/apache/spark/HeartbeatReceiver
(...)
org/apache/spark/ml/
org/apache/spark/ml/Pipeline$SharedReadWrite$$anonfun$2.class
org/apache/spark/ml/tuning/
(...)
org/apache/spark/sql/
org/apache/spark/sql/UDFRegistration$$anonfun$3.class
org/apache/spark/sql/SQLContext$$anonfun$range$2.class
(...)
reference.conf
META-INF/NOTICE

SBT run and run-main

The provided dependency scope unfortunately breaks the SBT run and run-main tasks. Because provided dependencies are excluded from the runtime classpath, you should expect a ClassNotFoundException during application startup on the local machine. To fix this issue, provided dependencies must be explicitly added to all SBT tasks used for a local run, e.g.:

run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in(Compile, run), runner in(Compile, run))
runMain in Compile <<= Defaults.runMainTask(fullClasspath in Compile, runner in(Compile, run))

How to exclude Log4j from application assembly?

Without Spark classes the application assembly is quite lightweight. But the assembly size might be reduced even more!

Let’s assume that your application requires some logging provider. Because Spark internally uses Log4j, Log4j is already on the cluster classpath. But you may say that there is a much better API for Scala than the original Log4j – and you are totally right.

The snippet below configures the excellent Typesafe (Lightbend nowadays) Scala Logging library dependencies.

"com.typesafe.scala-logging" %% "scala-logging" % "3.1.0",

"org.slf4j" % "slf4j-api" % "1.7.10",
"org.slf4j" % "slf4j-log4j12" % "1.7.10" exclude("log4j", "log4j"),

"log4j" % "log4j" % "1.2.17" % "provided",

Scala Logging is a thin wrapper around SLF4J implemented using Scala macros. The "slf4j-log4j12" artifact is a binding library between the SLF4J API and the Log4j logger provider. Three layers of indirection, but who cares :–)

There is also a top-level dependency on Log4j defined with the provided scope. But this is not enough to get rid of Log4j classes from the application assembly. Because Log4j is also a transitive dependency of "slf4j-log4j12", it must be explicitly excluded. If not, the SBT Assembly plugin adds Log4j classes to the assembly even if the top-level "log4j" dependency is marked as "provided". Not very intuitive, but the SBT Assembly plugin works this way.

Alternatively, you could disable transitive dependencies for "slf4j-log4j12" altogether. This is especially useful for libraries with many transitive dependencies that are expected to be on the cluster classpath.

"org.slf4j" % "slf4j-log4j12" % "1.7.10" intransitive()

Spark Streaming Kafka dependency

Now we are ready to define the dependency on "spark-streaming-kafka". Because the Spark integration with Kafka typically is not a part of the Spark assembly, it must be embedded into the application assembly. The artifact should not be defined within the "provided" scope.

val sparkVersion = "1.6.0"

(...)
"org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
  exclude("log4j", "log4j")
  exclude("org.spark-project.spark", "unused"),
(...)

Again, the "log4j" transitive dependency of Kafka needs to be explicitly excluded. I also found that the marker class from the weird Spark "unused" artifact breaks the default SBT Assembly plugin merge strategy. It is much easier to exclude this dependency than to customize the merge strategy of the plugin.

Where is Guava?

When you look at your project dependencies, you can easily find Guava (version 14.0.1 for Spark 1.6.0). Ok, Guava is an excellent library, so you decide to use it in your application.

WRONG!

Guava is on the classpath during compilation and tests, but at runtime you will get a ClassNotFoundException or a method-not-found error. First, Guava is shaded in the Spark distribution assembly under the org/spark-project/guava package and should not be used directly. Second, there is a huge chance that an outdated Guava library is on the cluster classpath. In the CDH 5.3 distribution, the installed Guava version is 11.0.2, released on Feb 22, 2012 – more than 4 years ago! Since Guava is binary compatible only between the 2 or 3 latest major releases, it is a real blocker.

There are experimental Spark configuration flags, spark.driver.userClassPathFirst and spark.executor.userClassPathFirst. In theory, they give user-added jars precedence over Spark’s own jars when loading classes in the driver and executors. In practice it did not work, at least for me :–(.

In general, you should avoid external dependencies at all costs when you develop an application deployed on a YARN cluster. Classloader hell is even bigger than in JEE containers like JBoss or WebLogic. Look for libraries with minimal transitive dependencies and narrowed features. For example, if you need a cache, choose Caffeine over Guava.

Deployment optimization for YARN cluster

When an application is deployed on a YARN cluster using the spark-submit script, the script uploads the Spark distribution assembly to the cluster during every deployment. The distribution assembly is over 100MB in size, ten times more than a typical application assembly!

So I really recommend installing the Spark distribution assembly in a well-known location on the cluster and defining the spark.yarn.jar property for spark-submit. The assembly will then not be copied over the network during every deployment.

spark.yarn.jar=hdfs:///apps/spark/assembly/spark-assembly-1.6.0.jar

Summary

I have witnessed a few Spark projects where build.sbt was more complex than the application itself, the application assembly was bloated with unnecessary 3rd party classes, and the deployment process took ages. The build configuration described in this blog post should help you deploy a Spark application on the cluster smoothly and still keep the SBT configuration easy to maintain.

Spark and Kafka Integration Patterns, Part 2

In the world beyond batch, streaming data processing is the future of big data. Regardless of the streaming framework used for data processing, tight integration with a replayable data source like Apache Kafka is often required. Streaming applications often use Apache Kafka as a data source, or as a destination for processing results.

The Apache Spark distribution has built-in support for reading from Kafka, but surprisingly does not offer any integration for sending the processing results back to Kafka. This blog post aims to fill that gap in the Spark ecosystem.

In the first part of the series you learned how to manage a Kafka producer using the Scala lazy evaluation feature and how to reuse a single Kafka producer instance on a Spark executor.

In this blog post you will learn how to publish stream processing results to Apache Kafka in a reliable way. First you will learn how the Kafka producer works, how to configure the producer and how to set up the Kafka cluster to achieve the desired reliability. In the second part of the blog post, I will present how to implement a convenient library for sending a continuous sequence of RDDs (a DStream) to an Apache Kafka topic, as easily as in the code snippet below.

// enable implicit conversions
import KafkaDStreamSink._

// send dstream to Kafka
dstream.sendToKafka(kafkaProducerConfig, topic)

Quick introduction to Kafka

Kafka is a distributed, partitioned, replicated message broker. Basic architecture knowledge is a prerequisite to understanding the Spark and Kafka integration challenges. You can safely skip this section if you are already familiar with Kafka concepts.

For convenience I copied essential terminology definitions directly from Kafka documentation:

  • Kafka maintains feeds of messages in categories called topics.
  • We’ll call processes that publish messages to a Kafka topic producers.
  • We’ll call processes that subscribe to topics and process the feed of published messages consumers.
  • Kafka is run as a cluster comprised of one or more servers each of which is called a broker.

So, at a high level, producers send messages over the network to the Kafka cluster which in turn serves them up to consumers like this:

This is the bare minimum you have to know, but I really encourage you to read the Kafka reference manual thoroughly.

Kafka producer API

First we need to know how the Kafka producer works. The Kafka producer exposes a very simple API for sending messages to Kafka topics. The most important methods from the KafkaProducer class are listed below:

KafkaProducer API
j.u.c.Future<RecordMetadata> send(ProducerRecord<K,V> record)
j.u.c.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)
void flush()
void close()

The send() methods asynchronously send a key-value record to a topic and will return immediately once the record has been stored in the buffer of records waiting to be sent. This kind of API is not very convenient for developers, but is crucial to achieve high throughput and low latency.

If you want to ensure that the request has been completed, you can invoke the blocking get() on the future returned by the send() methods. The main drawback of calling get() is a huge performance penalty, because it effectively disables batching. You cannot expect high throughput and low latency if the execution is blocked on every message and every single message needs to be sent separately.

Fully non-blocking usage requires the use of the callback. The callback will be invoked when the request is complete. Note that the callback is executed in the Kafka producer I/O thread, so it should not block the caller; the callback must be as lightweight as possible. The callback must also be properly synchronized due to the Java memory model.

If the caller does not check the result of the send() method using the future or the callback, it means that if the Kafka producer crashes, all messages from the internal Kafka producer buffer will be lost. This is the first, very important element of any Kafka integration library: we need callback handling to avoid data loss and still achieve good performance.

The flush() method makes all buffered messages ready to send, and blocks on the completion of the requests associated with these messages. The close() method is like the flush() method but also closes the producer.

The flush() method could be very handy if the streaming framework wants to ensure that all messages have been sent before processing the next part of the stream. With the flush() method, the streaming framework is able to flush the messages to the Kafka brokers and simulate commit behaviour.

The flush() method was added in the Kafka 0.9 release (KIP-8). Before Kafka 0.9, the only safe and straightforward way to flush messages from the Kafka producer internal buffer was to close the producer.

Kafka configuration

If messages must be reliably published to the Kafka cluster, the Kafka producer and the Kafka cluster need to be configured with care. This needs to be done independently of the chosen streaming framework.

The Kafka producer buffers messages in memory before sending them. When the memory buffer is exhausted, the Kafka producer must either stop accepting new records (block) or throw errors. By default the Kafka producer blocks, and this behavior is legitimate for stream processing: the processing should be delayed if the Kafka producer memory buffer is full and cannot accept new messages. Ensure that the block.on.buffer.full Kafka producer configuration property is set.

With the default configuration, when the Kafka broker (the leader of the partition) receives the message, it stores the message in memory and immediately sends an acknowledgment to the Kafka producer. To avoid data loss, the message should be replicated to at least one replica (follower). Only when the follower acknowledges the leader should the leader acknowledge the producer.

You get this guarantee with the acks=all property in the Kafka producer configuration. It guarantees that the record will not be lost as long as at least one in-sync replica remains alive.

But this is not enough. The minimum number of in-sync replicas must be defined. You should configure the min.insync.replicas property for every topic. I recommend configuring at least 2 in-sync replicas (the leader and one follower). If you have a datacenter with two zones, I also recommend keeping the leader in the first zone and 2 followers in the second zone. This configuration guarantees that every message will be stored in both zones.

We are almost done with the Kafka cluster configuration. When you set the min.insync.replicas=2 property, the topic should be replicated with a factor of 2 + N, where N is the number of brokers that could fail while the Kafka producer is still able to publish messages to the cluster. I recommend configuring a replication factor of 3 (or more) for the topic.

With a replication factor of 3, the number of brokers in the cluster should be at least 3 + M, where M is the number of brokers that could fail. When one or more brokers are unavailable, the topics will end up with under-replicated partitions. With more brokers in the cluster than the replication factor, you can reassign the under-replicated partitions and achieve a fully replicated cluster again. I recommend building at least a 4-node cluster for topics with a replication factor of 3.

The last important Kafka cluster configuration property is unclean.leader.election.enable. It should be disabled (by default it is enabled) to avoid unrecoverable exceptions from the Kafka consumer. Consider the situation when the latest committed offset is N, but after a leader failure, the latest offset on the new leader is M < N. M < N because the new leader was elected from a lagging follower (not an in-sync replica). When the streaming engine asks for data from offset N using the Kafka consumer, it will get an exception because offset N does not exist yet. Someone will have to fix the offsets manually.

So the minimal recommended Kafka setup for reliable message processing is:

  • 4 nodes in the cluster
  • unclean.leader.election.enable=false in the brokers configuration
  • replication factor for the topics – 3
  • min.insync.replicas=2 property in topic configuration
  • acks=all property in the producer configuration
  • block.on.buffer.full=true property in the producer configuration

With the above setup your configuration should be resistant to a single broker failure, and Kafka consumers will survive a new leader election.
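
For completeness, a hedged sketch of the matching producer configuration in Scala; the broker addresses and serializer classes are assumptions to be adjusted to your environment.

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

val props = new Properties()
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092")
props.put("acks", "all")                  // wait for the in-sync replicas
props.put("block.on.buffer.full", "true") // block instead of dropping records
props.put("retries", "3")                 // redelivery policy for transient errors
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)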

You could also take a look at the replica.lag.max.messages and replica.lag.time.max.ms properties to tune when a follower is removed from the ISR by the leader. But this is out of the scope of this blog post.

How to expand Spark API?

After this not-so-short introduction, we are ready to dissect the integration library for Spark Streaming and Apache Kafka. First, DStream needs to be somehow expanded to support the new sendToKafka() method.

dstream.sendToKafka(kafkaProducerConfig, topic)

In Scala, the only way to add methods to an existing API is to use the implicit conversion feature.

object KafkaDStreamSink {

  import scala.language.implicitConversions

  implicit def createKafkaDStreamSink(dstream: DStream[KafkaPayload]): KafkaDStreamSink = {
    new KafkaDStreamSink(dstream)
  }

}

Whenever the Scala compiler finds a call to the non-existing method sendToKafka() on the DStream class, the stream will be implicitly wrapped into the KafkaDStreamSink class, where the sendToKafka method is finally defined. To enable the implicit conversion for DStream, add the import statement to your code – that’s all.

import KafkaDStreamSink._

How to send to Kafka in reliable way?

Let’s check how the sendToKafka() method is defined, step by step; this is the core part of the integration library.

class KafkaDStreamSink(dstream: DStream[KafkaPayload]) {

  def sendToKafka(config: Map[String, String], topic: String): Unit = {
    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // send records from every partition to Kafka
      }
    }
  }

There are two loops: the first over the wrapped dstream and the second over the rdd, for every partition. This is quite a standard pattern for the Spark programming model. Records from every partition are ready to be sent to a Kafka topic by the Spark executors. The destination topic name is given explicitly as the last parameter of the sendToKafka() method.

The first step in the inner loop is getting a Kafka producer instance from the KafkaProducerFactory.

rdd.foreachPartition { records =>
  val producer = KafkaProducerFactory.getOrCreateProducer(config)
  (...)

The factory creates only a single instance of the producer for any given producer configuration. If the producer instance has already been created, the existing instance is returned and reused. Kafka producer caching is crucial for performance reasons, because establishing a connection to the cluster takes time. It is a much more time-consuming operation than opening a plain socket connection, as the Kafka producer needs to discover the leaders for all partitions. Please refer to the first part of this blog post and the KafkaProducerFactory source for more details about the factory implementation.
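
A simplified sketch of such a factory is shown below; the real implementation lives in the linked repository, and the byte-array key/value types are an assumption.

import java.util.Properties
import scala.collection.mutable
import org.apache.kafka.clients.producer.KafkaProducer

object KafkaProducerFactory {

  // one cached producer per configuration, reused for the lifetime of the executor JVM
  private val producers = mutable.Map[Map[String, String], KafkaProducer[Array[Byte], Array[Byte]]]()

  def getOrCreateProducer(config: Map[String, String]): KafkaProducer[Array[Byte], Array[Byte]] =
    synchronized {
      producers.getOrElseUpdate(config, {
        val properties = new Properties()
        config.foreach { case (key, value) => properties.put(key, value) }
        val producer = new KafkaProducer[Array[Byte], Array[Byte]](properties)
        // close the cached producer on JVM shutdown to flush buffered records and avoid data loss
        sys.addShutdownHook {
          producer.close()
        }
        producer
      })
    }
}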

For debugging purposes, a logger and the Spark task context are needed.

import com.typesafe.scalalogging.Logger
import org.apache.spark.TaskContext
import org.slf4j.LoggerFactory
(...)

rdd.foreachPartition { records =>
  val producer = KafkaProducerFactory.getOrCreateProducer(config)

  val context = TaskContext.get
  val logger = Logger(LoggerFactory.getLogger(classOf[KafkaDStreamSink]))
  (...)

You could use any logging framework, but the logger itself has to be defined inside the foreachPartition loop to avoid weird serialization issues. The Spark task context will be used to get the current partition identifier. I don’t like a static call for getting the task context, but this is the official way to do it. See pull request SPARK-5927 for more details.

Before we go further, the Kafka producer callback for error handling needs to be introduced.

KafkaDStreamSinkExceptionHandler
import java.util.concurrent.atomic.AtomicReference
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

class KafkaDStreamSinkExceptionHandler extends Callback {

  private val lastException = new AtomicReference[Option[Exception]](None)

  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
    lastException.set(Option(exception))

  def throwExceptionIfAny(): Unit =
    lastException.getAndSet(None).foreach(ex => throw ex)

}

The onCompletion() method of the callback is called when the message sent to the Kafka cluster has been acknowledged. Exactly one of the callback arguments will be non-null: the metadata or the exception. The KafkaDStreamSinkExceptionHandler class keeps the last exception registered by the callback (if any). The client of the callback is able to rethrow the registered exception using the throwExceptionIfAny() method. Because the onCompletion() and throwExceptionIfAny() methods are called from different threads, the last exception has to be kept in a thread-safe data structure, AtomicReference.

Finally, we are ready to send records to Kafka using the created callback.

rdd.foreachPartition { records =>
  val producer = KafkaProducerFactory.getOrCreateProducer(config)

  val context = TaskContext.get
  val logger = Logger(LoggerFactory.getLogger(classOf[KafkaDStreamSink]))

  val callback = new KafkaDStreamSinkExceptionHandler

  logger.debug(s"Send Spark partition: ${context.partitionId} to Kafka topic: $topic")
  val metadata = records.map { record =>
    callback.throwExceptionIfAny()
    producer.send(new ProducerRecord(topic, record.key.orNull, record.value), callback)
  }.toList

First, the callback is examined for a registered exception. If one of the previous records could not be sent, the exception is propagated to the Spark framework. If any redelivery policy is needed, it should be configured at the Kafka producer level; look at the Kafka producer configuration properties retries and retry.backoff.ms. Finally, the Kafka producer metadata are collected and materialized by calling the toList method. At this moment, the Kafka producer starts sending the records in a background I/O thread. To achieve high throughput, the Kafka producer sends records in batches.

Because we want to achieve natural back pressure for our stream processing, the next batch needs to be blocked until the records from the current batch are really acknowledged by the Kafka brokers. So for each collected metadata entry (a Java j.u.c.Future), the get() method is called to ensure that the record has been sent to the brokers.

rdd.foreachPartition { records =>
  val producer = KafkaProducerFactory.getOrCreateProducer(config)

  val context = TaskContext.get
  val logger = Logger(LoggerFactory.getLogger(classOf[KafkaDStreamSink]))

  val callback = new KafkaDStreamSinkExceptionHandler

  logger.debug(s"Send Spark partition: ${context.partitionId} to Kafka topic: $topic")
  val metadata = records.map { record =>
    callback.throwExceptionIfAny()
    producer.send(new ProducerRecord(topic, record.key.orNull, record.value), callback)
  }.toList

  logger.debug(s"Flush Spark partition: ${context.partitionId} to Kafka topic: $topic")
  metadata.foreach { metadata => metadata.get() }

Because sending the records started only a moment ago, it is likely that the records have already been sent and the get() method does not block. However, if the get() call does block, it means that there are unsent messages in the internal Kafka producer buffer and the processing should be blocked as well.

Finally, the sendToKafka() method should propagate the exception recorded by the callback (if any). The complete method is presented below for reference.

sendToKafka
def sendToKafka(config: Map[String, String], topic: String): Unit = {
  dstream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      val producer = KafkaProducerFactory.getOrCreateProducer(config)

      val context = TaskContext.get
      val logger = Logger(LoggerFactory.getLogger(classOf[KafkaDStreamSink]))

      val callback = new KafkaDStreamSinkExceptionHandler

      logger.debug(s"Send Spark partition: ${context.partitionId} to Kafka topic: $topic")
      val metadata = records.map { record =>
        callback.throwExceptionIfAny()
        producer.send(new ProducerRecord(topic, record.key.orNull, record.value), callback)
      }.toList

      logger.debug(s"Flush Spark partition: ${context.partitionId} to Kafka topic: $topic")
      metadata.foreach { metadata => metadata.get() }

      callback.throwExceptionIfAny()
    }
  }
}

The method is not very complex, but there are a few important elements if you don’t want to lose processing results and if you need a back pressure mechanism:

  • Method sendToKafka() should fail fast if a record could not be sent to Kafka. Don’t worry, Spark will execute the failed task again.
  • Method sendToKafka() should block the Spark processing if the Kafka producer slows down.
  • Method sendToKafka() should explicitly flush the records buffered by the Kafka producer, to avoid data loss.
  • The Kafka producer needs to be reused by the Spark executor to avoid the overhead of connecting to Kafka.
  • The Kafka producer needs to be explicitly closed when Spark shuts down the executors, to avoid data loss.

Summary

The complete, working project is published on https://github.com/mkuthan/example-spark-kafka. You can clone/fork the project and do some experiments by yourself.

There is also an alternative library developed by Cloudera, spark-kafka-writer, which emerged from the closed pull request SPARK-2994. Unfortunately, at the time of this writing, the library used the obsolete Scala Kafka producer API and did not send processing results in a reliable way.

I hope that some day we will find a reliable, mature library for sending processing results to Apache Kafka in the official Spark distribution.

Spark and Kafka Integration Patterns, Part 1

I published a post on the allegro.tech blog about how to integrate Spark Streaming and Kafka. In the blog post you will find how to avoid the java.io.NotSerializableException when a Kafka producer is used for publishing the results of Spark Streaming processing.

http://allegro.tech/spark-kafka-integration.html

You might also be interested in the follow-up part of this series, where I present a complete library for sending Spark Streaming processing results to Kafka.

Happy reading :–)

Spark and Spark Streaming Unit Testing

When you develop a distributed system, it is crucial to make it easy to test. Execute tests in a controlled environment, ideally from your IDE. A long develop-test-develop cycle for complex systems could kill your productivity. Below you will find my testing strategy for Spark and Spark Streaming applications.

Unit or integration tests, that is the question

Our hypothetical Spark application pulls data from Apache Kafka, applies transformations using RDDs and DStreams, and persists the outcomes into a Cassandra or Elasticsearch database. In production, the Spark application is deployed on a YARN or Mesos cluster, and everything is glued together with ZooKeeper. The big picture of the stream processing architecture is presented below:

Lots of moving parts, not so easy to configure and test, even with automated provisioning implemented with Vagrant, Docker and Ansible. If you can’t test everything, test at least the most important part of your application – the transformations implemented with Spark.

Spark claims that it is friendly to unit testing with any popular unit test framework. To be strict, Spark supports rather lightweight integration testing, not unit testing, IMHO. But it is still much more convenient to test transformation logic locally than to deploy all the parts on YARN.

There is a pull request, SPARK-1751, that adds "unit test" support for Apache Kafka streams. Should we follow that path? An embedded ZooKeeper and an embedded Apache Kafka are needed, and the test fixture is complex and cumbersome. Such tests would likely be fragile and hard to maintain. This approach makes sense for the Spark core team, because they want to test the Spark and Kafka integration itself.

What should be tested?

Our transformation logic implemented with Spark, nothing more. But how do we test logic so tightly coupled to the Spark API (RDD, DStream)? Let’s define how a typical Spark application is organized. Our hypothetical application structure looks like this:

  1. Initialize SparkContext or StreamingContext.
  2. Create an RDD or DStream for a given source (e.g. Apache Kafka).
  3. Evaluate transformations using the RDD or DStream API.
  4. Put the transformation outcomes (e.g. aggregations) into an external database.

Context

SparkContext and StreamingContext can easily be initialized for testing purposes. Set the master URL to local, run the operations and then stop the context gracefully.

SparkContext Initialization
class SparkExampleSpec extends FlatSpec with BeforeAndAfter {

  private val master = "local[2]"
  private val appName = "example-spark"

  private var sc: SparkContext = _

  before {
    val conf = new SparkConf()
      .setMaster(master)
      .setAppName(appName)

    sc = new SparkContext(conf)
  }

  after {
    if (sc != null) {
      sc.stop()
    }
  }
  (...)
StreamingContext Initialization
class SparkStreamingExampleSpec extends FlatSpec with BeforeAndAfter {

  private val master = "local[2]"
  private val appName = "example-spark-streaming"
  private val batchDuration = Seconds(1)
  private val checkpointDir = Files.createTempDirectory(appName).toString

  private var sc: SparkContext = _
  private var ssc: StreamingContext = _

  before {
    val conf = new SparkConf()
      .setMaster(master)
      .setAppName(appName)

    ssc = new StreamingContext(conf, batchDuration)
    ssc.checkpoint(checkpointDir)

    sc = ssc.sparkContext
  }

  after {
    if (ssc != null) {
      ssc.stop()
    }
  }

  (...)

RDD and DStream

The problematic part is how to create the RDD or DStream. For testing purposes it must be simplified to avoid an embedded Kafka and ZooKeeper. Below you can find examples of how to create an in-memory RDD and DStream.

In-memory RDD
val lines = Seq("To be or not to be.", "That is the question.")
val rdd = sparkContext.parallelize(lines)
In-memory DStream
val lines = mutable.Queue[RDD[String]]()
val dstream = streamingContext.queueStream(lines)

// append data to DStream
lines += sparkContext.makeRDD(Seq("To be or not to be.", "That is the question."))

Transformation logic

The most important part of our application – the transformation logic – must be encapsulated in a separate class or object. An object is preferred to avoid class serialization overhead. Exactly the same code is used by the application and by the test.

WordCount.scala
case class WordCount(word: String, count: Int)

object WordCount {
  def count(lines: RDD[String], stopWords: Set[String]): RDD[WordCount] = {
    val words = lines.flatMap(_.split("\\s"))
      .map(_.stripSuffix(",").stripSuffix(".").toLowerCase)
      .filter(!stopWords.contains(_)).filter(!_.isEmpty)

    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _).map {
      case (word: String, count: Int) => WordCount(word, count)
    }

    val sortedWordCounts = wordCounts.sortBy(_.word)

    sortedWordCounts
  }
}

Spark test

Now it is time to implement our first test for the WordCount transformation. The test code is very straightforward and easy to read: a single source of truth and the best documentation of your system, always up to date.

"Shakespeare most famous quote" should "be counted" in {
    Given("quote")
    val lines = Array("To be or not to be.", "That is the question.")

    Given("stop words")
    val stopWords = Set("the")

    When("count words")
    val wordCounts = WordCount.count(sc.parallelize(lines), stopWords).collect()

    Then("words counted")
    wordCounts should equal(Array(
      WordCount("be", 2),
      WordCount("is", 1),
      WordCount("not", 1),
      WordCount("or", 1),
      WordCount("question", 1),
      WordCount("that", 1),
      WordCount("to", 2)))
  }

Spark Streaming test

Spark Streaming transformations are much more complex to test. Full control over the clock is needed to manually manage batches, slides and windows. Without a controlled clock, you would end up with complex tests with many Thread.sleep calls, and the test execution would take ages. The only downside is that you will not have extra time for coffee during test execution.

Spark Streaming provides the necessary abstraction over the system clock: the ManualClock class. Unfortunately, the ManualClock class is declared as package private, so some hack is needed. The wrapper presented below is an adapter for the original ManualClock class, but without the access restriction.

ClockWrapper.scala
package org.apache.spark.streaming

import org.apache.spark.streaming.util.ManualClock

class ClockWrapper(ssc: StreamingContext) {

  def getTimeMillis(): Long = manualClock().currentTime()

  def setTime(timeToSet: Long) = manualClock().setTime(timeToSet)

  def advance(timeToAdd: Long) = manualClock().addToTime(timeToAdd)

  def waitTillTime(targetTime: Long): Long = manualClock().waitTillTime(targetTime)

  private def manualClock(): ManualClock = {
    ssc.scheduler.clock.asInstanceOf[ManualClock]
  }

}

Now the Spark Streaming test can be implemented in an efficient way. The test does not have to wait for the system clock, and it is implemented with millisecond precision. You can easily test your windowed scenario from the very beginning to the very end. With the given/when/then structure you should be able to understand the tested logic without further explanation.

"Sample set" should "be counted" in {
  Given("streaming context is initialized")
  val lines = mutable.Queue[RDD[String]]()

  var results = ListBuffer.empty[Array[WordCount]]

  WordCount.count(ssc.queueStream(lines), windowDuration, slideDuration) { (wordsCount: RDD[WordCount], time: Time) =>
    results += wordsCount.collect()
  }

  ssc.start()

  When("first set of words queued")
  lines += sc.makeRDD(Seq("a", "b"))

  Then("words counted after first slide")
  clock.advance(slideDuration.milliseconds)
  eventually(timeout(1 second)) {
    results.last should equal(Array(
      WordCount("a", 1),
      WordCount("b", 1)))
  }

  When("second set of words queued")
  lines += sc.makeRDD(Seq("b", "c"))

  Then("words counted after second slide")
  clock.advance(slideDuration.milliseconds)
  eventually(timeout(1 second)) {
    results.last should equal(Array(
      WordCount("a", 1),
      WordCount("b", 2),
      WordCount("c", 1)))
  }

  When("nothing more queued")

  Then("word counted after third slide")
  clock.advance(slideDuration.milliseconds)
  eventually(timeout(1 second)) {
    results.last should equal(Array(
      WordCount("a", 0),
      WordCount("b", 1),
      WordCount("c", 1)))
  }

  When("nothing more queued")

  Then("word counted after fourth slide")
  clock.advance(slideDuration.milliseconds)
  eventually(timeout(1 second)) {
    results.last should equal(Array(
      WordCount("a", 0),
      WordCount("b", 0),
      WordCount("c", 0)))
  }
}

One comment on the Eventually trait usage: the trait is needed because Spark Streaming is a multithreaded application and the results are not computed immediately. I found that a 1 second timeout is enough for Spark Streaming to calculate the results. The timeout is not related to the batch, slide or window duration.
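
For reference, a hedged sketch of the spec declaration: mixing in the Eventually trait and importing SpanSugar is what makes the eventually(timeout(1 second)) { ... } blocks above compile.

import org.scalatest._
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

class SparkStreamingExampleSpec extends FlatSpec
  with GivenWhenThen with Matchers with Eventually with BeforeAndAfter {

  // eventually(timeout(1 second)) { ... } retries the enclosed assertion
  // until it passes or the timeout elapses
}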

Summary

The complete, working project is published on GitHub. You can clone/fork the project and do some experiments by yourself.

I hope that the Spark committers will eventually expose ManualClock for others. Control over time is necessary for efficient Spark Streaming application testing.

How to Learn DDD

Books

Domain Driven Design by Eric Evans.

You have to read this book, period – from the very beginning to the very end. Do not stop reading after the first part of the book; the part about strategic design is much more important. Study this book again and again. I did not read this book in one go, that would be an impossible mission. Every time I come back to this book I find something new; every single word in this book is important and carries meaning.

Implementing Domain Driven Design by Vaughn Vernon.

A more practical and easier-to-digest book than the previous one. Not as brilliant, but still worth reading.

Exploring CQRS and Event Sourcing

An excellent DDD/CQRS case study with working code on GitHub. A real-world example of how to define bounded contexts and how to integrate them using domain events. Awesome!

Enterprise Patterns and MDA by Jim Arlow and Ila Neusandt

Do not reinvent the wheel when you discover your domain model, at least for e-commerce :–) Apply the presented archetype patterns wisely and save your ass.

My examples

http://mkuthan.github.io/presentations/ddd.html – "Domain Driven Design – from trenches for practitioners" presentation.

http://mkuthan.github.io/blog/2013/11/04/ddd-architecture-summary/ – Blog post – my DDD check list.

https://github.com/mkuthan/example-ddd-cqrs-server – Experiment based on Vernon book.

https://github.com/mkuthan/example-axon – Experiment based on Exploring CQRS and Event Sourcing book.

Other sources

http://www.udidahan.com/ – Udi Dahan – one of my mentors in distributed systems and DDD architecture.

https://groups.yahoo.com/neo/groups/domaindrivendesign – official DDD discussion group, addictive reading for long winter evenings.

Programming Language Does Not Matter

A few days ago I participated in a quick presentation of a significant e-commerce platform. The custom platform is implemented mostly in PHP and designed as a scalable, distributed system. And I was really impressed! Below you can find a short summary of the chosen libraries, frameworks and tools.

Symfony – The leading PHP framework for creating web applications. Very similar to the Spring Framework: you get dependency injection, a layered architecture and good support for automated testing.

Doctrine – An object-relational mapper (ORM), commonly used together with the Symfony framework. Very similar to JPA.

Composer – Dependency manager for PHP, very similar to NPM.

Gearman – A generic application framework for farming out work to other machines or processes. Somewhat similar to YARN.

Varnish – HTTP reverse proxy.

Memcached – Distributed key value cache.

RabbitMQ – Messaging middleware based on the AMQP protocol. Used for distributed services integration but also for decoupled request-reply communication.

logstash – Log manager with tons of plugins to almost everything. The monitoring is really crucial in distributed systems.

The programming language does not really matter if you need a scalable, distributed system that is easy to maintain and enhance. You can apply an excellent design using PHP or produce a big ball of mud in Java.

Mastering Node.js - Book Review

Overview

I’m really impressed by the Node.js (and JavaScript) ecosystem. I picked up the Mastering Node.js book to understand the Node.js philosophy and compare it to the JVM world.

V8

The JavaScript virtual machine, conceptually very similar to the JVM. The most important element of the JavaScript ecosystem if you want to do something more than a client-side web application. I really like the Node.js REPL; experimentation is as easy as with Scala.

Event loop

An elegant simulation of concurrency. Do you remember the Swing event dispatch thread and the invokeLater() method? The event loop is the same idea. It is crucial to understand the event handling order:

  • emitted event
  • timers
  • IO callbacks
  • deferred execution blocks

Event driven concurrency

The process is a first class citizen. The easiest (and cheapest) way to achieve concurrency with horizontal scalability.

Real-time applications

I enhanced the drawing board presented in the book. It was great fun together with my 2-year-old son :–) The scalable server side implementation is presented below; I could not even imagine the Java version.

var express = require('express');
var path = require('path');
var app = express();
var http = require('http').Server(app);
var io = require('socket.io')(http);
var redis = require('socket.io-redis');

// Share socket.io events between all board instances through Redis,
// so the board can be scaled to many Node.js processes.
io.adapter(redis({ host: 'localhost', port: 6379 }));

// The port is passed as the first command line argument.
var port = parseInt(process.argv[2], 10);

app.use(express.static(path.join(__dirname, 'assets')));

app.get('/', function(req, res) {
  // res.sendfile() is deprecated; res.sendFile() expects an absolute path.
  res.sendFile(path.join(__dirname, 'index.html'));
});

// Broadcast every 'move' event to all other connected clients.
io.on('connection', function (socket) {
  socket.on('move', function (data) {
    socket.broadcast.emit('moving', data);
  });
});

http.listen(port, function() {
  console.log('Board started on: ' + port);
});

Keep in mind that SSE (Server-Sent Events) is unidirectional, from server to clients, and requires a funny 2KB padding. I didn’t know that before.

Scaling on single node

Spawning and forking child processes is easy, and communication between parent and child processes is easy as well. The Cluster module simplifies web application development for multi-core processors, and it is very easy to understand and control.

Horizontal scaling

Keep shared state in a horizontally scalable store, e.g. session data in Redis, or RabbitMQ for events.

Apache Bench

A command line tool for load/stress testing. JMeter or Gatling is not always needed to perform a simple test.

UDP / Multicast

It is good to know the world behind HTTP/REST/SOAP… There are a lot of important layers between the application and the wire; do you remember the OSI model?

AWS

I have to practice using S3 or DynamoDB eventually.

Node debugger

OMG – I used to debug applications using the console 10 years ago or so ;–)

Express, Socket.io, Path

Implementing a web application using Node.js alone is feasible, but with Express it is much easier.

Be aware that there are thousands of web frameworks for Node.js on the market, many more than for Java 10 years ago ;–) It seems that frameworks built around WebSockets and Single Page Applications should be the leaders.

Interesting resources

Comparing the Performance of Web Server Architectures

Broken Promises

Summary

JavaScript and Node.js seem to be one of the most vital ecosystems for web development. Adoption in the enterprise world is still low, but I really like this ecosystem and its community. And I’m still waiting for the final version of ES6; sometimes JavaScript really sucks.

SOA Patterns - Book Review

Overview

I took this book from my bookshelf when I was preparing an internal presentation about microservices for my Roche colleagues. I was mainly interested in the Saga and Composite Front End patterns. But once I started, I decided to read the rest of the book.

Patterns

Below you can find my short summary of every pattern described in the book:

Service Host

Every service needs a host to run in. For me, the Spring Framework is an excellent example of a service host.

Active Service

Very similar to the microservices concept, where the service should be autonomous.

Transactional Service

I know a few alternative names for this pattern: Unit of Work, Open Session in View. In the JEE world it is typically implemented using a ThreadLocal, as in the sketch below.
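
A minimal Scala sketch of the ThreadLocal-based unit of work; all names here (Session, CurrentSession) are illustrative and not taken from the book.

trait Session {
  def commit(): Unit
  def close(): Unit
}

// Bind the session to the current thread for the duration of a request,
// in the spirit of Open Session in View.
object CurrentSession {
  private val holder = new ThreadLocal[Session]

  def within[T](session: Session)(body: => T): T = {
    holder.set(session)
    try {
      val result = body
      session.commit()
      result
    } finally {
      holder.remove()
      session.close()
    }
  }

  // Any repository called inside `within` can reach the bound session.
  def get: Session =
    Option(holder.get()).getOrElse(
      throw new IllegalStateException("No session bound to the current thread"))
}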

Workflodize

A strange pattern name. I don’t really like the complexity of workflow engines and prefer a simple object-oriented finite state machine implementation.

Edge Component

Separate infrastructure code from the domain. As simple as that.

Decoupled Invocation

Use an event/command bus for communication.

Parallel Pipelines

Apply the Unix philosophy to your services. SRP at a higher level.

Gridable Service

Horizontal scaling.

Service Instance

Horizontal scaling.

Virtual Endpoint

Make your deployment configuration flexible.

Service Watchdog

Service monitoring should be built-in.

Secured Message

Encrypt what should be secured on the message level (privacy, integrity, impersonation).

Secured Infrastructure

Encrypt what should be secured on the protocol level (privacy, integrity, impersonation).

Service Firewall

Security on the network level. Expose only what is really needed.

Identity Provider

Single Sign On.

Service Monitor

Monitoring on the business process level.

Request/Reply

Synchronous point to point communication.

Request/Reaction

Asynchronous point to point communication.

Inversion of Communications

Command Bus, Event Bus, messaging middleware in general. Complex Event Processing (CEP).

Saga

Long running business transactions. Distributed transactions without XA.

Reservation

Related to Saga, how to avoid XA transactions.

Composite Front End

How to compose services into a single web application? The author does not resolve my doubts in this chapter.

Client/Server/Service

How to deal with legacy systems, and how to move from a monolithic architecture to SOA.

Service Bus

Message Bus, Service Bus, ESB – nice explanation.

Orchestration

Externalize long-running business processes. But still encapsulate the business logic in services, not in the orchestrator!

Aggregated Reporting

Looks like CQRS to me.

Antipatterns

Funny names for real problems when SOA is used:

  • Knot – problems with coupling.
  • Nanoservice – problems with bounded contexts.
  • Transactional Integration – problems with XA transactions.
  • Same Old Way – problems with CRUD-like services.

Summary

For sure it’s worth reading, but I expected more from Arnon Rotem-Gal-Oz. Sometimes I felt that the author covers only the tip of the iceberg, while the demons stay under the hood. The sample code fragments are not very helpful: they carry a lot of accidental complexity but do not clearly show the problem.

In addition, the book was published in 2012, but you will easily realize that the author had started it ten years earlier; some parts seem to be outdated.

Release It! - Book Review

Recently I read the excellent book Release It! written by Michael Nygard. The book is 7 years old and I don’t know how I could have missed it until now.

Michael Nygard shows how to design and architect medium and large scale web applications. Real lessons learnt in the trenches, not golden rules from ivory tower architects.

This blog post is a dump of the notes I took while reading the book. The list could be used as a checklist for system architects and developers. The notes are in no particular order, and perhaps there are some duplications too.

  • admin access – should use a separate network from regular traffic; otherwise the administrator will not be able to connect to the system when something goes wrong.

  • network timeouts – should always be defined; otherwise our system could hang whenever there is a problem with a remote service.

  • firewall – be aware of timeouts in firewall connection tracking tables; if a connection is unused for a long time (e.g. a connection from the pool), the firewall could drop packets silently.

  • failure probabilities – they are dependent, unlike coin tosses.

  • 3rd party vendors – their client libraries often suck: you cannot define timeouts, you cannot configure threading correctly.

  • method wait – always provide a timeout, do not call Object.wait() without one.

  • massive emails with deep links – do not send mass emails with deep links; a burst of requests to a single resource could kill your application.

  • thread ratio – check the front-end to back-end thread ratio; the system is only as fast as its slowest part.

  • SLA – define different SLAs for different subsystems; not everything must have 99.99% availability.

  • high CPU utilization – check GC logs first.

  • JVM crash – typically happens after an OOM, when native code tries to allocate memory: malloc() returns an error, but only a few programmers handle that error.

  • collection size – do not use unbounded collections; a huge data set eventually kills your application.

  • outgoing communication – define timeouts.

  • incoming communication – fail fast, be pleasant to other systems.

  • separate thread pool for admin access – it is your last resort for fixing the system.

  • input validation – fail fast, use JS validation even if the validation must be duplicated.

  • circuit breaker – a design pattern for handling unavailable remote services; see the sketch after this list.

  • handshake in protocol – an alternative to the circuit breaker if you design your own protocol.

  • test harness – test using a production-like environment (but how to do that???)

  • capacity – always multiply by the number of users, requests, etc.

  • safety limits on everything – a nice general rule.

  • Oracle and connection pools – Oracle in its default configuration spawns a separate process for every connection; check how much memory is used just for handling client connections.

  • unbalanced resources – the underestimated part will fail first, and it could hang the whole system.

  • JSP and GC – be aware of the -Xnoclassgc JVM option; compiled JSP classes use perm gen space.

  • HTTP sessions – users do not understand the concept; do not keep the shopping cart in the session :–)

  • whitespace – remove any unnecessary whitespace from the pages; at large scale it saves a lot of traffic.

  • avoid hand-crafted SQL – it is hard to predict the outcome, and hard to optimize for performance.

  • database tests – use the real data volume.

  • unicast – could be used for up to ~10 servers; for bigger clusters use multicast.

  • cache – always limit cache size.

  • hit ratio – always monitor the cache hit ratio.

  • precompute HTML – a huge server resource saver; not everything changes on every request.

  • JVM tuning – is application release specific; with every release memory utilization could be different.

  • multihomed servers – in production the network topology is much more complex.

  • bonding – a single logical network interface built from multiple network cards and multiple switch ports.

  • backup – use a separate network; a backup always consumes your whole bandwidth.

  • virtual IP – always configure a virtual IP; your configuration will be much more flexible.

  • technical accounts – do not share accounts between services; it would be a security flaw.

  • cluster configuration verification – periodically check the configuration on the cluster nodes, even if the configuration is deployed automatically.

  • node-specific configuration – keep configuration specific to a single cluster node separated from the shared configuration.

  • configuration property names – name them after function, not nature (e.g. hostname is too generic).

  • graceful shutdown – do not terminate in-flight business transactions.

  • thread dumps – prepare scripts for taking them; during an incident, time is really precious (SLAs).

  • recovery oriented computing – be prepared to restart only part of the system; restarting everything is time consuming.

  • transparency – be able to monitor everything.

  • monitoring policies and alerts – should not be defined by the service; configure the policies outside (perhaps in a central place).

  • log format – should be human readable; humans are the best at pattern matching, so use tabulators and fixed-width columns.

  • CIM – superior to SNMP.

  • SSL accelerator – what is it really???

  • OpsDB monitoring – measurements and expectations, end-to-end business process monitoring.

  • node identifiers – assign them to teams in blocks.

  • Observe, Orient, Decide, Act – a military methodology, somewhat similar to Agile :–)

  • review – periodically review tickets, stack traces in log files, problem volumes, data volumes and query statistics.

  • DB migration – use an expansion phase for incompatible schema changes.
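
The circuit breaker mentioned in the list above is simple enough to sketch. The code below is only an illustration of the pattern under my own assumptions (the names, state model and timing are mine, not from the book), not a production-ready implementation: after a configured number of consecutive failures the breaker opens and fails fast, and after a reset timeout it lets a single probe call through again.

import java.util.concurrent.atomic.AtomicReference

class CircuitBreaker(maxFailures: Int, resetTimeoutMillis: Long) {

  private sealed trait State
  private case class Closed(failures: Int) extends State
  private case class Open(openedAt: Long) extends State

  private val state = new AtomicReference[State](Closed(0))

  def call[T](body: => T): T = state.get() match {
    case Open(openedAt) if System.currentTimeMillis() - openedAt < resetTimeoutMillis =>
      // Fail fast instead of waiting for yet another network timeout.
      throw new IllegalStateException("Circuit breaker is open")
    case _ =>
      try {
        val result = body      // closed or half-open: attempt the real call
        state.set(Closed(0))   // success closes the breaker again
        result
      } catch {
        case e: Exception =>
          state.set(state.get() match {
            case Closed(failures) if failures + 1 < maxFailures => Closed(failures + 1)
            case _ => Open(System.currentTimeMillis())
          })
          throw e
      }
  }
}

// Usage: wrap every call to a flaky remote service.
// val breaker = new CircuitBreaker(maxFailures = 3, resetTimeoutMillis = 5000)
// breaker.call(remoteService.fetchSomething())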