Test-driven development (TDD) is a well-established practice in the software engineering community. It helps make code more reliable by requiring developers to write tests before writing the actual code, and it promotes better design and modularity, which makes the code easier to maintain and extend. Paradoxically, it often speeds up development by reducing debugging time and catching issues early in the development process.

Unfortunately, many data engineers aren’t aware of TDD or its benefits. They change the pipeline code, build an assembly, deploy the assembly to the cluster, run the pipeline, and manually verify the results using SQL. How much time does it take? I would guess at least 30 minutes.

What if you could reduce the development cycle from 30 minutes to 10 seconds and deliver higher-quality code?

Data pipeline

Today I will develop a batch data pipeline from scratch using the TDD approach. The pipeline calculates statistics from entries at the toll booths you pass on highways, bridges, or tunnels. For example:

  • How many vehicles crossed the toll booth
  • How much toll the toll booth collected
  • Time of the first and the last entry

I drew inspiration for the design of this data pipeline from the Azure documentation titled Build an IoT solution by using Stream Analytics. Instead of relying on SQL, I chose Apache Beam, Spotify Scio, and a set of custom-built utilities.

Command line contract

Command-line arguments constitute the data pipeline API. Design this API and implement the job’s end-to-end test first. To calculate toll booth statistics, the data pipeline takes four parameters:

  • Effective date specifying the day for which the job calculates statistics
  • Table with toll booth entries
  • Tables for the results, one for hourly and one for daily statistics

The end-to-end test passes exactly these arguments:

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import com.spotify.scio.testing.JobTest

import org.mkuthan.streamprocessing.test.scio.JobTestScioContext

class TollBoothEntryStatsJobTest extends AnyFlatSpec with Matchers with JobTestScioContext {
  "Toll job" should "run in the batch mode" in {
    JobTest[TollBoothEntryStatsJob.type]
      .args(
        "--effectiveDate=2014-09-10",
        "--entryTable=toll.entry",
        "--entryStatsHourlyTable=toll.entry_stats_hourly",
        "--entryStatsDailyTable=toll.entry_stats_daily"
      )
      .run()
  }
}

Run the test:

sbt> testOnly *TollBoothEntryStatsJobTest
[error] TollBoothEntryStatsJobTest.scala:12:13: not found: value TollBoothEntryStatsJob
[error]     JobTest[TollBoothEntryStatsJob.type]
[error]             ^
[error] one error found
[error] (Test / compileIncremental) Compilation failed

The test fails because there is no TollBoothEntryStatsJob object with the data pipeline implementation yet. Implement the skeleton of the job; it does nothing except fulfill the command-line contract.

import com.spotify.scio.ContextAndArgs
import org.joda.time.LocalDate

object TollBoothEntryStatsJob {
  def main(mainArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(mainArgs)

    val effectiveDate = LocalDate.parse(args.required("effectiveDate"))
    val entryTable = args.required("entryTable")
    val entryStatsHourlyTable = args.required("entryStatsHourlyTable")
    val entryStatsDailyTable = args.required("entryStatsDailyTable")

    sc.run()
  }
}

Run the test again:

sbt> testOnly *TollBoothEntryStatsJobTest
[info] TollBoothEntryStatsJobTest:
[info] Toll job
[info] - should run in the batch mode
[info] Run completed in 3 seconds, 877 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

The end-to-end test, run with the Apache Beam direct runner, completed in 3.9 seconds 🚀
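To make the idea of a command-line contract concrete, here is a minimal plain-Scala sketch of what requiring `--name=value` arguments amounts to. It deliberately avoids Scio: `CommandLineContract`, `parse`, and `required` are hypothetical names used only for illustration, and Scio’s own `Args` handles considerably more than this.

```scala
// Hypothetical illustration only: the real job relies on Scio's ContextAndArgs.
object CommandLineContract {
  // Parse "--name=value" arguments into a map; anything else is rejected.
  def parse(args: Seq[String]): Map[String, String] =
    args.map { arg =>
      arg.stripPrefix("--").split("=", 2) match {
        case Array(name, value) if arg.startsWith("--") => name -> value
        case _ => throw new IllegalArgumentException(s"Malformed argument: $arg")
      }
    }.toMap

  // Look up a mandatory argument, failing fast when it is absent.
  def required(parsed: Map[String, String], name: String): String =
    parsed.getOrElse(
      name,
      throw new IllegalArgumentException(s"Missing required argument: $name")
    )
}
```

Failing fast on a missing argument is the point of the contract: a typo in a parameter name surfaces in seconds in a local test instead of after a deployment.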

Input data

TollBoothEntryStatsJob needs input data, so define a type for toll booth entry records:

import com.spotify.scio.bigquery.types.BigQueryType
import org.joda.time.Instant

object TollBoothEntry {
  @BigQueryType.toTable
  final case class Record(
    id: String,
    entry_time: Instant,
    license_plate: String,
    toll: BigDecimal,
  )
}

Define a test fixture with the anyTollBoothEntryRecord prototype:

trait TollBoothEntryFixture {
  final val anyTollBoothEntryRecord = TollBoothEntry.Record(
    id = "1",
    entry_time = Instant.parse("2014-09-10T12:01:00Z"),
    license_plate = "JNB 7001",
    toll = BigDecimal(7),
  )
}

Use anyTollBoothEntryRecord as the input for the job. Keep it simple: it’s a happy path scenario with a single vehicle crossing the toll booth.

"Toll job" should "run in the batch mode" in {
  JobTest[TollBoothEntryStatsJob.type]
    .args(
      "--effectiveDate=2014-09-10",
      "--entryTable=toll.entry",
      "--entryStatsHourlyTable=toll.entry_stats_hourly",
      "--entryStatsDailyTable=toll.entry_stats_daily"
    )
    .input(
      CustomIO[TollBoothEntry.Record]("entry-table-id"),
      Seq(anyTollBoothEntryRecord)
    )
    .run()
}

Run the test:

sbt> testOnly *TollBoothEntryStatsJobTest
[info] TollBoothEntryStatsJobTest:
[info] Toll job
[info] - should run in the batch mode *** FAILED ***
[info]   java.lang.IllegalArgumentException: requirement failed:
Unmatched test input: CustomIO(entry-table-id)
[info] Run completed in 3 seconds, 891 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
[info] *** 1 TEST FAILED ***

The test fails because TollBoothEntryStatsJob doesn’t read the entry-table-id input. Implement reading toll booth entry records from BigQuery, using a row restriction to fetch only a single partition of data.

import org.mkuthan.streamprocessing.infrastructure._
import org.mkuthan.streamprocessing.infrastructure.bigquery.BigQueryTable
import org.mkuthan.streamprocessing.infrastructure.bigquery.RowRestriction
import org.mkuthan.streamprocessing.infrastructure.bigquery.StorageReadConfiguration
import org.mkuthan.streamprocessing.infrastructure.common.IoIdentifier

def main(mainArgs: Array[String]): Unit = {
  val (sc, args) = ContextAndArgs(mainArgs)

  val effectiveDate = LocalDate.parse(args.required("effectiveDate"))
  val entryTable = args.required("entryTable")
  val entryStatsHourlyTable = args.required("entryStatsHourlyTable")
  val entryStatsDailyTable = args.required("entryStatsDailyTable")

  val entryRecords = sc.readFromBigQuery(
    IoIdentifier[TollBoothEntry.Record]("entry-table-id"),
    BigQueryTable[TollBoothEntry.Record](entryTable),
    StorageReadConfiguration().withRowRestriction(
      RowRestriction.TimestampColumnRestriction("entry_time", effectiveDate)
    )
  )

  sc.run()
}

Run the test again:

sbt> testOnly *TollBoothEntryStatsJobTest
[info] TollBoothEntryStatsJobTest:
[info] Toll job
[info] - should run in the batch mode
[info] Run completed in 4 seconds, 497 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
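The post does not show what `RowRestriction.TimestampColumnRestriction` actually generates. One plausible reading, stated here purely as an assumption, is a half-open timestamp range that covers the effective date. The sketch below illustrates that idea with a hypothetical `dayRestriction` helper; it uses `java.time` rather than Joda-Time to stay dependency-free, and the exact filter string the real utility emits may well differ.

```scala
import java.time.LocalDate

// Hypothetical sketch, not the utility used by the job.
object RowRestrictionSketch {
  // Build a half-open [day, next day) filter on a timestamp column.
  // Assumption: this mirrors the intent of the restriction, not its exact output.
  def dayRestriction(column: String, date: LocalDate): String = {
    val nextDay = date.plusDays(1)
    s"$column >= '$date' AND $column < '$nextDay'"
  }
}
```

A half-open interval avoids both double counting and gaps at midnight when the job runs for consecutive effective dates.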

Output data

TollBoothEntryStatsJob produces results, so define a type for toll booth stats records:

import com.spotify.scio.bigquery.types.BigQueryType
import org.joda.time.Instant

object TollBoothStats {
  @BigQueryType.toTable
  final case class Record(
    created_at: Instant,
    id: String,
    count: Int,
    total_toll: BigDecimal,
    first_entry_time: Instant,
    last_entry_time: Instant
  )
}

Test fixture with anyTollBoothStatsRecord prototype:

trait TollBoothStatsFixture {
  final val anyTollBoothStatsRecord = TollBoothStats.Record(
    created_at = Instant.EPOCH,
    id = "1",
    total_toll = BigDecimal(7),
    count = 1,
    first_entry_time = Instant.parse("2014-09-10T12:01:00.000Z"),
    last_entry_time = Instant.parse("2014-09-10T12:01:00.000Z")
  )
}

Use anyTollBoothStatsRecord to verify the output of the job. Again, keep the happy path scenario simple, with a single vehicle crossing the toll booth.

"Toll job" should "run in the batch mode" in {
  JobTest[TollBoothEntryStatsJob.type]
    .args(
      "--effectiveDate=2014-09-10",
      "--entryTable=toll.entry",
      "--entryStatsHourlyTable=toll.entry_stats_hourly",
      "--entryStatsDailyTable=toll.entry_stats_daily"
    )
    .input(
      CustomIO[TollBoothEntry.Record]("entry-table-id"),
      Seq(anyTollBoothEntryRecord)
    )
    .output(CustomIO[TollBoothStats.Record]("entry-stats-hourly-table-id")) { results =>
      val endOfHourlyWindow = Instant.parse("2014-09-10T12:59:59.999Z")
      results should containElements(
        anyTollBoothStatsRecord.copy(created_at = endOfHourlyWindow)
      )
    }
    .output(CustomIO[TollBoothStats.Record]("entry-stats-daily-table-id")) { results =>
      val endOfDailyWindow = Instant.parse("2014-09-10T23:59:59.999Z")
      results should containElements(
        anyTollBoothStatsRecord.copy(created_at = endOfDailyWindow)
      )
    }
    .run()
}

Run the test:

sbt> testOnly *TollBoothEntryStatsJobTest
[info] TollBoothEntryStatsJobTest:
[info] Toll job
[info] - should run in the batch mode *** FAILED ***
[info]   java.lang.IllegalArgumentException: requirement failed:
Unmatched test output:
CustomIO(entry-stats-hourly-table-id),
CustomIO(entry-stats-daily-table-id)
[info] Run completed in 8 seconds, 704 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
[info] *** 1 TEST FAILED ***

The test fails because TollBoothEntryStatsJob doesn’t write results to the entry-stats-hourly-table-id and entry-stats-daily-table-id outputs. Write toll booth stats records to the BigQuery tables. Don’t implement the toll booth statistics at this stage; just reuse the test fixture as the job result.

import org.mkuthan.streamprocessing.infrastructure._
import org.mkuthan.streamprocessing.infrastructure.bigquery.BigQueryPartition
import org.mkuthan.streamprocessing.infrastructure.bigquery.BigQueryTable
import org.mkuthan.streamprocessing.infrastructure.bigquery.RowRestriction
import org.mkuthan.streamprocessing.infrastructure.bigquery.StorageReadConfiguration
import org.mkuthan.streamprocessing.infrastructure.common.IoIdentifier

def main(mainArgs: Array[String]): Unit = {
  val (sc, args) = ContextAndArgs(mainArgs)

  val effectiveDate = LocalDate.parse(args.required("effectiveDate"))
  val entryTable = args.required("entryTable")
  val entryStatsHourlyTable = args.required("entryStatsHourlyTable")
  val entryStatsDailyTable = args.required("entryStatsDailyTable")

  val entryRecords = sc.readFromBigQuery(
    IoIdentifier[TollBoothEntry.Record]("entry-table-id"),
    BigQueryTable[TollBoothEntry.Record](entryTable),
    StorageReadConfiguration().withRowRestriction(
      RowRestriction.TimestampColumnRestriction("entry_time", effectiveDate)
    )
  )

  // TODO: calculate toll booth entry stats

  val endOfHourlyWindow = Instant.parse("2014-09-10T12:59:59.999Z")
  sc
    .parallelize(Seq(anyTollBoothStatsRecord.copy(created_at = endOfHourlyWindow)))
    .writeBoundedToBigQuery(
      IoIdentifier("entry-stats-hourly-table-id"),
      BigQueryPartition.daily(entryStatsHourlyTable, effectiveDate)
    )

  val endOfDailyWindow = Instant.parse("2014-09-10T23:59:59.999Z")
  sc
    .parallelize(Seq(anyTollBoothStatsRecord.copy(created_at = endOfDailyWindow)))
    .writeBoundedToBigQuery(
      IoIdentifier("entry-stats-daily-table-id"),
      BigQueryPartition.daily(entryStatsDailyTable, effectiveDate)
    )

  sc.run()
}

Run the test again:

sbt> testOnly *TollBoothEntryStatsJobTest
[info] TollBoothEntryStatsJobTest:
[info] Toll job
[info] - should run in the batch mode
[info] Run completed in 6 seconds, 770 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
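Both the end-to-end test and the temporary job body lean on `anyTollBoothStatsRecord.copy(...)`. This is the prototype pattern for fixtures: one valid default instance, with each scenario overriding only the fields it cares about. A dependency-free sketch follows, using a simplified, hypothetical `StatsRecord` and `java.time.Instant` in place of the Joda-Time type.

```scala
import java.time.Instant

object PrototypeFixtureSketch {
  // Simplified stand-in for TollBoothStats.Record (hypothetical, fewer fields).
  final case class StatsRecord(createdAt: Instant, id: String, count: Int)

  // The prototype: one valid default instance shared by all tests.
  val anyStatsRecord: StatsRecord =
    StatsRecord(createdAt = Instant.EPOCH, id = "1", count = 1)

  // Each expectation overrides only the field that matters for the scenario.
  val hourly: StatsRecord =
    anyStatsRecord.copy(createdAt = Instant.parse("2014-09-10T12:59:59.999Z"))
  val daily: StatsRecord =
    anyStatsRecord.copy(createdAt = Instant.parse("2014-09-10T23:59:59.999Z"))
}
```

Because every other field keeps its default, adding a new field to the record later means touching the prototype once rather than every test.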

Decode input into domain types

Define the domain type TollBoothEntry using rich types:

final case class TollBoothId(id: String) {
  require(!id.isEmpty, "Toll booth id is empty")
}

final case class LicensePlate(number: String) {
  require(!number.isEmpty, "License plate number is empty")
}

final case class TollBoothEntry(
  id: TollBoothId,
  entryTime: Instant,
  licensePlate: LicensePlate,
  toll: BigDecimal
) {
  require(toll >= 0, s"Toll is negative: $toll")
}
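To see what these `require` calls buy you, here is a small self-contained sketch. It repeats the `TollBoothId` definition from above and wraps construction in `Try`; the `RichTypeSketch` object and its `build` helper are hypothetical names added only for the illustration.

```scala
import scala.util.Try

object RichTypeSketch {
  // Same shape as TollBoothId above: the invariant lives in the constructor.
  final case class TollBoothId(id: String) {
    require(!id.isEmpty, "Toll booth id is empty")
  }

  // Wrap construction in Try to observe the failure without crashing.
  def build(id: String): Try[TollBoothId] = Try(TollBoothId(id))
}
```

An empty id fails with an `IllegalArgumentException` whose message is `requirement failed: Toll booth id is empty`. Scala prepends the `requirement failed: ` prefix, so a test is better off checking that the message includes the text than comparing it for equality.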

Test fixture with the anyTollBoothEntry prototype:

trait TollBoothEntryFixture {
  final val anyTollBoothEntry = TollBoothEntry(
    id = TollBoothId("1"),
    entryTime = Instant.parse("2014-09-10T12:01:00Z"),
    toll = BigDecimal(7),
    licensePlate = LicensePlate("JNB 7001")
  )
}

For now, leave TollBoothEntryStatsJobTest untouched and define a separate test for the mapping between TollBoothEntry.Record and TollBoothEntry.

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import org.mkuthan.streamprocessing.test.scio.syntax._
import org.mkuthan.streamprocessing.test.scio.TestScioContext

class TollBoothEntryTest extends AnyFlatSpec with Matchers
    with TestScioContext
    with TollBoothEntryFixture {

  import TollBoothEntry._

  behavior of "TollBoothEntry"

  it should "decode valid record into TollBoothEntry" in runWithScioContext { sc =>
    val inputs = boundedTestCollectionOf[TollBoothEntry.Record]
      .addElementsAtMinimumTime(anyTollBoothEntryRecord)
      .advanceWatermarkToInfinity()

    val results = decodeRecord(sc.testBounded(inputs))

    results.withTimestamp should containElementsAtTime(
      anyTollBoothEntry.entryTime,
      anyTollBoothEntry
    )
  }

  it should "throw an exception for invalid record" in {
    val thrown = the[RuntimeException] thrownBy {
      runWithScioContext { sc =>
        val invalidRecord = anyTollBoothEntryRecord.copy(id = "")

        val inputs = boundedTestCollectionOf[TollBoothEntry.Record]
          .addElementsAtMinimumTime(invalidRecord)
          .advanceWatermarkToInfinity()

        decodeRecord(sc.testBounded(inputs))

      }
    }
    thrown.getMessage should include("Toll booth id is empty")
  }
}

Run the test:

sbt:tollDomain> testOnly *TollBoothEntryTest
[error] TollBoothEntryTest.scala:59:19: not found: value decodeRecord
[error]     val results = decodeRecord(sc.testBounded(inputs))
[error]                   ^
[error] TollBoothEntryTest.scala:73:9: not found: value decodeRecord
[error]         decodeRecord(sc.testBounded(inputs))
[error]         ^

Implement the decodeRecord function to fulfill the contract defined in the tests:

import com.spotify.scio.values.SCollection

object TollBoothEntry {
  def decodeRecord(input: SCollection[TollBoothEntry.Record]): SCollection[TollBoothEntry] =
    input.transform { in =>
      in
        .map(record => fromRecord(record))
        .timestampBy(boothEntry => boothEntry.entryTime)
    }

  private def fromRecord(record: TollBoothEntry.Record) =
    TollBoothEntry(
      id = TollBoothId(record.id),
      entryTime = record.entry_time,
      licensePlate = LicensePlate(record.license_plate),
      toll = record.toll
    )
}

Run the test again:

sbt:tollDomain> testOnly *TollBoothEntryTest
[info] TollBoothEntryTest:
[info] TollBoothEntry
[info] - should decode valid record into TollBoothEntry
[info] - should throw an exception for invalid record
[info] Run completed in 8 seconds, 362 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

Calculate toll booth stats

It’s time to implement the core domain logic. Define the domain type TollBoothStats using rich types:

final case class TollBoothStats(
  id: TollBoothId,
  count: Int,
  totalToll: BigDecimal,
  firstEntryTime: Instant,
  lastEntryTime: Instant
) {
  private def before(other: TollBoothStats): Boolean =
    firstEntryTime.isBefore(other.firstEntryTime)
  private def after(other: TollBoothStats): Boolean =
    lastEntryTime.isAfter(other.lastEntryTime)
}

Test fixture with anyTollBoothStats prototype:

trait TollBoothStatsFixture {
  final val anyTollBoothStats = TollBoothStats(
    id = TollBoothId("1"),
    totalToll = BigDecimal(7),
    count = 1,
    firstEntryTime = Instant.parse("2014-09-10T12:01:00.000Z"),
    lastEntryTime = Instant.parse("2014-09-10T12:01:00.000Z")
  )
}

For now, leave TollBoothEntryStatsJobTest untouched and define a separate test for calculating TollBoothStats from TollBoothEntry.

class TollBoothStatsTest extends AnyFlatSpec with Matchers
    with TestScioContext
    with TollBoothEntryFixture
    with TollBoothStatsFixture {

  import TollBoothStats._

  private val FiveMinutes = Duration.standardMinutes(5)
  private val DefaultWindowOptions = WindowOptions()

  behavior of "TollBoothStats"

  it should "calculate statistics in fixed window" in runWithScioContext { sc =>
    val tollBoothId1 = TollBoothId("1")
    val tollBoothId2 = TollBoothId("2")

    val entry1 = anyTollBoothEntry.copy(
      id = tollBoothId1,
      entryTime = Instant.parse("2014-09-10T12:01:00.000Z"),
      toll = BigDecimal(2)
    )

    val entry2 = anyTollBoothEntry.copy(
      id = tollBoothId1,
      entryTime = Instant.parse("2014-09-10T12:01:30.000Z"),
      toll = BigDecimal(1)
    )

    val entry3 = anyTollBoothEntry.copy(
      id = tollBoothId2,
      entryTime = Instant.parse("2014-09-10T12:04:00.000Z"),
      toll = BigDecimal(4)
    )

    val inputs = boundedTestCollectionOf[TollBoothEntry]
      .addElementsAtTime(entry1.entryTime, entry1)
      .addElementsAtTime(entry2.entryTime, entry2)
      .addElementsAtTime(entry3.entryTime, entry3)
      .advanceWatermarkToInfinity()

    val results = calculateInFixedWindow(sc.testBounded(inputs), FiveMinutes, DefaultWindowOptions)

    results.withTimestamp should inOnTimePane("2014-09-10T12:00:00Z", "2014-09-10T12:05:00Z") {
      containElementsAtTime(
        "2014-09-10T12:04:59.999Z",
        anyTollBoothStats.copy(
          id = tollBoothId1,
          count = 2,
          totalToll = BigDecimal(2 + 1),
          firstEntryTime = entry1.entryTime,
          lastEntryTime = entry2.entryTime
        ),
        anyTollBoothStats.copy(
          id = tollBoothId2,
          count = 1,
          totalToll = BigDecimal(4),
          firstEntryTime = entry3.entryTime,
          lastEntryTime = entry3.entryTime
        )
      )
    }
  }

  it should "calculate statistics in fixed window for late entries" in runWithScioContext { sc =>
    val onTimeEntry1 = anyTollBoothEntry.copy(
      entryTime = Instant.parse("2014-09-10T12:01:00Z"),
      toll = BigDecimal(2)
    )

    val onTimeEntry2 = anyTollBoothEntry.copy(
      entryTime = Instant.parse("2014-09-10T12:02:00Z"),
      toll = BigDecimal(3)
    )

    val lateEntry = anyTollBoothEntry.copy(
      entryTime = Instant.parse("2014-09-10T12:03:00Z"),
      toll = BigDecimal(1)
    )

    val inputs = unboundedTestCollectionOf[TollBoothEntry]
      .addElementsAtTime(onTimeEntry1.entryTime, onTimeEntry1)
      .addElementsAtTime(onTimeEntry2.entryTime, onTimeEntry2)
      .advanceWatermarkTo("2014-09-10T12:05:00Z")
      .addElementsAtTime(lateEntry.entryTime, lateEntry)
      .advanceWatermarkToInfinity()

    val windowOptions = WindowOptions(
      allowedLateness = Duration.standardMinutes(2),
      accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
    )

    val results = calculateInFixedWindow(sc.testUnbounded(inputs), FiveMinutes, windowOptions)

    val windowStart = "2014-09-10T12:00:00Z"
    val windowEnd = "2014-09-10T12:05:00Z"
    val recordTimestamp = Instant.parse("2014-09-10T12:04:59.999Z")

    results.withTimestamp should inOnTimePane(windowStart, windowEnd) {
      containElementsAtTime(
        recordTimestamp,
        anyTollBoothStats.copy(
          count = 2,
          totalToll = BigDecimal(2 + 3),
          firstEntryTime = onTimeEntry1.entryTime,
          lastEntryTime = onTimeEntry2.entryTime
        )
      )
    }

    results.withTimestamp should inLatePane(windowStart, windowEnd) {
      containElementsAtTime(
        recordTimestamp,
        anyTollBoothStats.copy(
          count = 1,
          totalToll = BigDecimal(1),
          firstEntryTime = lateEntry.entryTime,
          lastEntryTime = lateEntry.entryTime
        )
      )
    }
  }
}
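The expected timestamp `2014-09-10T12:04:59.999Z` in these tests follows from how fixed windows work: a five-minute window covers the half-open interval from 12:00 up to but excluding 12:05, and the aggregated result is stamped with the last instant inside it. Here is a dependency-free sketch of that arithmetic. `FixedWindowSketch` and its functions are hypothetical names, it uses `java.time` instead of Joda-Time, and it mirrors the behaviour I understand Beam’s fixed windows to have rather than reproducing Beam’s code.

```scala
import java.time.{Duration, Instant}

object FixedWindowSketch {
  // Half-open window [start, end) containing the timestamp,
  // with windows aligned to the epoch.
  def windowOf(timestamp: Instant, size: Duration): (Instant, Instant) = {
    val sizeMillis = size.toMillis
    val startMillis =
      java.lang.Math.floorDiv(timestamp.toEpochMilli, sizeMillis) * sizeMillis
    (Instant.ofEpochMilli(startMillis), Instant.ofEpochMilli(startMillis + sizeMillis))
  }

  // Last timestamp that still belongs to the window: one millisecond before its end.
  def maxTimestamp(window: (Instant, Instant)): Instant =
    window._2.minusMillis(1)
}
```

The same arithmetic gives 12:59:59.999 for an hourly window and 23:59:59.999 for a daily one, which are the created_at values the end-to-end test expects.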

See Stream Processing – part1 and Stream Processing – part2 to learn how to test complex domain logic.

Run tests:

sbt:tollDomain> testOnly *TollBoothStatsTest
[error] TollBoothStatsTest.scala:56:19: not found: value calculateInFixedWindow
[error]     val results = calculateInFixedWindow(sc.testBounded(inputs), FiveMinutes, DefaultWindowOptions)
[error]                   ^
[error] TollBoothStatsTest.scala:107:19: not found: value calculateInFixedWindow
[error]     val results = calculateInFixedWindow(sc.testUnbounded(inputs), FiveMinutes, windowOptions)
[error]

Implement the calculateInFixedWindow function to fulfill the contract defined in the tests:

object TollBoothStats {
  def calculateInFixedWindow(
    input: SCollection[TollBoothEntry],
    windowDuration: Duration,
    windowOptions: WindowOptions
  ): SCollection[TollBoothStats] =
    input.transform { in =>
      in
        .map(fromBoothEntry)
        .sumByKeyInFixedWindow(windowDuration, windowOptions)
    }

  private def fromBoothEntry(boothEntry: TollBoothEntry) = TollBoothStats(
    id = boothEntry.id,
    count = 1,
    totalToll = boothEntry.toll,
    firstEntryTime = boothEntry.entryTime,
    lastEntryTime = boothEntry.entryTime
  )

  implicit val TollBoothStatsSumByKey = SumByKey.create(
    keyFn = _.id.id,
    plusFn = (x, y) =>
      x.copy(
        count = x.count + y.count,
        totalToll = x.totalToll + y.totalToll,
        firstEntryTime = if (x.before(y)) x.firstEntryTime else y.firstEntryTime,
        lastEntryTime = if (x.after(y)) x.lastEntryTime else y.lastEntryTime
      )
  )
}

Run tests again:

sbt> testOnly *TollBoothStatsTest
[info] TollBoothStatsTest:
[info] TollBoothStats
[info] - should calculate statistics in fixed window
[info] - should calculate statistics in fixed window for late entries
[info] Run completed in 7 seconds, 553 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
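The heart of the calculation is the merge function passed to `SumByKey.create`: it combines two partial results for the same toll booth, and the framework applies it per key within each window. The idea can be tried on plain Scala collections, without Beam or windowing. In the sketch below `Stats`, `plus`, and `sumByKey` are simplified, hypothetical stand-ins, and `java.time.Instant` replaces the Joda-Time type.

```scala
import java.time.Instant

object SumByKeySketch {
  // Simplified stand-in for TollBoothStats (hypothetical: plain String id).
  final case class Stats(
    id: String,
    count: Int,
    totalToll: BigDecimal,
    first: Instant,
    last: Instant
  )

  // Merge two partial results for the same toll booth.
  def plus(x: Stats, y: Stats): Stats = x.copy(
    count = x.count + y.count,
    totalToll = x.totalToll + y.totalToll,
    first = if (x.first.isBefore(y.first)) x.first else y.first,
    last = if (x.last.isAfter(y.last)) x.last else y.last
  )

  // Group by key, then reduce each group with the merge function.
  def sumByKey(stats: Seq[Stats]): Map[String, Stats] =
    stats.groupBy(_.id).map { case (id, group) =>
      id -> group.reduce((a, b) => plus(a, b))
    }
}
```

Because the merge does not depend on the order in which partial results arrive, a runner is free to combine them in parallel, which is what makes this shape of aggregation a good fit for a distributed pipeline.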

Well done: you implemented and tested intricate domain logic without a single data pipeline deployment. Consider this scenario: if you had to repeat the test cycle 50 times before reaching the final implementation, a 30-minute development cycle would consume 1500 minutes, or 25 hours, of your time. With a 7.5-second development cycle, the same 50 iterations would take only 375 seconds, or 6.25 minutes.
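If you want to plug in your own numbers, the comparison is a one-line multiplication. The `CycleCost` helper below is a hypothetical name used only for this back-of-the-envelope check.

```scala
import scala.concurrent.duration._

object CycleCost {
  // Total time spent waiting for feedback over a number of iterations.
  def total(iterations: Int, cycle: FiniteDuration): FiniteDuration =
    cycle * iterations.toLong
}
```

For example, `CycleCost.total(50, 30.minutes).toHours` is 25, while `CycleCost.total(50, 7500.millis).toSeconds` is 375.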

Dear data engineer, can you now appreciate the significance of Test-Driven Development (TDD) in your work?

Encode toll booth stats into record

We’re almost done. Add a test scenario for encoding TollBoothStats into TollBoothStats.Record.

class TollBoothStatsTest extends AnyFlatSpec with Matchers
    with TestScioContext
    with TollBoothEntryFixture
    with TollBoothStatsFixture {

  import TollBoothStats._

  behavior of "TollBoothStats"

  (...)

  it should "encode into record" in runWithScioContext { sc =>
    val createdAt = Instant.parse("2014-09-10T12:04:59.999Z")
    val inputs = boundedTestCollectionOf[TollBoothStats]
      .addElementsAtTime(createdAt, anyTollBoothStats)
      .advanceWatermarkToInfinity()

    val results = encodeRecord(sc.testBounded(inputs))
    results should containElements(anyTollBoothStatsRecord.copy(created_at = createdAt))
  }
}

Run the test:

sbt> testOnly *TollBoothStatsTest
[error] TollBoothStatsTest.scala:144:19: not found: value encodeRecord
[error]     val results = encodeRecord(sc.testBounded(inputs))
[error]

Implement the encodeRecord function to fulfill the contract defined in the test:

object TollBoothStats {
  def encodeRecord(input: SCollection[TollBoothStats]): SCollection[TollBoothStats.Record] =
    input.mapWithTimestamp { case (record, timestamp) =>
      Record(
        created_at = timestamp,
        id = record.id.id,
        count = record.count,
        total_toll = record.totalToll,
        first_entry_time = record.firstEntryTime,
        last_entry_time = record.lastEntryTime
      )
    }
}

Run the test again:

sbt> testOnly *TollBoothStatsTest
[info] TollBoothStatsTest:
[info] TollBoothStats
[info] - should calculate statistics in fixed window
[info] - should calculate statistics in fixed window for late entries
[info] - should encode into record
[info] Run completed in 6 seconds, 198 milliseconds.
[info] Total number of tests run: 3
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

Finish data pipeline

Now all the components are ready to finish the data pipeline without breaking the job’s end-to-end test. Note that TollBoothEntryStatsJob has little responsibility of its own; it delegates heavily to TollBoothEntry and TollBoothStats to execute the actual domain logic.

import org.apache.beam.sdk.transforms.windowing.AfterWatermark
import org.apache.beam.sdk.transforms.windowing.Repeatedly
import org.apache.beam.sdk.transforms.windowing.Window
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode

import com.spotify.scio.values.WindowOptions
import com.spotify.scio.ContextAndArgs

import org.joda.time.Duration
import org.joda.time.LocalDate

import org.mkuthan.streamprocessing.infrastructure._
import org.mkuthan.streamprocessing.infrastructure.bigquery.BigQueryPartition
import org.mkuthan.streamprocessing.infrastructure.bigquery.BigQueryTable
import org.mkuthan.streamprocessing.infrastructure.bigquery.RowRestriction
import org.mkuthan.streamprocessing.infrastructure.bigquery.StorageReadConfiguration
import org.mkuthan.streamprocessing.infrastructure.common.IoIdentifier

object TollBoothEntryStatsJob {

  private val OneHour = Duration.standardHours(1)
  private val OneDay = Duration.standardDays(1)

  private val DefaultWindowOptions = WindowOptions(
    trigger = Repeatedly.forever(AfterWatermark.pastEndOfWindow()),
    allowedLateness = Duration.ZERO,
    accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
    onTimeBehavior = Window.OnTimeBehavior.FIRE_IF_NON_EMPTY
  )

  def main(mainArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(mainArgs)

    val effectiveDate = LocalDate.parse(args.required("effectiveDate"))
    val entryTable = args.required("entryTable")
    val entryStatsHourlyTable = args.required("entryStatsHourlyTable")
    val entryStatsDailyTable = args.required("entryStatsDailyTable")

    val entryRecords = sc.readFromBigQuery(
      IoIdentifier[TollBoothEntry.Record]("entry-table-id"),
      BigQueryTable[TollBoothEntry.Record](entryTable),
      StorageReadConfiguration().withRowRestriction(
        RowRestriction.TimestampColumnRestriction("entry_time", effectiveDate)
      )
    )

    val entries = TollBoothEntry.decodeRecord(entryRecords)

    val tollBoothStatsHourly = TollBoothStats
      .calculateInFixedWindow(entries, OneHour, DefaultWindowOptions)

    TollBoothStats
      .encodeRecord(tollBoothStatsHourly)
      .writeBoundedToBigQuery(
        IoIdentifier[TollBoothStats.Record]("entry-stats-hourly-table-id"),
        BigQueryPartition.daily(entryStatsHourlyTable, effectiveDate)
      )

    val tollBoothStatsDaily = TollBoothStats
      .calculateInFixedWindow(entries, OneDay, DefaultWindowOptions)

    TollBoothStats
      .encodeRecord(tollBoothStatsDaily)
      .writeBoundedToBigQuery(
        IoIdentifier[TollBoothStats.Record]("entry-stats-daily-table-id"),
        BigQueryPartition.daily(entryStatsDailyTable, effectiveDate)
      )

    sc.run()
  }
}

Let’s revisit the end-to-end test. It remains unchanged, but I’ve included it here for your reference:

import com.spotify.scio.io.CustomIO
import com.spotify.scio.testing.JobTest

import org.joda.time.Instant
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import org.mkuthan.streamprocessing.test.scio.JobTestScioContext

class TollBoothEntryStatsJobTest extends AnyFlatSpec with Matchers with JobTestScioContext
    with TollBoothEntryFixture with TollBoothStatsFixture {
  "Toll job" should "run in the batch mode" in {
    JobTest[TollBoothEntryStatsJob.type]
      .args(
        "--effectiveDate=2014-09-10",
        "--entryTable=toll.entry",
        "--entryStatsHourlyTable=toll.entry_stats_hourly",
        "--entryStatsDailyTable=toll.entry_stats_daily"
      )
      .input(
        CustomIO[TollBoothEntry.Record]("entry-table-id"),
        Seq(anyTollBoothEntryRecord)
      )
      .output(CustomIO[TollBoothStats.Record]("entry-stats-hourly-table-id")) { results =>
        val endOfHourlyWindow = Instant.parse("2014-09-10T12:59:59.999Z")
        results should containElements(
          anyTollBoothStatsRecord.copy(created_at = endOfHourlyWindow)
        )
      }
      .output(CustomIO[TollBoothStats.Record]("entry-stats-daily-table-id")) { results =>
        val endOfDailyWindow = Instant.parse("2014-09-10T23:59:59.999Z")
        results should containElements(
          anyTollBoothStatsRecord.copy(created_at = endOfDailyWindow)
        )
      }
      .run()
  }
}

Run the test to verify data pipeline correctness:

sbt> testOnly *TollBoothEntryStatsJobTest
[info] TollBoothEntryStatsJobTest:
[info] Toll job
[info] - should run in the batch mode
[info] Run completed in 8 seconds, 704 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

Beautiful 💖 Now it’s time for the first deployment with the Dataflow runner, but that’s a tale for an upcoming blog post.

Summary

I created a batch data pipeline for calculating toll booth statistics using test-driven development. The key takeaways:

  • Failing Test First: Follow the TDD principle of writing a failing test before writing code.
  • Local Testing: Using frameworks that support local testing allows you to iterate quickly and efficiently during the development process without the need for a production-like environment.
  • Apache Beam Direct Runner: The choice of Apache Beam’s direct runner is excellent for TDD. It provides a realistic execution environment while minimizing the overhead, making it suitable for fast testing iterations.
  • Starting with End-to-End Test: Beginning with an end-to-end test is a smart move. It helps design the data pipeline’s API and covers the happy path scenario, ensuring that the primary use case is working correctly.
  • Unit Tests for Domain: Implementing unit tests to check domain invariants and core business logic is crucial for ensuring the correctness of your data pipeline. This approach helps you catch potential issues at a granular level.

By adhering to these principles and practices, you establish a structured and reliable methodology for developing a data pipeline such as the toll booth statistics one. This approach not only enhances the quality and robustness of your pipeline but also makes it easier to maintain and extend in the future.

In this blog post, I didn’t explain code sample details. It’s your homework to study my playground repository for unified stream and batch processing, available at https://github.com/mkuthan/stream-processing/. There, you’ll find code examples, detailed implementations, and further insights into the toll booth related data pipeline discussed in this post. Feel free to explore and experiment with the code to enhance your understanding of TDD for data engineering.
