- Docker (a reasonably recent version) installed
- Certain ports open (see `docker-compose.yml`)
- Clone Nussknacker-quickstart project from GitHub
- Run `./start.sh` and wait until all components start
When restarting the containers, please use `docker-compose stop` instead of `docker-compose kill` to avoid Kafka startup issues.
Now you are ready to check your newly created environment:
- Go to Nussknacker
- Click 'Create new scenario' button - name it 'DetectLargeTransactions'
- You'll see an empty workspace
- Run `./testData/schema/createSchemas.sh` to create the Schema Registry subjects - you can see them in AKHQ
- Click 'Import' on the right panel and upload 'testData/DetectLargeTransactions.json'
This scenario reads transaction data from Kafka, keeps only the transactions whose amount exceeds a certain value, and writes the filtered events back to Kafka.
- Double-click on nodes to see scenario logic
- Click 'Save'
You have just created your first scenario!
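Conceptually, the filter at the heart of this scenario can be sketched in plain Python. This is only an illustration: the field names (`clientId`, `amount`) and the threshold are assumptions, and the real scenario expresses the condition in its filter node, not in Python.

```python
# Sketch of the DetectLargeTransactions filter logic.
# Field names and the threshold value are illustrative assumptions,
# not taken verbatim from the scenario definition.
THRESHOLD = 30

def detect_large_transactions(events):
    """Keep only events whose amount exceeds the threshold."""
    return [e for e in events if e["amount"] > THRESHOLD]

transactions = [
    {"clientId": "Client1", "amount": 12},
    {"clientId": "Client2", "amount": 42},
    {"clientId": "Client3", "amount": 55},
]
large = detect_large_transactions(transactions)
```

In the scenario itself, the same condition lives in a filter node's expression, so it can be changed from the UI without redeploying any code.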
- Click 'Deploy' on the right panel
- Verify on Flink UI that your scenario is running
- Run the `./testData/sendTestTransactions.sh` script a few times to generate some data
The first run may end with an error from Kafka - don't worry about it. The script sends some JSON data to the "transactions" Kafka topic.
- Go to the Metrics tab on the Nussknacker main panel - you should see that the metrics have changed.
Your scenario just processed data from Kafka and saved filtered results!
- Go to AKHQ and check the `processedEvents` topic - you should see the processed events there.
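The test-data script essentially produces JSON records like the ones sketched below and publishes them to the "transactions" topic. The record shape here is an assumption for illustration; the real script may use different fields, and the actual publishing goes through a Kafka producer rather than plain Python.

```python
# Sketch of generating test transaction records, one JSON document
# per line, as they might be piped into a Kafka producer for the
# "transactions" topic. Field names are illustrative assumptions.
import json
import random

def generate_transactions(n, client_ids=("Client1", "Client2", "Client3")):
    """Build n JSON-encoded transaction records."""
    records = []
    for _ in range(n):
        record = {
            "clientId": random.choice(client_ids),
            "amount": random.randint(1, 100),
        }
        records.append(json.dumps(record))
    return records

lines = generate_transactions(5)
```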
- Click the 'generate' button on the right panel of the application
If you followed the Quickstart from the beginning, you should have some data in Kafka by now. The most recent records from Kafka will be downloaded to a file.
- Click the 'from file' button and upload the file generated in the last step
- After a while you will see the test results - how many records passed the filters, and what the variable values were
After creating and running a basic scenario, it's time to add more sophisticated behaviour - in this (optional) section you'll see how to use somewhat more complex components. After each step you can deploy, test, and see the modified results.
First, we'll add a time window aggregation - we are going to look only for customers whose total sum of transaction values during the last 24 hours exceeds a certain threshold.
Click 'Import' on the right panel and upload 'testData/DetectLargeTransactionsWithAggregation.json', or watch the video below to see how to do it step by step. You can see how Nussknacker's smart editor works - fields are detected automatically and code completion helps you write expressions.
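The aggregation described above - summing each customer's transaction amounts over the last 24 hours and flagging those above a threshold - can be sketched as a simplified, in-memory computation. The real component runs on Flink with managed state and event time; the threshold below is an assumed value.

```python
# In-memory sketch of a 24-hour window aggregation per customer.
# The real scenario uses a Flink-based aggregate component; this only
# illustrates the idea. Threshold and field names are assumptions.
from datetime import datetime, timedelta

WINDOW = timedelta(hours=24)
THRESHOLD = 100

def customers_over_threshold(transactions, now):
    """Sum each customer's amounts within the last 24 hours and
    return the customers whose total exceeds the threshold."""
    totals = {}
    for t in transactions:
        if now - t["timestamp"] <= WINDOW:
            totals[t["clientId"]] = totals.get(t["clientId"], 0) + t["amount"]
    return {c for c, total in totals.items() if total > THRESHOLD}

now = datetime(2021, 6, 1, 12, 0)
txs = [
    {"clientId": "Client1", "amount": 80, "timestamp": now - timedelta(hours=2)},
    {"clientId": "Client1", "amount": 60, "timestamp": now - timedelta(hours=20)},
    # Outside the 24-hour window, so it is ignored:
    {"clientId": "Client2", "amount": 90, "timestamp": now - timedelta(hours=30)},
]
flagged = customers_over_threshold(txs, now)
```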
In the next step we'll see how to enrich data using an external system that exposes customer data via OpenAPI. We implemented a sample service in Python (see the customerservice folder) to show that you don't have to use Java to integrate with Nussknacker.
You can look at the `nussknacker.conf` file (look for the `components.openAPI` setting) to see how easy it is to configure additional OpenAPI services.
Click 'Import' on the right panel and upload 'testData/DetectLargeTransactionsWithAggregationAndEnricher.json'
You can see how we use the new `getcustomer` enricher to retrieve additional customer data, so that suspicious situations can be categorized based on the customer category.
In the video you can see how Nussknacker detects fields from the external service, and how you can see metrics for the OpenAPI integration.
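The enrichment step can be pictured as a lookup against the customer service followed by a categorization rule. Here the service is mocked with a dictionary; the `category` field and the suspicion rule are assumptions for illustration, since the real call goes over HTTP via the `getcustomer` enricher.

```python
# Mock of the customer service exposed via OpenAPI. In the real setup
# Nussknacker calls it over HTTP through the 'getcustomer' enricher.
# Customer fields and the rule below are illustrative assumptions.
CUSTOMERS = {
    "Client1": {"name": "John Doe", "category": "STANDARD"},
    "Client2": {"name": "Jane Doe", "category": "VIP"},
}

def enrich(event):
    """Attach customer data to a transaction event."""
    customer = CUSTOMERS.get(event["clientId"], {})
    return {**event, "customer": customer}

def is_suspicious(event):
    """Illustrative rule: large transactions from non-VIP customers."""
    return event["amount"] > 50 and event["customer"].get("category") != "VIP"

enriched = enrich({"clientId": "Client1", "amount": 60})
```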
It turns out that the integration added in the last step does not handle some cases well - namely, a transaction may come from an unknown customer, in which case the customer service returns an empty message.
Watch the video to see how to use tests to detect and correct such situations, and also how to detect unexpected problems with the external service (our sample implementation throws an error from time to time):
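The unknown-customer and flaky-service cases described above boil down to an explicit fallback around the lookup. A minimal Python sketch of that idea follows - the default category and the lookup function are hypothetical; in the scenario itself you would express the fallback with a null check in the expression language.

```python
# Sketch of a defensive wrapper around the customer lookup.
# The "UNKNOWN" default and the lookup below are illustrative assumptions.
def safe_customer(lookup, client_id, default_category="UNKNOWN"):
    """Return customer data, falling back to a default category when the
    service returns nothing (unknown customer) or raises an error."""
    try:
        customer = lookup(client_id)
    except Exception:
        customer = None  # transient service error, e.g. a flaky sample service
    if not customer:  # unknown customer -> empty message
        return {"category": default_category}
    return customer

# A lookup that knows one customer and fails for another, to exercise
# both problem cases described in the text.
def lookup(client_id):
    if client_id == "Client1":
        return {"name": "John Doe", "category": "STANDARD"}
    if client_id == "ClientErr":
        raise RuntimeError("service error")
    return None

known = safe_customer(lookup, "Client1")
unknown = safe_customer(lookup, "Client0")
errored = safe_customer(lookup, "ClientErr")
```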