Skip to main content
Version: 1.11



Testing API should not be considered stable at the moment.

Additional components need to be tested before they can be used by the users. Nussknacker provides toolkit for building scenarios and executing them in tests. There are two main dependencies you can add in scope test

  • nussknacker-flink-components-testkit
  • nussknacker-lite-components-testkit

These modules will provide you with specific executor and test components building blocks as well as with Scalatest and utilities.

Testing Nussknacker Components

Creating test scenario

There is a ScenarioBuilder which gives developer DSL-like mechanism for building scenarios. At first, you choose type of the scenario from:

  • streaming
  • streamingLite
  • requestResponse

Next you specify scenario itself. Every scenario need at least one source followed by filters, enrichers etc. and end with either sink or processor end.

val scenario = 
.source("start", "source")
.enricher("customer", "customer", "getCustomer", ("customer_id", "#input"))
.processorEnd("end", "invocationCollector", "value" -> "#customer")

Creating test scenario runner

Scenario should be executed inside a runner. TestScenarioRunner gives you another DLS for building runners. At first, you chose type of the scenario from:

  • flinkBased - based on Flink engine, you need to pass to it FlinkMiniClusterHolder, it can be created e.g. using FlinkSpec:
import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._
val testScenarioRunner = TestScenarioRunner
.flinkBased(config, flinkMiniCluster)
  • kafkaLiteBased - based on Lite engine, no other setup needed, provides suitable methods of simulation communication with kafka, it bases on mocked Schema Registry, Kafka server is not needed:
import pl.touk.nussknacker.engine.lite.util.test.LiteKafkaTestScenarioRunner._
val testScenarioRunner = TestScenarioRunner
  • liteBased - also based on Lite engine, provides interface to communicate with engine using raw classes (not using any Kafka API, similar interface as using flinkBased):
import pl.touk.nussknacker.engine.lite.util.test.LiteTestScenarioRunner._
val testScenarioRunner = TestScenarioRunner

Injecting custom and mocked components

You can inject list of your own or mocked components with .withExtraComponents method on specified above TestScenarioRunner to be passed to engine model data. Each component should match ComponentDefinition. It means that ComponentDefinition passed via .withExtraComponents overrides the one which could be already defined in modelData for given name. Example:

import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._
val stubbedGetCustomerOpenApiService: SwaggerEnricher = new SwaggerEnricher(Some(new URL(rootUrl(port))), services.head, Map.empty, stubbedBackedProvider)
val mockComponents = List(ComponentDefinition("getCustomer", stubbedGetCustomerOpenApiService))
val testScenarioRunner = TestScenarioRunner
.flinkBased(resolvedConfig, flinkMiniCluster)

Running scenario with data

Scenario can be run with data via .runWithData method. This call synchronously executes scenario inside runner with data being passed to input source.

testScenarioRunner.runWithData(scenario, List(1, 3, 5))

Both flinkBased and liteBased scenario test runners provides additional source component which is used for providing test data in runWithData method. Results are collected using sink component in liteBased case and invocationCollector in flinkBased case. All component names can be accessed using TestScenarioRunner object e.g. using TestScenarioRunner.testDataSource property.

In case of kafkaLiteBased scenario test runner, you should use the same source/sink components as in production (e.g. kafka). There are available methods for passing Avro records or JSON objects - you don't need to serialize them. Example for Avro:

val runner = TestScenarioRunner.kafkaLiteBased().build()
val sourceSchemaId = runner.registerAvroSchema("sourceTopic", sourceSchema)
runner.registerAvroSchema("sinkTopic", sinkSchema)

val genericRecord = new GenericRecordBuilder(sourceSchema).set("field", "value").build()
val input = KafkaAvroConsumerRecord("sourceTopic", genericRecord, sourceSchemaId)
runner.runWithAvroData(scenario, List(input))

Retrieving results

Results of the scenario invocation can be get with results()

testScenarioRunner.results().size shouldBe 6
testScenarioRunner.results().toSet shouldBe Set(5, 10, 15, 20, 30, 40)

Auto provided test components

Test toolkit automatically gives you few test components you could see above.

  • source - it can be used to provide data for the scenario
  • invocationCollector - you can use it to verify scenario behaviour