Skip to main content

Streaming | Flink engine

This quickstart describes how to install Nussknacker configured to use Flink engine in Streaming Processing mode using docker-compose and goes through sample scenarios.

If you want really quick look, you can run some parts of this quickstart with embedded engine with:

docker run -it --network host -e KAFKA_ADDRESS=localhost:3032 -e SCHEMA_REGISTRY_URL=http://localhost:3082 touk/nussknacker:latest

Note that some things (e.g. metrics) will not work, it uses Lite engine instead of Flink so stateful processing won't be available, and this mode is not intended for production use. Before running it, kafka should be exposed on localhost:3032 and schema registry on http://localhost:3082

Prerequisites

  • docker (20.10.14+) installed
  • open certain ports (see docker-compose.yml)
  • Linux or MacOS

Running

  • Clone Nussknacker-quickstart project from GitHub
  • Run ./docker/streaming/start.sh and wait until all components start

In case of containers restart please use ./docker/streaming/restart.sh in order to avoid Kafka startup issues.

Now you are ready to check your newly created environment

Preparing data schemas

In this quickstart we will use Avro schemas shown below. You can register them in Schema Registry manually, e.g. with AKHQ, or run the script ./docker/streaming/scripts/createSchemas.sh, which will do it for you.

{
"type": "record",
"name": "transaction",
"namespace": "pl.touk",
"fields": [
{ "name": "clientId", "type": "string" },
{ "name": "amount", "type": "int" },
{ "name": "isLast", "type": "boolean", "default": false }
]
}

Defining a new scenario

  • Go to Nussknacker
  • Click Create new scenario button - name it DetectLargeTransactions
  • You'll see an empty workspace
  • Click Import on right panel and upload docker/streaming/scenarios/DetectLargeTransactions.json

    This scenario reads transactions data from Kafka, filters only those with amount greater than some value and writes filtered events back to Kafka.

  • Double-click on nodes to see scenario logic
  • Click Save

You have just created your first scenario!

Test scenario with data

  • Click Deploy on the right panel
  • Verify on Flink UI that your scenario is running
  • Run ./docker/streaming/scripts/sendTestTransactions.sh script a few times to generate some data

The first run may end with error from Kafka - don't worry about it. Script will send some json data to "transactions" Kafka topic.

  • Go to Metrics tab on Nussknacker main panel - you should see changed metrics.

Your scenario just processed data from Kafka and saved filtered results!

Producing events manually

  • In more controlled way, you can use AKHQ installed in quickstart setup (it's not part of the default NU installation). Go to Data tab
  • Double-click on transactions topic name and then Produce to topic
  • In a form you can set desired parameters of the event e.g. event key and value
  • Please note that the events have to adhere to schema, otherwise you'll get runtime errors.

See results

Test your scenario in a sandbox

  • Clink generate button in right panel of application

If you followed the Quickstart from the beginning you should have some data on Kafka by now. Most recent records from Kafka will be downloaded to a file.

  • Click from file button and upload file generated in last step
  • After a while you will see test results - how many records passed filters, and what where variables values

Add more behaviour to the scenario

After creating and running basic scenario it's time to add more sophisticated behaviour - in this (optional) section you'll see how to use a bit more complex components. After each step you can deploy, test and see modified results.

Time window aggregations

First, we'll add time window aggregation - we are going to look only for customers who's total sum of transaction values during last 24 hours exceeds certain threshold.

Click Import on right panel and upload ./docker/streaming/scenarios/DetectLargeTransactionsWithAggregation.json or watch video below to see how to do it step by step. You can see how Nussknacker smart editor works - fields are detected automatically and code completion helps you to write expressions.

Aggregation node is a bit more complex, you can look at Scenario authoring to learn more.

Integration with external system

In next step we'll see how to enrich data using external system, exposing customer data via OpenAPI. We implemented sample service in python (see customerservice folder), to show you don't have use Java to integrate with Nussknacker.

You can look at nussknacker.conf file (look for components.openAPI setting) to see how easy it is to configure additional OpenAPI services.

Click Import on right panel and upload docker/streaming/scenarios/DetectLargeTransactionsWithAggregationAndEnricher.json

You can see how we use new Enricher getcustomer to retrieve additional customer data to be able to categorize suspicious situations based on customer category

On the video you can see how Nussknacker detect fields from external service and how you can see metrics for OpenAPI intergration.

Correcting errors

In turns out that integration added in last step does not handle some cases well - namely, transaction may come from unknown customer, in this case customer service returns empty message.

Watch video to see how to use tests to detect and correct such situations, also - how to detect unexpected problems with external service (our sample implementation throws error from time to time):

Clean up

When you done and you want to clean upo your environment you can use docker/streaming/clean.sh script. It will remove all docker-compose's containers, nets and volumes created during your experimenting with Flink engine.