Passionate Developer

Memory is unreliable, like software, so let's make my thoughts more permanent and my software more reliable

Kafka Streams DSL vs Processor API

Kafka Streams is a Java library for building real-time, highly scalable, fault-tolerant, distributed applications. The library is fully integrated with Kafka and leverages Kafka producer and consumer semantics (e.g. partitioning, rebalancing, data retention and compaction). What is really unique is that the only dependency needed to run a Kafka Streams application is a running Kafka cluster. Even local state stores are backed by Kafka topics to make the processing fault tolerant – brilliant!

Kafka Streams provides all the necessary stream processing primitives: one-record-at-a-time processing, event time processing, windowing support and local state management. Application developers can choose from three different Kafka Streams APIs: DSL, Processor API or KSQL.

  • Kafka Streams DSL (Domain Specific Language) – the recommended way for most users, because business logic can be expressed in a few lines of code. All stateless and stateful transformations are defined using a declarative, functional programming style (filter, map, flatMap, reduce, aggregate operations). The Kafka Streams DSL encapsulates most of the stream processing complexity, but unfortunately it also hides many useful knobs and switches.

  • Kafka Processor API provides a low-level, imperative way to define stream processing logic. At first sight the Processor API may look hostile, but it ultimately gives much more flexibility to the developer. With this blog post I would like to demonstrate that hand-crafted stream processors can be an order of magnitude more efficient than a naive implementation using the Kafka Streams DSL.

  • KSQL is a promise that stream processing could be expressed by anyone using SQL as the language. It offers an easy way to express stream processing transformations as an alternative to writing an application in a programming language such as Java. Moreover, a processing transformation written in a SQL-like language can be highly optimized by the execution engine without any developer effort. KSQL was released recently and is still at a very early stage of development.

In the first part of this blog post I define a simple but still realistic business problem to solve. Then you will learn how to implement this use case with the Kafka Streams DSL and how much the processing performance is affected by this naive solution. At that point you could stop reading and scale up your Kafka cluster ten times to fulfill the business requirements, or you could continue reading and learn how to optimize the processing with the low-level Kafka Processor API.

Business Use Case

Let’s imagine a web-based e-commerce platform with fabulous recommendation and advertisement systems. During every visit clients get personalized recommendations and advertisements, the conversion is extraordinarily high and the platform earns additional profits from advertisers. To build comprehensive recommendation models, such a system needs to know everything about clients' traits and their behaviour.

To make this possible, the e-commerce platform reports all client activities as an unbounded stream of page views and events. Every time a client enters a web page, a so-called page view is sent to the Kafka cluster. A page view contains web page attributes like the request URI, referrer URI, user agent, active A/B experiments and many more. In addition to page views, all important actions are reported as custom events, e.g. search, add to cart or checkout. To get a complete view of the activity stream, collected events need to be enriched with data from page views.

Data Model

Because most of the processing logic is built within the context of a given client, page views and events are evenly partitioned on Kafka topics by the client identifier.

type ClientId = String
case class ClientKey(clientId: ClientId)

val bob = ClientKey("bob")
val jim = ClientKey("jim")

Page view and event structures are different, so messages are published to separate Kafka topics, using ingestion time as the event time. Our system should not rely on page view or event creation time due to the high variance of client clocks. The topic key is always ClientKey and the value is either Pv or Ev, presented below. For better readability of the examples, the page view and event payloads are defined as a simplified single value field. Events are uniquely identified by the pvId and evId pair; pvId could be a random identifier, evId a sequence number.

type PvId = String
type EvId = String

case class Pv(pvId: PvId, value: String)
case class Ev(evId: EvId, value: String, pvId: PvId)

Enriched EvPv results are published to an output Kafka topic using ClientKey as the message key. This topic is then consumed directly by the advertisement and recommendation systems.

case class EvPv(evId: EvId, evValue: String, pvId: Option[PvId], pvValue: Option[String])
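
The topologies below also reference serdes for these case classes (ClientKeySerde, PvSerde, EvPvKeySerde, ...) whose implementation is not shown in this post. A minimal sketch of such a serde, based on plain Java serialization just to keep the examples self-contained – the real project may use something smarter like JSON or Avro:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}

// relies on the case classes being java.io.Serializable (they are by default)
class CaseClassSerde[T >: Null] extends Serde[T] with Serializer[T] with Deserializer[T] {

  override def serializer(): Serializer[T] = this
  override def deserializer(): Deserializer[T] = this

  override def serialize(topic: String, data: T): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(data)
    out.close()
    bytes.toByteArray
  }

  override def deserialize(topic: String, data: Array[Byte]): T =
    if (data == null) null
    else new ObjectInputStream(new ByteArrayInputStream(data)).readObject().asInstanceOf[T]

  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
  override def close(): Unit = ()
}

// example usage
object ClientKeySerde extends CaseClassSerde[ClientKey]
object PvSerde extends CaseClassSerde[Pv]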

Example Scenario

For client “bob” the following page views and events are collected by the system.

// Bob enters main page
ClientKey("bob"), Pv("pv0", "/")

// A few impression events collected almost immediately
ClientKey("bob"), Ev("ev0", "show header", "pv0")
ClientKey("bob"), Ev("ev1", "show ads", "pv0")
ClientKey("bob"), Ev("ev2", "show recommendation", "pv0")

// There is also a single duplicated event, welcome to the distributed world
ClientKey("bob"), Ev("ev1", "show ads", "pv0")

// A dozen seconds later Bob clicks on one of the offers presented on the main page
ClientKey("bob"), Ev("ev3", "click recommendation", "pv0")

// Out-of-order event collected before the page view on the offer page
ClientKey("bob"), Ev("ev0", "show header", "pv1")

// Offer page view
ClientKey("bob"), Pv("pv1", "/offer?id=1234")

// An impression event published almost immediately after page view
ClientKey("bob"), Ev("ev1", "show ads", "pv1")

// Late purchase event, Bob took short coffee break before the final decision
ClientKey("bob"), Ev("ev2", "add to cart", "pv1")

For the above clickstream, the following enriched event output stream is expected.

// Events from main page without duplicates

ClientKey("bob"), EvPv("ev0", "show header", Some("pv0"), Some("/"))
ClientKey("bob"), EvPv("ev1", "show ads", Some("pv0"), Some("/"))
ClientKey("bob"), EvPv("ev2", "show recommendation", Some("pv0"), Some("/"))
ClientKey("bob"), EvPv("ev3", "click recommendation", Some("pv0"), Some("/"))

// Events from offer page, somehow incomplete due to streaming semantics limitations

// early event
ClientKey("bob"), EvPv("ev0", "show header", None, None)
ClientKey("bob"), EvPv("ev1", "show ads", Some("pv1"), Some("/offer?id=1234"))

// late event
ClientKey("bob"), EvPv("ev2", "add to cart", None, None)

Kafka Stream DSL

Now we are ready to implement the above use case with the recommended Kafka Streams DSL. The code could be optimized further, but I would like to present the canonical way of using the DSL without exploring DSL internals. All examples are implemented using the latest Kafka Streams version, 1.0.0.
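
For reference, if you build the project with SBT, the only library needed for both the DSL and the Processor API examples is kafka-streams; a minimal sketch of the dependency, with the version matching the one used in this post:

// build.sbt
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "1.0.0"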

Create two input streams for events and page views, connected to the "clickstream.events" and "clickstream.page_views" Kafka topics.

val builder = new StreamsBuilder()

val evStream = builder.stream[ClientKey, Ev]("clickstream.events")
val pvStream = builder.stream[ClientKey, Pv]("clickstream.page_views")

Repartition both streams by PvKey, a compound key of client and page view identifiers, as a prerequisite for joining events with page views. The selectKey method sets a new key for every input record and marks the derived stream for repartitioning.

case class PvKey(clientId: ClientId, pvId: PvId)

val evToPvKeyMapper: KeyValueMapper[ClientKey, Ev, PvKey] =
  (clientKey, ev) => PvKey(clientKey.clientId, ev.pvId)

val evByPvKeyStream = evStream.selectKey(evToPvKeyMapper)

val pvToPvKeyMapper: KeyValueMapper[ClientKey, Pv, PvKey] =
  (clientKey, pv) => PvKey(clientKey.clientId, pv.pvId)

val pvByPvKeyStream = pvStream.selectKey(pvToPvKeyMapper)

Join the event and page view streams by the previously selected PvKey. A left join is used because we are also interested in events without a matched page view. Every incoming event is enriched with the matched page view into an EvPv structure.

The join window duration is set to a reasonable 10 minutes. It means that Kafka Streams will look for messages on the "event" and "page view" sides of the join 10 minutes in the past and 10 minutes in the future (using event time, not wall-clock time). Because we are not interested in late events outside the defined window, the retention is two times the window length, to hold events from the past and the future. If you are interested why 1 millisecond needs to be added to the retention, please ask the Kafka Streams authors, not me ;)

val evPvJoiner: ValueJoiner[Ev, Pv, EvPv] = { (ev, pv) =>
  if (pv == null) {
    EvPv(ev.evId, ev.value, None, None)
  } else {
    EvPv(ev.evId, ev.value, Some(pv.pvId), Some(pv.value))
  }
}

val joinWindowDuration = 10 minutes

val joinRetention = joinWindowDuration.toMillis * 2 + 1
val joinWindow = JoinWindows.of(joinWindowDuration.toMillis).until(joinRetention)

val evPvStream = evByPvKeyStream.leftJoin(pvByPvKeyStream, evPvJoiner, joinWindow)

Now it’s time to fight with duplicated enriched events. Duplicates come from the unreliable nature of the network between the client browser and our system. Most real-time processing pipelines in advertising and recommendation systems count events, so duplicates in the enriched clickstream could cause inaccuracies.

The most straightforward deduplication method is to compare the incoming event with the state of previously processed events. If the event has already been processed it should be skipped.

Unfortunately the DSL does not provide a "deduplicate" method out of the box, but similar logic can be implemented with the "reduce" operation.

First we need to define the deduplication window. The deduplication window can be much shorter than the join window; we do not expect duplicates more than 10 seconds apart.

val deduplicationWindowDuration = 10 seconds

val deduplicationRetention = deduplicationWindowDuration.toMillis * 2 + 1
val deduplicationWindow = TimeWindows.of(deduplicationWindowDuration.toMillis).until(deduplicationRetention)

The joined stream needs to be repartitioned again by the compound key EvPvKey, composed of client, page view and event identifiers. This key will be used to decide whether an EvPv is a duplicate or not. Next, the stream is grouped by the selected key into a KGroupedStream and deduplicated with a reduce function, where the first observed event wins.

case class EvPvKey(clientId: ClientId, pvId: PvId, evId: EvId)

val evPvToEvPvKeyMapper: KeyValueMapper[PvKey, EvPv, EvPvKey] =
  (pvKey, evPv) => EvPvKey(pvKey.clientId, pvKey.pvId, evPv.evId)

val evPvByEvPvKeyStream = evPvStream.selectKey(evPvToEvPvKeyMapper)

val evPvDeduplicator: Reducer[EvPv] =
  (evPv1, _) => evPv1


val deduplicatedStream = evPvByEvPvKeyStream
  .groupByKey()
  .reduce(evPvDeduplicator, deduplicationWindow, "evpv-store")
  .toStream()

This deduplication implementation is debatable, due to the continuous update stream semantics of KTable/KStream. The reduce operation creates a KTable, and this KTable is transformed again into a KStream of continuous updates for the same key. This could lead to duplicates again if updates are emitted more often than once per deduplication window. For a 10-second deduplication window the updates should not be emitted more often than every 10 seconds, but a lower update frequency leads to higher latency. The update frequency is controlled globally using the "cache.max.bytes.buffering" and "commit.interval.ms" Kafka Streams properties. See the reference documentation for details: Record caches in the DSL.
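
For completeness, this is how those two properties are set; the values below only illustrate the trade-off and are not a recommendation:

import org.apache.kafka.streams.StreamsConfig

// a minimal sketch – tune both values to balance duplicate suppression against latency
val props = new java.util.Properties()
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, (10 * 1024 * 1024).toString) // 10 MB record cache
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000") // flush updates every 10 seconds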

I did not find another way to deduplicate events with the DSL; please let me know if a better implementation exists.

In the last stage the stream needs to be repartitioned again by client id and published to the "clickstream.events_enriched" Kafka topic for downstream subscribers. In the same step the mapper gets rid of the windowed key produced by the windowed reduce function.

val evPvToClientKeyMapper: KeyValueMapper[Windowed[EvPvKey], EvPv, ClientKey] =
  (windowedEvPvKey, _) => ClientKey(windowedEvPvKey.key.clientId)

val finalStream = deduplicatedStream.selectKey(evPvToClientKeyMapper)

finalStream.to("clickstream.events_enriched")
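
The snippets above only build the topology; to actually run it, the builder has to be wrapped in a KafkaStreams instance. A minimal sketch, assuming the application id and bootstrap servers below (the names are mine) and default serdes configured in the same properties:

import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

val props = new java.util.Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "clickstream-enricher-dsl") // hypothetical application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

val streams = new KafkaStreams(builder.build(), props)
streams.start()

// close the topology cleanly on shutdown
sys.addShutdownHook {
  streams.close()
}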

Under The Hood

The Kafka Streams DSL is quite descriptive, isn’t it? Especially developers with strong functional programming skills appreciate the overall design. But you will shortly see how much unexpected traffic to the Kafka cluster is generated at runtime.

I like numbers, so let’s estimate the traffic, based on the real clickstream ingestion platform I develop on a daily basis:

  • 1 kB – average page view size
  • 600 B – average event size
  • 4k – page views / second
  • 20k – events / second

This gives 24k msgs/s and 16 MB/s traffic-in in total, traffic easily handled even by a small Kafka cluster.

When a stream of data is repartitioned, Kafka Streams creates an additional intermediate topic and publishes the whole traffic to that topic, partitioned by the selected key. To be more precise, it happens twice in our case: for the repartitioned page views and events before the join. We need to add 24k msgs/s and 16 MB/s more traffic-in to the calculation.

When streams of data are joined using a window, Kafka Streams sends both sides of the join to two intermediate topics again. Even if you don't need fault tolerance, logging into Kafka cannot be disabled using the DSL. Nor can you get rid of the window for "this" side of the join (the window for events) – more about it later on. Add 24k msgs/s and 16 MB/s more traffic-in to the calculation again.

To deduplicate events, the joined stream goes into a Kafka Streams intermediate topic again. Add 20k msgs/s and (1 kB + 1.6 kB) * 20k = 52 MB/s more traffic-in to the calculation.

The last repartitioning by client identifier adds 20k msgs/s and 52 MB/s more traffic-in.

Finally, instead of 24k msgs/s and 16 MB/s traffic-in we have got 112k msgs/s and 152 MB/s traffic-in. And I did not even count the traffic from internal topic replication and the standby replicas recommended for resiliency.
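
A rough back-of-the-envelope check of these numbers, using the average message sizes assumed above:

// all values are the assumptions listed above, sizes in bytes, rates per second
val pvIn = 4000 * 1024                 // ~4 MB/s of page views
val evIn = 20000 * 600                 // ~12 MB/s of events
val sourceIn = pvIn + evIn             // ~16 MB/s, 24k msgs/s on the input topics

val repartitionIn = sourceIn           // selectKey before the join: +16 MB/s, +24k msgs/s
val joinWindowsIn = sourceIn           // both sides of the windowed join: +16 MB/s, +24k msgs/s
val dedupIn = 20000 * (1024 + 1600)    // repartitioned EvPv stream: ~52 MB/s, +20k msgs/s
val finalIn = dedupIn                  // final repartition by client id: ~52 MB/s, +20k msgs/s

val totalIn = sourceIn + repartitionIn + joinWindowsIn + dedupIn + finalIn // ~152 MB/s, 112k msgs/s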

Be aware that this is the calculation for a simple join of events and page views generated by a local e-commerce platform in a central European country (~20M clients). I could also easily imagine a much more complex stream topology, with tens of repartitions, joins and aggregations.

If you are not careful, your Kafka Streams application could easily kill your Kafka cluster. At least our application did it once. An application deployed on 10 Mesos nodes (4 CPU, 4 GB RAM) almost killed a Kafka cluster deployed on 10 physical machines (32 CPU, 64 GB RAM, SSD). The application was started after some time of inactivity and processed 3 hours of retention in 5 minutes (yep, it's a well-known vulnerability as long as KIP-13 is open).

Kafka Processor API

Now it’s time to check the Processor API and figure out how to optimize our stream topology.

Create the sources from the input topics "clickstream.events" and "clickstream.page_views".

new Topology()
  .addSource("ev-source", "clickstream.events")
  .addSource("pv-source", "clickstream.page_views")

Because we need to join an incoming event with a page view collected in the past, create a processor which stores page views in a windowed store. The processor puts observed page views into the window store for joining in the next processor. The processed page views do not even need to be forwarded downstream.

class PvWindowProcessor(val pvStoreName: String) extends AbstractProcessor[ClientKey, Pv] {

  private lazy val pvStore =
    context().getStateStore(pvStoreName).asInstanceOf[WindowStore[ClientKey, Pv]]

  override def process(key: ClientKey, value: Pv): Unit =
    pvStore.put(key, value)
}

The store for page views is configured with the same window size and retention. The store is configured to keep duplicates (the retainDuplicates parameter) because the key is a client id, not a page view id. Because the join window is typically quite long (minutes), the store should be fault tolerant (logging enabled). Even if one of the stream instances fails, another one will continue processing with the persistent window state built by the failed node – cool! Finally, the backing internal Kafka topic can be easily configured using the loggingConfig map (replication factor, number of partitions, etc.).

val pvStoreWindowDuration = 10 minutes

val retention = pvStoreWindowDuration.toMillis
val window = pvStoreWindowDuration.toMillis
val segments = 3
val retainDuplicates = true

val loggingConfig = Map[String, String]()

val pvWindowStore = Stores.windowStoreBuilder(
  Stores.persistentWindowStore("pv-window-store", retention, segments, window, retainDuplicates),
  ClientKeySerde,
  PvSerde
).withLoggingEnabled(loggingConfig)

The first optimization you may notice is that in our scenario only one window store is created – for page views. A window store for events is not needed: if a page view is collected by the system after the event, it does not trigger a new join.

Add the page view processor to the topology and connect it with the page view source upstream.

val pvWindowProcessor: ProcessorSupplier[ClientKey, Pv] =
  () => new PvWindowProcessor("pv-window-store")

new Topology()
  (...)
  .addProcessor("pv-window-processor", pvWindowProcessor, "pv-source")

Now it’s time for the event and page view join processor, the heart of the topology. It seems complex, but this processor also deduplicates the joined stream using evPvStore.

class EvJoinProcessor(
  val pvStoreName: String,
  val evPvStoreName: String,
  val joinWindow: FiniteDuration,
  val deduplicationWindow: FiniteDuration
) extends AbstractProcessor[ClientKey, Ev] {

  import scala.collection.JavaConverters._

  private lazy val pvStore =
    context().getStateStore(pvStoreName).asInstanceOf[WindowStore[ClientKey, Pv]]

  private lazy val evPvStore =
    context().getStateStore(evPvStoreName).asInstanceOf[WindowStore[EvPvKey, EvPv]]

  override def process(key: ClientKey, ev: Ev): Unit = {
    val timestamp = context().timestamp()
    val evPvKey = EvPvKey(key.clientId, ev.pvId, ev.evId)

    if (isNotDuplicate(evPvKey, timestamp, deduplicationWindow)) {
      val evPv = storedPvs(key, timestamp, joinWindow)
        .find { pv =>
          pv.pvId == ev.pvId
        }
        .map { pv =>
          EvPv(ev.evId, ev.value, Some(pv.pvId), Some(pv.value))
        }
        .getOrElse {
          EvPv(ev.evId, ev.value, None, None)
        }

      context().forward(evPvKey, evPv)
      evPvStore.put(evPvKey, evPv)
    }
  }

  private def isNotDuplicate(evPvKey: EvPvKey, timestamp: Long, deduplicationWindow: FiniteDuration) =
    evPvStore.fetch(evPvKey, timestamp - deduplicationWindow.toMillis, timestamp).asScala.isEmpty

  private def storedPvs(key: ClientKey, timestamp: Long, joinWindow: FiniteDuration) =
    pvStore.fetch(key, timestamp - joinWindow.toMillis, timestamp).asScala.map(_.value)
  }

First, the processor performs a lookup for a previously joined EvPv by EvPvKey. If an EvPv is found, the processing is skipped because this event has already been processed.

Next, it tries to match a page view to the event using the simple filter pv.pvId == ev.pvId. We don't need any repartitioning to do that; we only get all page views from the given client and join them with the event in the processor itself. It should be very efficient because every client generates up to a hundred page views in 10 minutes. If there is no matched page view in the configured window, an EvPv without page view details is forwarded downstream.

A perceptive reader will have noticed that the processor also changes the key from ClientKey to EvPvKey for deduplication purposes. Everything still stays within the given client context, without the need for any repartitioning. This is possible because the new key is more detailed than the original one.

As before, a windowed store for deduplication needs to be configured. Because deduplication is done in a very short window (10 seconds or so), logging to a backing internal Kafka topic is disabled altogether. If one of the stream instances fails, we could get some duplicates during this short window – not a big deal.

val evPvStoreWindowDuration = 10 seconds

val retention = evPvStoreWindowDuration.toMillis
val window = evPvStoreWindowDuration.toMillis
val segments = 3
val retainDuplicates = false

val evPvStore = Stores.windowStoreBuilder(
  Stores.persistentWindowStore("ev-pv-window-store", retention, segments, window, retainDuplicates),
  EvPvKeySerde,
  EvPvSerde
).withLoggingDisabled()

Add the join processor to the topology and connect it with the event source upstream.

val evJoinProcessor: ProcessorSupplier[ClientKey, Ev] =
  () => new EvJoinProcessor("pv-window-store", "ev-pv-window-store", pvStoreWindowDuration, evPvStoreWindowDuration)

new Topology()
  (...)
  .addProcessor("ev-join-processor", evJoinProcessor, "ev-source")

The last processor maps the compound key EvPvKey back to ClientKey. Because the client identifier is already part of the compound key, the mapping is done by the processor without the need for further repartitioning.

class EvPvMapProcessor extends AbstractProcessor[EvPvKey, EvPv] {
  override def process(key: EvPvKey, value: EvPv): Unit =
    context().forward(ClientKey(key.clientId), value)
}

Add the map processor to the topology.

val evPvMapProcessor: ProcessorSupplier[EvPvKey, EvPv] =
  () => new EvPvMapProcessor()

new Topology()
  (...)
  .addProcessor("ev-pv-map-processor", evPvMapProcessor, "ev-join-processor")

Finally, publish the join results to the "clickstream.events_enriched" Kafka topic.

new Topology()
  (...)
  .addSink("ev-pv-sink", "clickstream.events_enriched", "ev-pv-map-processor")

If a processor requires access to a store, this fact must be registered. It would be nice to have a statically typed Topology API for this registration, but for now, if a store is not connected to a processor, or is connected to the wrong one, a runtime exception is thrown during application startup.

new Topology()
  (...)
  .addStateStore(pvStore, "pv-window-processor", "ev-join-processor")
  .addStateStore(evPvStore, "ev-join-processor")
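
Just like the DSL version, the assembled Topology has to be wrapped in a KafkaStreams instance to run. A minimal sketch, assuming the topology built in the previous snippets is held in a topology value and the application id below is a placeholder:

import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

val props = new java.util.Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "clickstream-enricher-processor") // hypothetical application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

val streams = new KafkaStreams(topology, props)
streams.start()

sys.addShutdownHook {
  streams.close()
}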

Let’s count the Kafka Streams internal topic overhead for the Processor API version. Wait, there is only one internal topic – for the page view join window! It gives 4k messages per second and 4 MB/s of traffic-in overhead, not more.

28k instead of 112k messages per second and 20 MB/s instead of 152 MB/s traffic-in in total. It is a noticeable difference between the Processor API and DSL versions of the topology, especially if we keep in mind that the enrichment results are almost identical to the results from the DSL version.

Summary

Dear readers, are you still with me after this long lecture with not-so-easy-to-digest Scala code? I hope so :)

My final thoughts about Kafka Streams:

  • The Kafka Streams DSL looks great at first; the functional and declarative API sells the product, no doubt.
  • Unfortunately the Kafka Streams DSL hides a lot of internals which should be exposed via the API (store configuration, join semantics, repartitioning) – see KIP-182.
  • The Processor API seems to be more complex and less sexy than the DSL.
  • But the Processor API allows you to create hand-crafted, very efficient stream topologies.
  • I did not present any Kafka Streams tests (what a shame – I'm sorry), but I think testing would be easier with the Processor API than with the DSL. With the DSL it has to be an integration test; processors can be easily unit tested in isolation with a few mocks.
  • As a Scala developer I prefer the Processor API over the DSL; for example, the Scala compiler could not infer KStream generic types.
  • It's a pleasure to work with the Processor and fluent Topology APIs.
  • I'm really keen on KSQL's future; it would be great to eventually get an optimized engine like Spark Catalyst.
  • Finally, the Kafka Streams library is extraordinarily fast and hardware efficient, if you know what you are doing.

As always, working code is published at https://github.com/mkuthan/example-kafkastreams. The project is configured with Embedded Kafka and does not require any additional setup. Just uncomment either the DSL or the Processor API version, run the main class and observe the enriched stream of events on the console.

Enjoy!

Apache BigData Europe Conference Summary

Last week I attended Apache Big Data Europe held in Sevilla, Spain. The event concentrates on big data projects under the Apache Foundation umbrella. Below you can find my overall impression of the conference and notes from several interesting sessions. The notes are presented as short checklists; if some aspect was particularly interesting, I put a reference to supplementary materials.

Key takeaways

  • At Allegro we are on track with our clickstream ingestion platform. Apache Kafka, Apache Avro, Apache Parquet, Apache Spark, Apache Hive and, last but not least, Apache Druid are the key players for us, all hosted under the Apache Foundation!
  • Apache Ignite might solve many performance issues in Spark jobs (shared RDDs) but also in MR jobs (in-memory MR, HDFS cache). It has to be verified during the next Allegro internal hackathon, for sure.
  • Integration between Apache Hive and Apache Druid looks really promising. Both tools are very important for the Hadoop ecosystem and they complement each other quite well.
  • Apache Calcite seems to be an important element in the Hadoop ecosystem. I hope that the gap to mature RDBMS optimizers will somehow be filled. It would also be great to see Spark Catalyst and Apache Calcite cooperation, keep your fingers crossed.
  • Stephan Ewen should improve Apache Flink keynotes if DataArtisans want to compete with DataBricks. Apache Flink architecture and overall design is awesome, FTW.
  • Queryable state in stream processing is quite an interesting idea to decrease the latency of access to pre-aggregated data.
  • Apache Kudu and Apache Impala are at a dead end, IMHO. The concept of executing analytical queries (fast SQL on Impala) against the whole set of raw data (kept in Kudu) is unrealistic. Cloudera has gained mastery in keeping their own technologies alive (e.g. Apache Flume).
  • Apache Gearpump from Intel has lost its momentum. I really liked the idea of a distributed streaming framework built on Akka.
  • I was really surprised that the CSV format is heavily used and, what is even worse, conference speakers still talk about it.
  • Unfortunately there was no session about Apache Kylin, so sad.

TL;DR

“Stream processing as a foundational paradigm and Apache Flink’s approach to it” by Stephan Ewen (Data Artisans)

“Apache Gearpump next-gen streaming engine” by Karol Brejna, Huafeng Wang (Intel)

“An overview on optimization in Apache Hive: past, present, future” by Jesus Rodriguez (HortonWorks)

  • Goals – sub-second latency, petabyte scale, ANSI SQL – a never ending story.
  • Metastore is often a bottleneck (DataNucleus ORM). See also: https://cwiki.apache.org/confluence/display/Hive/Design#Design-MetastoreArchitecture
  • Metastore on HBase (Hive 2.x, alpha).
  • Cost Based Optimizer (Apache Calcite), integrated from 0.14, enabled by default from 2.0.
  • Optimizations: push down projections, push down filtering, join reordering (bushy joins), propagate projections, propagate filtering and more.
  • New feature: materialized views. Cons: up-to-date statistics, optimizer could use view instead of table. See more: HIVE-10459
  • Roadmap: optimizations based on CPU/MEM/IO costs.

“Distributed in-database machine learning with Apache MADlib” by Roman Shaposhnik (Pivotal)

“Interactive analytics at scale in Hive using Druid” by Jesus Rodriguez (HortonWorks)

  • It works, at least on Jesus' laptop with an unreleased Hive version.
  • Druid is used as a library (more or less).
  • Druid indexing service is not used for indexing (regular Hive MR is used instead).
  • Druid broker is used for querying but there are plans to bypass broker and access historical/realtime/indexing nodes directly. Right now the broker might be a bottleneck.
  • Apache Calcite is a key player in the integration.
  • Pros: schema discovery.
  • Cons: dims/metrics are inferred, right now there is no way to specify all important index details (e.g: time granularities).
  • Roadmap: push down more things into the Druid for better query performance. See more: https://cwiki.apache.org/confluence/display/Hive/Druid+Integration

“Hadoop at Uber” by Mayank Basal (Uber)

  • Fast pace of changes, respect!
  • Shared cluster for batch jobs (YARN) and realtime jobs (Mesos) –> better resources utilization.
  • Apache Myriad (YARN on Mesos): static allocation, no global quotas, no global priorities and many more limitations and problems.
  • Unified Scheduler – just the name without any details yet.

“Spark Performance” by Tim Ellison (IBM)

Apache Calcite and Apache Geode by Christian Tzolov (Pivotal)

  • Apache Geode (AKA Gemfire) – distributed hashmap, consistent, transactional, partitioned, replicated, etc.
  • PDX serialization, on field level, type registry.
  • Nested regions.
  • Embeddable.
  • Object Query Language (OQL) ~ SQL.
  • Apache Calcite adapter (work in progress). The adapter might be implemented gradually (from in-memory enumerable to advanced pushdowns/optimizations and bindable generated code).
  • Linq4j ported from .NET.

“Data processing pipeline at Trivago” by Clemens Valiente (Trivago)

  • Separated datacenters, REST collectors with HDD fallback, Apache Kafka, Camus, CSV.
  • Hive MR jobs prepare aggregates and data subsets for Apache Impala, Apache Oozie used as scheduler.
  • Problems with memory leaks in Apache Impala.
  • R/Shiny connected to Impala for analytical purposes.
  • Roadmap: Kafka Streams, Impala + Kudu, Kylin + HBase.
  • Interesting concept: direct access to Kafka Streams state (queryable RocksDB).

“Implementing BigPetStore in Spark and Flink” by Marton Balasi (Cloudera)

  • BigTop – way to build packages or setup big data servers and tools locally.
  • BigPetStore – generates synthetic data + sample stats calculation + sample recommendation (collaborative filtering).
  • MR, Spark, Flink implementations – nice method to learn Flink if you already know Spark.

“Introduction to TensorFlow” by Gemma Parreno

  • Global finalist of NASA Space App Challenge, congrats!
  • Extremely interesting session, but I’ve been totally lost – too much science :–(

“Shared Memory Layer and Faster SQL for Spark Applications” by Dmitriy Setrakyan (GridGain)

“Apache CouchDB” by Jan Lehnardt (Neighbourhoodie Software)

  • Master-master key-value store.
  • HTTP as protocol.
  • JSON as storage format.
  • Simplified vector clock for conflicts handling (document hashes, consistent across cluster).
  • Entertaining part about handling time in distributed systems, highly recommended.
  • Conflict-Free Replicated JSON Datatype was mentioned by Jan during talk, interesting.
  • Roadmap: HTTP2, pluggable storage format, improved protocol for high latency networks.

“Java memory leaks in modular environment” by Mark Thomas (Pivotal)

  • Do you remember “OutOfMemoryError: PermGen space” in Tomcat? It is mostly not Tomcat's fault.
  • You should always set -XX:MaxMetaspaceSize in production systems.
  • Excellent memory leaks analysis live demo using YourKit profiler (leaks in awt, java2d, rmi, xml). https://github.com/markt-asf/memory-leaks

“Children and the art of coding” by Sebastien Blanc (RedHat)

  • The best, most entertaining session at the conference, IMHO! If you are a happy parent, you should watch Sebastien's session (unfortunately sessions were not recorded, AFAIK).
  • Logo, Scratch, Groovy, Arduino and Makey Makey

Long-running Spark Streaming Jobs on YARN Cluster

A long-running Spark Streaming job, once submitted to the YARN cluster, should run forever until it is intentionally stopped. Any interruption introduces substantial processing delays and could lead to data loss or duplicates. Neither YARN nor Apache Spark has been designed for executing long-running services, but they have been successfully adapted to the growing needs of near real-time processing implemented as long-running jobs. Successfully does not necessarily mean without technological challenges.

This blog post summarizes my experience in running mission critical, long-running Spark Streaming jobs on a secured YARN cluster. You will learn how to submit a Spark Streaming application to a YARN cluster and avoid sleepless nights during on-call hours.

Fault tolerance

In YARN cluster mode the Spark driver runs in the same container as the Application Master, the first YARN container allocated by the application. This process is responsible for driving the application and requesting resources (Spark executors) from YARN. What is important, the Application Master eliminates the need for any other process running during the application lifecycle. Even if the edge Hadoop cluster node from which the Spark Streaming job was submitted fails, the application stays unaffected.

To run a Spark Streaming application in cluster mode, make sure that the following parameters are given to the spark-submit command:

spark-submit --master yarn --deploy-mode cluster

Because the Spark driver and the Application Master share a single JVM, any error in the Spark driver stops our long-running job. Fortunately it is possible to configure the maximum number of attempts that will be made to re-run the application. It is reasonable to set a value higher than the default of 2 (derived from the YARN cluster property yarn.resourcemanager.am.max-attempts). For me 4 works quite well; a higher value may cause unnecessary restarts even if the reason for the failure is permanent.

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4

If the application runs for days or weeks without restart or redeployment on a highly utilized cluster, 4 attempts could be exhausted in a few hours. To avoid this situation, the attempt counter should be reset every hour or so.

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h

Another important setting is the maximum number of executor failures before the application fails. By default it is max(2 * num executors, 3), which is well suited for batch jobs but not for long-running jobs. The property comes with a corresponding validity interval which should also be set.

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h

For long-running jobs you could also consider boosting the maximum number of task failures before giving up on the job. By default tasks are retried 4 times and then the job fails.

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8

Performance

When a Spark Streaming application is submitted to the cluster, the YARN queue where the job runs must be defined. I strongly recommend using the YARN Capacity Scheduler and submitting long-running jobs to a separate queue. Without a separate YARN queue your long-running job will sooner or later be preempted by a massive Hive query.

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8 \
    --queue realtime_queue

Another important issue for a Spark Streaming job is keeping the processing time stable and highly predictable. The processing time should stay below the batch duration to avoid delays. I've found that Spark speculative execution helps a lot, especially on a busy cluster. Batch processing times are much more stable when speculative execution is enabled. Unfortunately speculative mode can be enabled only if the Spark actions are idempotent.

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8 \
    --queue realtime_queue \
    --conf spark.speculation=true

Security

On a secured HDFS cluster, long-running Spark Streaming jobs fail due to Kerberos ticket expiration. Without additional settings, a Kerberos ticket is issued when the Spark Streaming job is submitted to the cluster. When the ticket expires, the Spark Streaming job is no longer able to write or read data from HDFS.

In theory (based on the documentation) it should be enough to pass the Kerberos principal and keytab to the spark-submit command:

spark-submit --master yarn --deploy-mode cluster \
     --conf spark.yarn.maxAppAttempts=4 \
     --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
     --conf spark.yarn.max.executor.failures={8 * num_executors} \
     --conf spark.yarn.executor.failuresValidityInterval=1h \
     --conf spark.task.maxFailures=8 \
     --queue realtime_queue \
     --conf spark.speculation=true \
     --principal user/hostname@domain \
     --keytab /path/to/foo.keytab

In practice, due to several bugs (HDFS-9276, SPARK-11182), the HDFS cache must be disabled. If it is not, Spark will not be able to read the updated token from the file on HDFS.

spark-submit --master yarn --deploy-mode cluster \
     --conf spark.yarn.maxAppAttempts=4 \
     --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
     --conf spark.yarn.max.executor.failures={8 * num_executors} \
     --conf spark.yarn.executor.failuresValidityInterval=1h \
     --conf spark.task.maxFailures=8 \
     --queue realtime_queue \
     --conf spark.speculation=true \
     --principal user/hostname@domain \
     --keytab /path/to/foo.keytab \
     --conf spark.hadoop.fs.hdfs.impl.disable.cache=true

Mark Grover pointed out that those bugs only affect HDFS clusters configured with NameNodes in HA mode. Thanks, Mark.

Logging

The easiest way to access Spark application logs is to configure the Log4j console appender, wait for application termination and use the yarn logs -applicationId [applicationId] command. Unfortunately it is not feasible to terminate long-running Spark Streaming jobs just to access the logs.

I recommend installing and configuring Elasticsearch, Logstash and Kibana (the ELK stack). ELK installation and configuration are out of the scope of this blog post, but remember to log the following context fields:

  • YARN application id
  • YARN container hostname
  • Executor id (Spark driver is always 000001, Spark executors start from 000002)
  • YARN attempt (to check how many times Spark driver has been restarted)

The Log4j configuration with the Logstash-specific appender and layout definition should be passed to the spark-submit command:

spark-submit --master yarn --deploy-mode cluster \
     --conf spark.yarn.maxAppAttempts=4 \
     --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
     --conf spark.yarn.max.executor.failures={8 * num_executors} \
     --conf spark.yarn.executor.failuresValidityInterval=1h \
     --conf spark.task.maxFailures=8 \
     --queue realtime_queue \
     --conf spark.speculation=true \
     --principal user/hostname@domain \
     --keytab /path/to/foo.keytab \
     --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
     --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties \
     --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties \
     --files /path/to/log4j.properties

Finally, the Kibana dashboard for a Spark job might look like this:

Monitoring

A long-running job runs 24/7, so it is important to have an insight into historical metrics. The Spark UI keeps statistics only for a limited number of batches, and after a restart all metrics are gone. Again, external tools are needed. I recommend installing Graphite for collecting metrics and Grafana for building dashboards.

First, Spark needs to be configured to report metrics to Graphite; prepare the metrics.properties file:

*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=[hostname]
*.sink.graphite.port=[port]
*.sink.graphite.prefix=some_meaningful_name

driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource

And configure the spark-submit command:

spark-submit --master yarn --deploy-mode cluster \
     --conf spark.yarn.maxAppAttempts=4 \
     --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
     --conf spark.yarn.max.executor.failures={8 * num_executors} \
     --conf spark.yarn.executor.failuresValidityInterval=1h \
     --conf spark.task.maxFailures=8 \
     --queue realtime_queue \
     --conf spark.speculation=true \
     --principal user/hostname@domain \
     --keytab /path/to/foo.keytab \
     --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
     --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties \
     --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties \
     --files /path/to/log4j.properties,/path/to/metrics.properties

Metrics

Spark publishes tons of metrics from the driver and executors. If I were to choose the most important one, it would be the records of the last received batch. When StreamingMetrics.streaming.lastReceivedBatch_records == 0, it probably means that the Spark Streaming job has been stopped or has failed.

Other important metrics are listed below:

  • When the total delay is greater than the batch interval, the latency of the processing pipeline increases.
driver.StreamingMetrics.streaming.lastCompletedBatch_totalDelay
  • When the number of active tasks is lower than the number of executors times the number of cores, the allocated YARN resources are not fully utilized.
executor.threadpool.activeTasks
  • How much RAM is used for the RDD cache.
driver.BlockManager.memory.memUsed_MB
  • When there is not enough RAM for the RDD cache, how much data has been spilled to disk. You should increase the executor memory or change the spark.memory.fraction Spark property to avoid performance degradation.
driver.BlockManager.disk.diskSpaceUsed_MB
  • JVM memory utilization on the Spark driver.
driver.jvm.heap.used
driver.jvm.non-heap.used
driver.jvm.pools.G1-Old-Gen.used
driver.jvm.pools.G1-Eden-Space.used
driver.jvm.pools.G1-Survivor-Space.used
  • How much time is spent on GC on the Spark driver.
driver.jvm.G1-Old-Generation.time
driver.jvm.G1-Young-Generation.time
  • JVM memory utilization on Spark executors.
[0-9]*.jvm.heap.used
[0-9]*.jvm.non-heap.used
[0-9]*.jvm.pools.G1-Old-Gen.used
[0-9]*.jvm.pools.G1-Survivor-Space.used
[0-9]*.jvm.pools.G1-Eden-Space.used
  • How much time is spent on GC on Spark executors.
[0-9]*.jvm.G1-Old-Generation.time
[0-9]*.jvm.G1-Young-Generation.time

Grafana

When you configure the first Grafana dashboard for a Spark application, the first problem pops up:

How do you configure a Graphite query when the metrics for every Spark application run are reported under its own application id?

If you are lucky and brave enough to use Spark 2.1, pin the application metrics to a static application name:

--conf spark.metrics.namespace=my_application_name

For Spark versions older than 2.1, a few tricks with Graphite built-in functions are needed.

For driver metrics, use the wildcard .*(application_[0-9]+).* and the aliasSub Graphite function to present the application id in the graph legend:

aliasSub(stats.analytics.$job_name.*.prod.$dc.*.driver.jvm.heap.used, ".*(application_[0-9]+).*", "heap: \1")

For executor metrics, again use the wildcard .*(application_[0-9]+).*, the groupByNode Graphite function to sum metrics from all Spark executors, and finally the aliasSub Graphite function to present the application id in the graph legend:

aliasSub(groupByNode(stats.analytics.$job_name.*.prod.$dc.*.[0-9]*.jvm.heap.used, 6, "sumSeries"), "(.*)", "heap: \1")

Finally, the Grafana dashboard for a Spark job might look like this:

If the Spark application is restarted frequently, metrics for old, already finished runs should be deleted from Graphite. Because Graphite does not compact inactive metrics, old metrics slow down Graphite itself and the Grafana queries.

Graceful stop

The last puzzle element is how to stop a Spark Streaming application deployed on YARN in a graceful way. The standard method for stopping (or rather killing) a YARN application is the yarn application -kill [applicationId] command. This command stops the Spark Streaming application, but it could happen in the middle of a batch. So if the job reads data from Kafka, saves processing results on HDFS and finally commits Kafka offsets, you should expect duplicated data on HDFS when the job is stopped just before committing the offsets.

The first attempt to solve the graceful shutdown issue was to call the Spark streaming context stop method in a shutdown hook.

sys.addShutdownHook {
    streamingContext.stop(stopSparkContext = true, stopGracefully = true)
}

Disappointingly, a shutdown hook is called too late to finish the started batch and the Spark application is killed almost immediately. Moreover, there is no guarantee that a shutdown hook will be called by the JVM at all.

At the time of writing this blog post, the only confirmed way to gracefully shut down a Spark Streaming application on YARN is to somehow notify the application about the planned shutdown, and then stop the streaming context programmatically (but not from a shutdown hook). The yarn application -kill command should be used only as a last resort, if the notified application did not stop after a defined timeout.

The application can be notified about the planned shutdown using a marker file on HDFS (the easiest way), or using a simple Socket/HTTP endpoint exposed on the driver (the sophisticated way).

Because I like the KISS principle, below you can find shell script pseudo-code for starting/stopping a Spark Streaming application using a marker file:

start() {
    hdfs dfs -touchz /path/to/marker/my_job_unique_name
    spark-submit ...
}

stop() {
    hdfs dfs -rm /path/to/marker/my_job_unique_name
    force_kill=true
    application_id=$(yarn application -list | grep -oe "application_[0-9]*_[0-9]*")
    for i in `seq 1 10`; do
        application_status=$(yarn application -status ${application_id} | grep "State : \(RUNNING\|ACCEPTED\)")
        if [ -n "$application_status" ]; then
            sleep 60s
        else
            force_kill=false
            break
        fi
    done
    $force_kill && yarn application -kill ${application_id}
}

In the Spark Streaming application, a background thread should monitor the marker file, and when the file disappears, stop the context by calling streamingContext.stop(stopSparkContext = true, stopGracefully = true).
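
A minimal sketch of such a background thread; the marker path mirrors the shell script above and the helper name is mine:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.StreamingContext

// polls the HDFS marker file and triggers a graceful stop when it disappears
def watchMarkerFile(streamingContext: StreamingContext, markerFile: String): Unit = {
  val thread = new Thread("streaming-stop-watcher") {
    override def run(): Unit = {
      val fs = FileSystem.get(streamingContext.sparkContext.hadoopConfiguration)
      while (fs.exists(new Path(markerFile))) {
        Thread.sleep(60 * 1000) // check once per minute
      }
      // marker removed – finish the running batch and stop everything
      streamingContext.stop(stopSparkContext = true, stopGracefully = true)
    }
  }
  thread.setDaemon(true)
  thread.start()
}

watchMarkerFile(streamingContext, "/path/to/marker/my_job_unique_name")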

Summary

As you can see, the configuration of a mission critical Spark Streaming application deployed on YARN is quite complex. It has been a long, tedious and iterative process of learning all the presented techniques by a few very smart devs. But in the end, long-running Spark Streaming applications deployed on a highly utilized YARN cluster are extraordinarily stable.

Spark Application Assembly for Cluster Deployments

When I tried to deploy my first Spark application on a YARN cluster, I realized that there was no clear and concise instruction on how to prepare the application for deployment. This blog post can be treated as the missing manual on how to build a Spark application written in Scala into a deployable binary.

This blog post assumes that your Spark application is built with SBT. As long as SBT is the mainstream tool for building Scala applications, the assumption seems legit. Please ensure that your project is configured with at least SBT 0.13.6. Open the project/build.properties file, verify the version and update SBT if needed:

sbt.version=0.13.11

SBT Assembly Plugin

The spark-submit script is a convenient way to launch a Spark application on a YARN or Mesos cluster. However, due to the distributed nature of the cluster, the application has to be prepared as a single Java ARchive (JAR). This archive includes all classes from your project with all of their dependencies. The application assembly can be prepared using the SBT Assembly Plugin.

To enable the SBT Assembly Plugin, add the plugin dependency to the project/plugins.sbt file:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

This basic setup can be verified by calling the sbt assembly command. The final assembly location depends on the Scala version, application name and application version. The build result could be assembled into the target/scala-2.11/myapp-assembly-1.0.jar file.

You can configure many aspects of the SBT Assembly Plugin, like a custom merge strategy, but I found that it is much easier to keep the defaults and follow the plugin conventions. What is even more important, you don't have to change the defaults to get a correct, deployable application binary assembled by the plugin.

Provided dependencies scope

As long as the cluster provides Spark classes at runtime, Spark dependencies must be excluded from the assembled JAR. If they are not, you should expect weird errors from the Java classloader during application startup. An additional benefit of an assembly without Spark dependencies is faster deployment. Please remember that the application assembly must be copied over the network to a location accessible by all cluster nodes (e.g. HDFS or S3).

Look at the dependency section in your build file; it should look similar to the code snippet below:

val sparkVersion = "1.6.0"

"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-hive" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % sparkVersion,
"org.apache.spark" %% "spark-graphx" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.apache.spark" %% "spark-streaming-kafka" % sparkVersion,
(...)

The list of Spark dependencies is always project specific. The SQL, Hive, MLlib, GraphX and Streaming extensions are defined only for reference. All of the defined dependencies are required by the local build to compile the code and run tests, so they cannot simply be removed from the build definition – that would break the build.

The SBT Assembly Plugin comes with an additional dependency scope: "provided". This scope is very similar to the Maven provided scope. A provided dependency is part of compilation and tests, but excluded from the application assembly.

To configure the provided scope for Spark dependencies, change the definition as follows:

val sparkVersion = "1.6.0"

"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
"org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
"org.apache.spark" %% "spark-graphx" % sparkVersion % "provided",
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
"org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
  exclude("log4j", "log4j")
  exclude("org.spark-project.spark", "unused"),
(...)

Careful readers should notice that the "spark-streaming-kafka" dependency has not been marked as "provided". This was done on purpose, because the integration with Kafka is not part of the Spark distribution assembly and has to be assembled into the application JAR. The exclusion rules for the "spark-streaming-kafka" dependency are discussed later.

OK, but how do you recognize which libraries are part of the Spark distribution assembly? There is no simple answer to this question. Look for the spark-assembly-*.jar file on the cluster classpath, list the assembly content and verify what is included and what is not. In the assembly on my cluster I found core, SQL, Hive, MLlib, GraphX and Streaming classes embedded, but not the integration with Kafka.

$ jar tf spark-assembly-1.6.0.jar
META-INF/
META-INF/MANIFEST.MF
org/
org/apache/
org/apache/spark/
org/apache/spark/HeartbeatReceiver
(...)
org/apache/spark/ml/
org/apache/spark/ml/Pipeline$SharedReadWrite$$anonfun$2.class
org/apache/spark/ml/tuning/
(...)
org/apache/spark/sql/
org/apache/spark/sql/UDFRegistration$$anonfun$3.class
org/apache/spark/sql/SQLContext$$anonfun$range$2.class
(...)
reference.conf
META-INF/NOTICE

SBT run and run-main

The provided dependency scope unfortunately breaks the SBT run and run-main tasks. Because provided dependencies are excluded from the runtime classpath, you should expect a ClassNotFoundException during application startup on the local machine. To fix this issue, the provided dependencies must be explicitly added to all SBT tasks used for local runs, e.g.:

run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in(Compile, run), runner in(Compile, run))
runMain in Compile <<= Defaults.runMainTask(fullClasspath in Compile, runner in(Compile, run))

How to exclude Log4j from application assembly?

Without Spark classes the application assembly is quite lightweight. But the assembly size can be reduced even more!

Let's assume that your application requires some logging provider. As long as Spark internally uses Log4j, Log4j is already on the cluster classpath. But you may say that there is a much better API for Scala than the original Log4j – and you are totally right.

The snippet below configures the excellent Typesafe (Lightbend nowadays) Scala Logging library dependency.

"com.typesafe.scala-logging" %% "scala-logging" % "3.1.0",

"org.slf4j" % "slf4j-api" % "1.7.10",
"org.slf4j" % "slf4j-log4j12" % "1.7.10" exclude("log4j", "log4j"),

"log4j" % "log4j" % "1.2.17" % "provided",

Scala Logging is a thin wrapper for SLF4J implemented using Scala macros. The "slf4j-log4j12" is a binding library between the SLF4J API and the Log4j logger provider. Three layers of indirection, but who cares :–)

There is also a top-level dependency on Log4j defined with the provided scope. But this is not enough to get rid of Log4j classes from the application assembly. Because Log4j is also a transitive dependency of "slf4j-log4j12", it must be explicitly excluded. If it is not, the SBT Assembly Plugin adds Log4j classes to the assembly even though the top-level "log4j" dependency is marked as "provided". Not very intuitive, but that is how the SBT Assembly Plugin works.

Alternatively, you could disable transitive dependencies for "slf4j-log4j12" altogether. It is especially useful for libraries with many transitive dependencies which are expected to be on the cluster classpath.

"org.slf4j" % "slf4j-log4j12" % "1.7.10" intransitive()

Spark Streaming Kafka dependency

Now we are ready to define the dependency on "spark-streaming-kafka". Because the Spark integration with Kafka is typically not a part of the Spark assembly, it must be embedded into the application assembly. The artifact should not be defined within the "provided" scope.

val sparkVersion = "1.6.0"

(...)
"org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
  exclude("log4j", "log4j")
  exclude("org.spark-project.spark", "unused"),
(...)

Again, the "log4j" transitive dependency of Kafka needs to be explicitly excluded. I also found that a marker class from the weird Spark "unused" artifact breaks the default SBT Assembly Plugin merge strategy. It is much easier to exclude this dependency than to customize the merge strategy of the plugin.

Where is Guava?

When you look at your project dependencies you can easily find Guava (version 14.0.1 for Spark 1.6.0). OK, Guava is an excellent library, so you decide to use it in your application.

WRONG!

Guava is on the classpath during compilation and tests, but at runtime you will get a "ClassNotFoundException" or a "method not found" error. First, Guava is shaded in the Spark distribution assembly under the org/spark-project/guava package and should not be used directly. Second, there is a huge chance of an outdated Guava library on the cluster classpath. In the CDH 5.3 distribution, the installed Guava version is 11.0.2, released on Feb 22, 2012 – more than 4 years ago! Since Guava is binary compatible only between the 2 or 3 latest major releases, this is a real blocker.

There are experimental Spark configuration flags spark.driver.userClassPathFirst and spark.executor.userClassPathFirst. In theory they give user-added jars precedence over Spark’s own jars when loading classes in the driver and executors. But in practice it does not work, at least for me :–(.

In general you should avoid external dependencies at all costs when you develop an application deployed on a YARN cluster. Classloader hell is even bigger than in JEE containers like JBoss or WebLogic. Look for libraries with minimal transitive dependencies and narrowed features. For example, if you need a cache, choose Caffeine over Guava, as sketched below.
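
A minimal Caffeine sketch could look like this (the cache key and value types are illustrative; the Maven coordinates are "com.github.ben-manes.caffeine" % "caffeine"):

import com.github.benmanes.caffeine.cache.{Cache, Caffeine}

// hypothetical cache of user profiles keyed by user id
val userProfiles: Cache[String, String] = Caffeine.newBuilder()
  .maximumSize(10000)
  .build[String, String]()

userProfiles.put("user-1", "profile of user-1")
val profile = Option(userProfiles.getIfPresent("user-1"))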

Deployment optimization for YARN cluster

When the application is deployed on a YARN cluster using the spark-submit script, the script uploads the Spark distribution assembly to the cluster during every deployment. The distribution assembly size is over 100MB, ten times more than a typical application assembly!

So I really recommend installing the Spark distribution assembly in a well-known location on the cluster and defining the spark.yarn.jar property for spark-submit. The assembly will not be copied over the network during every deployment.

spark.yarn.jar=hdfs:///apps/spark/assembly/spark-assembly-1.6.0.jar

Summary

I witnessed a few Spark projects where build.sbt was more complex than the application itself, the application assembly was bloated with unnecessary 3rd party classes and the deployment process took ages. The build configuration described in this blog post should help you deploy a Spark application on the cluster smoothly and still keep the SBT configuration easy to maintain.

Spark and Kafka Integration Patterns, Part 2

In the world beyond batch, streaming data processing is the future of big data. Regardless of the streaming framework used for data processing, tight integration with a replayable data source like Apache Kafka is often required. Streaming applications often use Apache Kafka as a data source, or as a destination for processing results.

The Apache Spark distribution has built-in support for reading from Kafka, but surprisingly does not offer any integration for sending processing results back to Kafka. This blog post aims to fill this gap in the Spark ecosystem.

In the first part of the series you learned how to manage a Kafka producer using the Scala lazy evaluation feature and how to reuse a single Kafka producer instance on a Spark executor.

In this blog post you will learn how to publish stream processing results to Apache Kafka in a reliable way. First you will learn how the Kafka producer works, how to configure the producer and how to set up the Kafka cluster to achieve the desired reliability. In the second part of the blog post, I will present how to implement a convenient library for sending a continuous sequence of RDDs (DStream) to an Apache Kafka topic, as easily as in the code snippet below.

// enable implicit conversions
import KafkaDStreamSink._

// send dstream to Kafka
dstream.sendToKafka(kafkaProducerConfig, topic)

Quick introduction to Kafka

Kafka is a distributed, partitioned, replicated message broker. Basic architecture knowledge is a prerequisite to understanding Spark and Kafka integration challenges. You can safely skip this section if you are already familiar with Kafka concepts.

For convenience I copied essential terminology definitions directly from Kafka documentation:

  • Kafka maintains feeds of messages in categories called topics.
  • We’ll call processes that publish messages to a Kafka topic producers.
  • We’ll call processes that subscribe to topics and process the feed of published messages consumers.
  • Kafka is run as a cluster comprised of one or more servers each of which is called a broker.

So, at a high level, producers send messages over the network to the Kafka cluster, which in turn serves them up to consumers.

This is the bare minimum you have to know, but I really encourage you to read the Kafka reference manual thoroughly.

Kafka producer API

First we need to know how the Kafka producer works. The Kafka producer exposes a very simple API for sending messages to Kafka topics. The most important methods of the KafkaProducer class are listed below:

KafkaProducer API
j.u.c.Future<RecordMetadata> send(ProducerRecord<K,V> record)
j.u.c.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)
void flush()
void close()

The send() methods asynchronously send a key-value record to a topic and return immediately once the record has been stored in the buffer of records waiting to be sent. This kind of API is not very convenient for developers, but it is crucial for achieving high throughput and low latency.

If you want to ensure that the request has completed, you can invoke a blocking get() on the future returned by the send() methods. The main drawback of calling get() is a huge performance penalty because it effectively disables batching. You cannot expect high throughput and low latency if the execution is blocked on every message and every single message needs to be sent separately.

Fully non-blocking usage requires the use of a callback. The callback is invoked when the request completes. Note that the callback is executed in the Kafka producer I/O thread, so it must not block the caller and must be as lightweight as possible. The callback must also be properly synchronized due to the Java memory model. Both styles are sketched below.
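
To make the difference concrete, here is a rough sketch of both styles using the Java producer API (the broker address, topic name and serializers are placeholders):

import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
val record = new ProducerRecord[String, String]("events", "key", "value")

// blocking style - simple, but it effectively disables batching
producer.send(record).get()

// non-blocking style - the callback is executed in the producer I/O thread
producer.send(record, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
    if (exception != null) {
      // register the failure somewhere thread-safe, do not block the I/O thread
    }
})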

If the Kafka producer caller does not check the result of the send() method using the future or the callback, then all messages from the internal producer buffer will be lost if the Kafka producer crashes. This is the first, very important element of any Kafka integration library: we should expect callback handling to avoid data loss and still achieve good performance.

The flush() method makes all buffered messages ready to send, and blocks on the completion of the requests associated with these messages. The close() method is like the flush() method but also closes the producer.

The flush() method can be very handy if the streaming framework wants to ensure that all messages have been sent before processing the next part of the stream. With the flush() method the streaming framework is able to push the messages to Kafka brokers and simulate commit behaviour.
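
As a sketch (assuming a producer configured like the one above), flushing after each micro-batch could look like this:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// sketch: treat flush() as a poor man's commit for a single micro-batch
def sendBatch(producer: KafkaProducer[String, String], topic: String, batch: Seq[String]): Unit = {
  batch.foreach(value => producer.send(new ProducerRecord(topic, value)))
  // blocks until every record buffered above is acknowledged or failed
  producer.flush()
  // only now it is safe to consider the batch processed
}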

The flush() method was added in the Kafka 0.9 release (KIP-8). Before Kafka 0.9, the only safe and straightforward way to flush messages from the Kafka producer internal buffer was to close the producer.

Kafka configuration

If messages must be reliably published to the Kafka cluster, the Kafka producer and the Kafka cluster need to be configured with care. It needs to be done independently of the chosen streaming framework.

The Kafka producer buffers messages in memory before sending. When the memory buffer is exhausted, the Kafka producer must either stop accepting new records (block) or throw errors. By default the Kafka producer blocks, and this behavior is legitimate for stream processing. The processing should be delayed if the Kafka producer memory buffer is full and cannot accept new messages. Ensure that the block.on.buffer.full Kafka producer configuration property is set.

With the default configuration, when the Kafka broker (the leader of the partition) receives the message, it stores the message in memory and immediately sends an acknowledgment to the Kafka producer. To avoid data loss the message should be replicated to at least one replica (follower). Only when the follower acknowledges the leader does the leader acknowledge the producer.

You get this guarantee with the acks=all property in the Kafka producer configuration. It ensures that the record will not be lost as long as at least one in-sync replica remains alive.

But this is not enough. The minimum number of in-sync replicas must be defined. You should configure the min.insync.replicas property for every topic. I recommend configuring at least 2 in-sync replicas (leader and one follower). If you have a datacenter with two zones, I also recommend keeping the leader in the first zone and 2 followers in the second zone. This configuration guarantees that every message will be stored in both zones.

We are almost done with the Kafka cluster configuration. When you set the min.insync.replicas=2 property, the topic should be replicated with factor 2 + N, where N is the number of brokers that could fail while the Kafka producer is still able to publish messages to the cluster. I recommend configuring a replication factor of 3 (or more) for the topic.

With replication factor 3, the number of brokers in the cluster should be at least 3 + M, where M is the number of brokers that may be unavailable. When one or more brokers are unavailable, the topics end up with under-replicated partitions. With more brokers in the cluster than the replication factor, you can reassign under-replicated partitions and achieve a fully replicated cluster again. I recommend building at least a 4-node cluster for topics with replication factor 3.

The last important Kafka cluster configuration property is unclean.leader.election.enable. It should be disabled (by default it is enabled) to avoid unrecoverable exceptions on the Kafka consumer side. Consider the situation when the latest committed offset is N, but after a leader failure the latest offset on the new leader is M < N, because the new leader was elected from a lagging follower (not an in-sync replica). When the streaming engine asks for data from offset N using the Kafka consumer, it will get an exception because offset N does not exist yet. Someone will have to fix the offsets manually.

So the minimal recommended Kafka setup for reliable message processing is:

  • 4 nodes in the cluster
  • unclean.leader.election.enable=false in the brokers configuration
  • replication factor for the topics – 3
  • min.insync.replicas=2 property in topic configuration
  • acks=all property in the producer configuration
  • block.on.buffer.full=true property in the producer configuration

With the above setup your configuration should be resistant to a single broker failure, and Kafka consumers will survive a new leader election.
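
For reference, the producer side of this checklist could be wired up roughly as below (broker addresses and retry values are illustrative; block.on.buffer.full applies to the pre-0.9 producer):

import java.util.Properties

// producer-side settings from the checklist above
val producerConfig = new Properties()
producerConfig.put("bootstrap.servers", "broker1:9092,broker2:9092")
producerConfig.put("acks", "all")                  // wait for all in-sync replicas
producerConfig.put("retries", "3")                 // redelivery policy
producerConfig.put("retry.backoff.ms", "500")
producerConfig.put("block.on.buffer.full", "true") // block instead of failing when the buffer is full
// min.insync.replicas and the replication factor are configured on the topic, not on the producer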

You could also take a look at the replica.lag.max.messages and replica.lag.time.max.ms properties for tuning when the follower is removed from the ISR by the leader. But this is out of the scope of this blog post.

How to expand Spark API?

After this not-so-short introduction, we are ready to disassemble the integration library for Spark Streaming and Apache Kafka. First, DStream needs to be somehow expanded to support the new method sendToKafka().

dstream.sendToKafka(kafkaProducerConfig, topic)

In Scala, the only way to add methods to an existing API is to use the implicit conversion feature.

object KafkaDStreamSink {

  import scala.language.implicitConversions

  implicit def createKafkaDStreamSink(dstream: DStream[KafkaPayload]): KafkaDStreamSink = {
    new KafkaDStreamSink(dstream)
  }

}

Whenever the Scala compiler finds a call to the non-existent method sendToKafka() on the DStream class, the stream will be implicitly wrapped into the KafkaDStreamSink class, where the sendToKafka method is finally defined. To enable the implicit conversion for DStream, add the import statement to your code – that's all.

import KafkaDStreamSink._

How to send to Kafka in reliable way?

Let's check how the sendToKafka() method is defined, step by step – this is the core part of the integration library.

class KafkaDStreamSink(dstream: DStream[KafkaPayload]) {

  def sendToKafka(config: Map[String, String], topic: String): Unit = {
    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // send records from every partition to Kafka
      }
    }
  }

There are two loops, the first over the wrapped dstream and the second over the rdd, for every partition – quite a standard pattern for the Spark programming model. Records from every partition are ready to be sent to the Kafka topic by Spark executors. The destination topic name is given explicitly as the last parameter of the sendToKafka() method.

The first step in the inner loop is getting a Kafka producer instance from the KafkaProducerFactory.

rdd.foreachPartition { records =>
  val producer = KafkaProducerFactory.getOrCreateProducer(config)
  (...)

The factory creates only a single instance of the producer for any given producer configuration. If the producer instance has already been created, the existing instance is returned and reused. Kafka producer caching is crucial for performance reasons, because establishing a connection to the cluster takes time. It is a much more time-consuming operation than opening a plain socket connection, as the Kafka producer needs to discover the leaders for all partitions. Please refer to the first part of this blog post and the KafkaProducerFactory source for more details about the factory implementation.
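
For context, a stripped-down sketch of what such a factory could look like is shown below (see the repository for the actual implementation; byte array serializers are assumed in the configuration):

import java.util.Properties
import scala.collection.mutable
import org.apache.kafka.clients.producer.KafkaProducer

// sketch of a per-JVM producer cache keyed by the producer configuration
object KafkaProducerFactory {

  private val producers = mutable.Map[Map[String, String], KafkaProducer[Array[Byte], Array[Byte]]]()

  def getOrCreateProducer(config: Map[String, String]): KafkaProducer[Array[Byte], Array[Byte]] =
    synchronized {
      producers.getOrElseUpdate(config, {
        val properties = new Properties()
        config.foreach { case (key, value) => properties.put(key, value) }
        val producer = new KafkaProducer[Array[Byte], Array[Byte]](properties)
        // close the producer (flushing its buffer) when the executor JVM shuts down
        sys.addShutdownHook(producer.close())
        producer
      })
    }
}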

For debugging purposes a logger and the Spark task context are needed.

import org.apache.spark.TaskContext
import org.slf4j.LoggerFactory
(...)

rdd.foreachPartition { records =>
  val producer = KafkaProducerFactory.getOrCreateProducer(config)

  val context = TaskContext.get
  val logger = Logger(LoggerFactory.getLogger(classOf[KafkaDStreamSink]))
  (...)

You could use any logging framework, but the logger itself has to be defined in the foreachPartition loop to avoid weird serialization issues. The Spark task context will be used to get the current partition identifier. I don't like the static call for getting the task context, but this is the official way to do it. See pull request SPARK-5927 for more details.

Before we go further, a Kafka producer callback for error handling needs to be introduced.

KafkaDStreamSinkExceptionHandler
import java.util.concurrent.atomic.AtomicReference
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

class KafkaDStreamSinkExceptionHandler extends Callback {

  private val lastException = new AtomicReference[Option[Exception]](None)

  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
    lastException.set(Option(exception))

  def throwExceptionIfAny(): Unit =
    lastException.getAndSet(None).foreach(ex => throw ex)

}

The onCompletion() method of the callback is called when the message sent to the Kafka cluster has been acknowledged. Exactly one of the callback arguments will be non-null: metadata or exception. The KafkaDStreamSinkExceptionHandler class keeps the last exception registered by the callback (if any). The client of the callback is able to rethrow the registered exception using the throwExceptionIfAny() method. Because the onCompletion() and throwExceptionIfAny() methods are called from different threads, the last exception has to be kept in a thread-safe data structure, AtomicReference.

Finally we are ready to send records to Kafka using the created callback.

rdd.foreachPartition { records =>
  val producer = KafkaProducerFactory.getOrCreateProducer(config)

  val context = TaskContext.get
  val logger = Logger(LoggerFactory.getLogger(classOf[KafkaDStreamSink]))

  val callback = new KafkaDStreamSinkExceptionHandler

  logger.debug(s"Send Spark partition: ${context.partitionId} to Kafka topic: $topic")
  val metadata = records.map { record =>
    callback.throwExceptionIfAny()
    producer.send(new ProducerRecord(topic, record.key.orNull, record.value), callback)
  }.toList

First the callback is examined for a registered exception. If one of the previous records could not be sent, the exception is propagated to the Spark framework. If any redelivery policy is needed, it should be configured at the Kafka producer level – look at the Kafka producer configuration properties retries and retry.backoff.ms. Finally the Kafka producer metadata are collected and materialized by calling the toList() method. At this moment the Kafka producer starts sending records in a background I/O thread. To achieve high throughput the Kafka producer sends records in batches.

Because we want to achieve natural back pressure for our stream processing, the next batch needs to be blocked until records from the current batch are really acknowledged by the Kafka brokers. So for each collected metadata entry (a Java j.u.c.Future), the get() method is called to ensure that the record has been sent to the brokers.

rdd.foreachPartition { records =>
  val producer = KafkaProducerFactory.getOrCreateProducer(config)

  val context = TaskContext.get
  val logger = Logger(LoggerFactory.getLogger(classOf[KafkaDStreamSink]))

  val callback = new KafkaDStreamSinkExceptionHandler

  logger.debug(s"Send Spark partition: ${context.partitionId} to Kafka topic: $topic")
  val metadata = records.map { record =>
    callback.throwExceptionIfAny()
    producer.send(new ProducerRecord(topic, record.key.orNull, record.value), callback)
  }.toList

  logger.debug(s"Flush Spark partition: ${context.partitionId} to Kafka topic: $topic")
  metadata.foreach { metadata => metadata.get() }

Because sending started only a moment ago, it is likely that the records have already been sent and the get() method does not block. However, if the get() call blocks, it means that there are unsent messages in the internal Kafka producer buffer and the processing should be blocked as well.

Finally the sendToKafka() method should propagate the exception recorded by the callback (if any). The complete method is presented below for reference.

sendToKafka
def sendToKafka(config: Map[String, String], topic: String): Unit = {
  dstream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      val producer = KafkaProducerFactory.getOrCreateProducer(config)

      val context = TaskContext.get
      val logger = Logger(LoggerFactory.getLogger(classOf[KafkaDStreamSink]))

      val callback = new KafkaDStreamSinkExceptionHandler

      logger.debug(s"Send Spark partition: ${context.partitionId} to Kafka topic: $topic")
      val metadata = records.map { record =>
        callback.throwExceptionIfAny()
        producer.send(new ProducerRecord(topic, record.key.orNull, record.value), callback)
      }.toList

      logger.debug(s"Flush Spark partition: ${context.partitionId} to Kafka topic: $topic")
      metadata.foreach { metadata => metadata.get() }

      callback.throwExceptionIfAny()
    }
  }
}

The method is not very complex, but there are a few important elements if you don't want to lose processing results and if you need a back pressure mechanism:

  • Method sendToKafka() should fail fast if a record could not be sent to Kafka. Don't worry, Spark will execute the failed task again.
  • Method sendToKafka() should block Spark processing if Kafka producer slows down.
  • Method sendToKafka() should flush records buffered by Kafka producer explicitly, to avoid data loss.
  • Kafka producer needs to be reused by Spark executor to avoid connection to Kafka overhead.
  • Kafka producer needs to be explicitly closed when Spark shuts down executors to avoid data loss.

Summary

The complete, working project is published on https://github.com/mkuthan/example-spark-kafka. You can clone/fork the project and do some experiments by yourself.

There is also an alternative library developed by Cloudera, spark-kafka-writer, which emerged from the closed pull request SPARK-2994. Unfortunately, at the time of this writing, the library used the obsolete Scala Kafka producer API and did not send processing results in a reliable way.

I hope that some day we will find a reliable, mature library for sending processing results to Apache Kafka in the official Spark distribution.

Spark and Kafka Integration Patterns, Part 1

I published a post on the allegro.tech blog about how to integrate Spark Streaming and Kafka. In the blog post you will find how to avoid the java.io.NotSerializableException exception when the Kafka producer is used for publishing results of Spark Streaming processing.

http://allegro.tech/spark-kafka-integration.html

You might also be interested in the following part of this blog post, where I presented a complete library for sending Spark Streaming processing results to Kafka.

Happy reading :–)

Spark and Spark Streaming Unit Testing

When you develop a distributed system, it is crucial to make it easy to test. Execute tests in a controlled environment, ideally from your IDE. A long develop-test-develop cycle for complex systems could kill your productivity. Below you will find my testing strategy for Spark and Spark Streaming applications.

Unit or integration tests, that is the question

Our hypothetical Spark application pulls data from Apache Kafka, applies transformations using RDDs and DStreams and persists outcomes into a Cassandra or Elasticsearch database. In production the Spark application is deployed on a YARN or Mesos cluster, and everything is glued together with ZooKeeper. The big picture of the stream processing architecture is presented below:

Lots of moving parts, not so easy to configure and test, even with automated provisioning implemented with Vagrant, Docker and Ansible. If you can't test everything, test at least the most important part of your application – the transformations – implemented with Spark.

Spark claims that it is friendly to unit testing with any popular unit test framework. To be strict, Spark supports rather lightweight integration testing, not unit testing, IMHO. But still, it is much more convenient to test transformation logic locally than to deploy all parts on YARN.

There is a pull request SPARK-1751 that adds “unit test” support for Apache Kafka streams. Should we follow that way? Embedded ZooKeeper and embedded Apache Kafka are needed, so the test fixture is complex and cumbersome. Perhaps the tests would be fragile and hard to maintain. This approach makes sense for the Spark core team, because they want to test Spark and Kafka integration itself.

What should be tested?

Our transformation logic implemented with Spark, nothing more. But how do we test logic so tightly coupled to the Spark API (RDD, DStream)? Let's define how a typical Spark application is organized. Our hypothetical application structure looks like this:

  1. Initialize SparkContext or StreamingContext.
  2. Create an RDD or DStream for a given source (e.g: Apache Kafka).
  3. Evaluate transformations on the RDD or DStream API.
  4. Put transformation outcomes (e.g: aggregations) into an external database.

Context

SparkContext and StreamingContext can be easily initialized for testing purposes. Set the master URL to local, run the operations and then stop the context gracefully.

SparkContext Initialization
class SparkExampleSpec extends FlatSpec with BeforeAndAfter {

  private val master = "local[2]"
  private val appName = "example-spark"

  private var sc: SparkContext = _

  before {
    val conf = new SparkConf()
      .setMaster(master)
      .setAppName(appName)

    sc = new SparkContext(conf)
  }

  after {
    if (sc != null) {
      sc.stop()
    }
  }
  (...)
StreamingContext Initialization
class SparkStreamingExampleSpec extends FlatSpec with BeforeAndAfter {

  private val master = "local[2]"
  private val appName = "example-spark-streaming"
  private val batchDuration = Seconds(1)
  private val checkpointDir = Files.createTempDirectory(appName).toString

  private var sc: SparkContext = _
  private var ssc: StreamingContext = _

  before {
    val conf = new SparkConf()
      .setMaster(master)
      .setAppName(appName)

    ssc = new StreamingContext(conf, batchDuration)
    ssc.checkpoint(checkpointDir)

    sc = ssc.sparkContext
  }

  after {
    if (ssc != null) {
      ssc.stop()
    }
  }

  (...)

RDD and DStream

The problematic part is how to create the RDD or DStream. For testing purposes it must be simplified to avoid embedded Kafka and ZooKeeper. Below you can find examples of how to create an in-memory RDD and DStream.

In-memory RDD
val lines = Seq("To be or not to be.", "That is the question.")
val rdd = sparkContext.parallelize(lines)
In-memory DStream
val lines = mutable.Queue[RDD[String]]()
val dstream = streamingContext.queueStream(lines)

// append data to DStream
lines += sparkContext.makeRDD(Seq("To be or not to be.", "That is the question."))

Transformation logic

The most important part of our application – the transformation logic – must be encapsulated in a separate class or object. An object is preferred to avoid class serialization overhead. Exactly the same code is used by the application and by the test.

WordCount.scala
case class WordCount(word: String, count: Int)

object WordCount {
  def count(lines: RDD[String], stopWords: Set[String]): RDD[WordCount] = {
    val words = lines.flatMap(_.split("\\s"))
      .map(_.stripSuffix(",").stripSuffix(".").toLowerCase)
      .filter(!stopWords.contains(_)).filter(!_.isEmpty)

    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _).map {
      case (word: String, count: Int) => WordCount(word, count)
    }

    val sortedWordCounts = wordCounts.sortBy(_.word)

    sortedWordCounts
  }
}

Spark test

Now it is time to implement our first test for the WordCount transformation. The test code is very straightforward and easy to read. A single point of truth, the best documentation of your system, always up-to-date.

"Shakespeare most famous quote" should "be counted" in {
    Given("quote")
    val lines = Array("To be or not to be.", "That is the question.")

    Given("stop words")
    val stopWords = Set("the")

    When("count words")
    val wordCounts = WordCount.count(sc.parallelize(lines), stopWords).collect()

    Then("words counted")
    wordCounts should equal(Array(
      WordCount("be", 2),
      WordCount("is", 1),
      WordCount("not", 1),
      WordCount("or", 1),
      WordCount("question", 1),
      WordCount("that", 1),
      WordCount("to", 2)))
  }

Spark Streaming test

Spark Streaming transformations are much more complex to test. Full control over the clock is needed to manually manage batches, slides and windows. Without a controlled clock you would end up with complex tests with many Thread.sleep calls, and the test execution would take ages. The only downside is that you will not have extra time for coffee during test execution.

Spark Streaming provides the necessary abstraction over the system clock, the ManualClock class. Unfortunately the ManualClock class is declared as package private, so some hack is needed. The wrapper presented below is an adapter for the original ManualClock class but without the access restriction.

ClockWrapper.scala
package org.apache.spark.streaming

import org.apache.spark.streaming.util.ManualClock

class ClockWrapper(ssc: StreamingContext) {

  def getTimeMillis(): Long = manualClock().currentTime()

  def setTime(timeToSet: Long) = manualClock().setTime(timeToSet)

  def advance(timeToAdd: Long) = manualClock().addToTime(timeToAdd)

  def waitTillTime(targetTime: Long): Long = manualClock().waitTillTime(targetTime)

  private def manualClock(): ManualClock = {
    ssc.scheduler.clock.asInstanceOf[ManualClock]
  }

}

Now the Spark Streaming test can be implemented in an efficient way. The test does not have to wait for the system clock and is implemented with millisecond precision. You can easily test your windowed scenario from the very beginning to the very end. With the given/when/then structure you should be able to understand the tested logic without further explanations.

"Sample set" should "be counted" in {
  Given("streaming context is initialized")
  val lines = mutable.Queue[RDD[String]]()

  var results = ListBuffer.empty[Array[WordCount]]

  WordCount.count(ssc.queueStream(lines), windowDuration, slideDuration) { (wordsCount: RDD[WordCount], time: Time) =>
    results += wordsCount.collect()
  }

  ssc.start()

  When("first set of words queued")
  lines += sc.makeRDD(Seq("a", "b"))

  Then("words counted after first slide")
  clock.advance(slideDuration.milliseconds)
  eventually(timeout(1 second)) {
    results.last should equal(Array(
      WordCount("a", 1),
      WordCount("b", 1)))
  }

  When("second set of words queued")
  lines += sc.makeRDD(Seq("b", "c"))

  Then("words counted after second slide")
  clock.advance(slideDuration.milliseconds)
  eventually(timeout(1 second)) {
    results.last should equal(Array(
      WordCount("a", 1),
      WordCount("b", 2),
      WordCount("c", 1)))
  }

  When("nothing more queued")

  Then("word counted after third slide")
  clock.advance(slideDuration.milliseconds)
  eventually(timeout(1 second)) {
    results.last should equal(Array(
      WordCount("a", 0),
      WordCount("b", 1),
      WordCount("c", 1)))
  }

  When("nothing more queued")

  Then("word counted after fourth slide")
  clock.advance(slideDuration.milliseconds)
  eventually(timeout(1 second)) {
    results.last should equal(Array(
      WordCount("a", 0),
      WordCount("b", 0),
      WordCount("c", 0)))
  }
}

One comment on the Eventually trait usage. The trait is needed because Spark Streaming is a multithreaded application and results are not computed immediately. I found that a 1 second timeout is enough for Spark Streaming to calculate the results. The timeout is not related to the batch, slide or window duration.
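
For completeness, a tiny self-contained sketch of the Eventually trait usage (the spec name and the background thread are made up, only to show the retry semantics):

import java.util.concurrent.atomic.AtomicInteger
import org.scalatest.{FlatSpec, Matchers}
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

// the eventually block retries the assertion until it passes or the timeout is exceeded
class EventuallyExampleSpec extends FlatSpec with Matchers with Eventually {

  "asynchronously computed result" should "be asserted with eventually" in {
    val result = new AtomicInteger(0)
    new Thread(new Runnable {
      override def run(): Unit = { Thread.sleep(100); result.set(42) }
    }).start()

    eventually(timeout(1 second)) {
      result.get() should equal(42)
    }
  }
}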

Summary

The complete, working project is published on GitHub. You can clone/fork the project and do some experiments by yourself.

I hope that the Spark committers will eventually expose ManualClock for others. Control of time is necessary for efficient Spark Streaming application testing.

How to Learn DDD

Books

Domain Driven Design by Eric Evans.

You have to read this book, period. From the very beginning to the very end. Do not stop reading after the first part of the book; the part about strategic design is much more important. Study this book again and again. I did not read this book at once – it would be an impossible mission. Every time I come back to this book I find something new; every single word in this book is important and brings some meaning.

Implementing Domain Driven Design by Vaughn Vernon.

A more practical and easier-to-digest book than the previous one. Not so brilliant, but still worth reading.

Exploring CQRS and Event Sourcing

An excellent DDD/CQRS case study with working code on GitHub. A real world example of how to define bounded contexts and how to integrate them using domain events. Awesome!

Enterprise Patterns and MDA by Jim Arlow and Ila Neusandt

Do not reinvent the wheel when you discover your domain model, at least for e-commerce :–) Apply the presented archetype patterns wisely and save your ass.

My examples

http://mkuthan.github.io/presentations/ddd.html – “Domain Driven Design – from trenches for practitioners” presentation.

http://mkuthan.github.io/blog/2013/11/04/ddd-architecture-summary/ – Blog post – my DDD check list.

https://github.com/mkuthan/example-ddd-cqrs-server – Experiment based on Vernon book.

https://github.com/mkuthan/example-axon – Experiment based on Exploring CQRS and Event Sourcing book.

Other sources

http://www.udidahan.com/ – Udi Dahan – one of my mentors in distributed systems and DDD architecture.

https://groups.yahoo.com/neo/groups/domaindrivendesign – official DDD discussion group, addictive reading for long winter evenings.

Programming Language Does Not Matter

A few days ago I participated in a quick presentation of a significant e-commerce platform. The custom platform is implemented mostly in PHP and designed as a scalable, distributed system. And I was really impressed! Below you can find a short summary of the chosen libraries, frameworks and tools.

Symfony – The leading PHP framework to create web applications. Very similar to the Spring Framework: you get dependency injection, layered architecture and good support for automated testing.

Doctrine – Object-relational mapper (ORM), integrated with the Symfony framework. Very similar to JPA.

Composer – Dependency manager for PHP, very similar to NPM.

Gearman – Generic application framework to farm out work to other machines or processes. Somewhat similar to YARN.

Varnish – HTTP reverse proxy.

Memcached – Distributed key value cache.

RabbitMQ – Messaging middleware based on the AMQP protocol. Used for distributed services integration but also for decoupled request-reply communication.

logstash – Log manager with tons of plugins for almost everything. Monitoring is really crucial in distributed systems.

The programming language does not really matter if you need a scalable, distributed system that is easy to maintain and enhance. You can apply excellent design using PHP or produce a big ball of mud in Java.

Mastering Node.js - Book Review

Overview

I'm really impressed by the Node.js (and JavaScript) ecosystem. I took the Mastering Node.js book to understand the Node.js philosophy and compare it to the JVM world.

V8

A JavaScript virtual machine, conceptually very similar to the JVM. The most important element of the JavaScript ecosystem if you want to do something more than a client-side web application. I really like the Node.js REPL; experimentation is as easy as with Scala.

Event loop

An elegant simulation of concurrency. Do you remember the Swing event dispatch thread and the invokeLater() method? The event loop is the same idea. It is crucial to understand the event handling order:

  • emitted event
  • timers
  • IO callbacks
  • deferred execution blocks

Event driven concurrency

A process is a first-class citizen. The easiest (and cheapest) way to achieve concurrency with horizontal scalability.

Real-time applications

I enhanced the drawing board presented in the book. It was great fun together with my 2-year-old son :–) A scalable server-side implementation is presented below; I could not even imagine the Java version.

var express = require('express')
var path = require('path');
var app = express();
var http = require('http').Server(app);
var io = require('socket.io')(http);
var redis = require('socket.io-redis');

io.adapter(redis({ host: 'localhost', port: 6379 }));

var port = parseInt(process.argv[2]);

app.use(express.static(path.join(__dirname, 'assets')));

app.get('/', function(req, res){
  res.sendfile('index.html');
});

io.on('connection', function (socket) {
  socket.on('move', function (data) {
    socket.broadcast.emit('moving', data);
  });
});

http.listen(port, function(){
  console.log('Board started on: ' + port);
});

Keep in mind that SSE is unidirectional from server to clients and requires funny 2KB padding. I didn’t know that before.

Scaling on single node

Spawning and forking child processes is easy, and communication between parent and child processes is easy as well. The Cluster module simplifies web application implementation for multi-core processors and it is very easy to understand and control.

Horizontal scaling

Keep shared state in a horizontally scalable store, e.g: session data in Redis, or RabbitMQ for events.

Apache Bench

A command line tool for load/stress testing. JMeter or Gatling is not always needed to perform a simple test.

UDP / Multicast

Good to know the world behind HTTP/REST/SOAP… There are a lot of important layers between the application and the wire, do you remember OSI?

AWS

I have to practice using S3 or DynamoDB eventually.

Node debugger

OMG – I used to debug application using console 10 years ago or so ;–)

Express, Socket.io, Path

Implementing a web application using Node.js only is feasible, but with Express it is much easier.

Be aware that there are thousands of web frameworks for Node.js on the market. Many more than for Java 10 years ago ;–) It seems that frameworks built around WebSockets and Single Page Apps should be the leaders.

Interesting resources

Comparing the Performance of Web Server Architectures

Broken Promises

Summary

JavaScript and Node.js seem to be one of the most vital ecosystems for web development. Adoption in the enterprise world is still low, but I really like this ecosystem and its community. And I'm still waiting for the final version of ES6 – sometimes JavaScript really sucks.