Streaming | Lite engine
This quickstart describes how to install Nussknacker configured to use the Lite engine in Streaming Processing mode, using Helm and Kubernetes, and walks through sample scenarios.
If you want a really quick look, you can run parts of this quickstart with the embedded engine:
docker run -it --network host -e KAFKA_ADDRESS=localhost:3032 -e SCHEMA_REGISTRY_URL=http://localhost:3082 touk/nussknacker:latest
Note that some things (e.g. metrics) will not work, and this mode is not intended for production use. Before running it, Kafka should be exposed on localhost:3032 and the Schema Registry on http://localhost:3082.
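Before starting the embedded engine this way, you can sanity-check that both services are reachable on those ports. This is a rough sketch: the `/dev/tcp` probe is bash-specific, and curl is assumed to be available for the Schema Registry check:

```shell
# ports taken from the docker run command above; the /dev/tcp probe needs bash,
# and curl is assumed to be available for the Schema Registry check
KAFKA_STATUS="not reachable"
if (exec 3<>/dev/tcp/localhost/3032) 2>/dev/null; then
  KAFKA_STATUS="reachable"
fi
echo "Kafka on localhost:3032: $KAFKA_STATUS"

REGISTRY_STATUS="not reachable"
if command -v curl >/dev/null 2>&1 && curl -fsS http://localhost:3082/subjects >/dev/null 2>&1; then
  REGISTRY_STATUS="reachable"
fi
echo "Schema Registry on localhost:3082: $REGISTRY_STATUS"
```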
Prerequisites
- helm and kubectl (reasonably recent versions) installed
- access to Kubernetes cluster
- for local installation with k3d see below
- cluster should have ingress configured
- Linux or MacOS
Running on local K3d
If you don't have a Kubernetes cluster available, we recommend installing k3d.
You can use the helper script .k3d/create-k3d-cluster.sh
to bootstrap a k3d cluster and .k3d/remove-k3d-cluster.sh
to clean up when you finish experimenting. If you want to do it manually, remember that the instructions below
assume that the cluster was created with the ingress port mapped to 8081 - see the guide for details.
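For reference, manual cluster creation might look roughly like this. The cluster name is a made-up example, and the port mapping assumes the ingress is served on the cluster's port 80 (mirroring what the helper script is assumed to do):

```shell
# a sketch of manual cluster creation; the cluster name is hypothetical,
# and the port mapping assumes the ingress is served on the cluster's port 80
CLUSTER_NAME=nussknacker-quickstart
PORT_MAPPING="8081:80@loadbalancer"
echo "k3d cluster create $CLUSTER_NAME --port $PORT_MAPPING"
if command -v k3d >/dev/null 2>&1; then
  k3d cluster create "$CLUSTER_NAME" --port "$PORT_MAPPING" || true
fi
```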
Running
- Clone the Nussknacker-quickstart project from GitHub.
- If you have a TLD connected to your ingress (not applicable with e.g. the k3d setup), set the DOMAIN parameter in the k8s-helm/.env file.
- Install the additional services not included in the standard chart (AKHQ and the sample customer service) with ./k8s-helm/additional/install-streaming.sh. Wait until they're running.
- Run ./k8s-helm/install-streaming.sh and wait until all components start.
All links below assume the k3d setup described above. If you use an ingress with a TLD configured, please replace http://localhost:8081
with http(s)://nu-quickstart-nussknacker.$DOMAIN/
in those links.
Now you are ready to check your newly created environment:
- Nussknacker - user/password: admin/admin
- Grafana
- AKHQ
Preparing data schemas
In this quickstart we will use the Avro schemas shown below. You can register them in the Schema Registry manually, e.g. with AKHQ,
or run the script ./k8s-helm/scripts/createSchemas.sh, which will do it for you.
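If you prefer the command line over AKHQ, a schema can also be registered directly against the Confluent Schema Registry REST API. This is a sketch: the registry URL is an assumption (adjust it to however you reach the cluster, e.g. after a kubectl port-forward), and only the transactions-value subject is shown:

```shell
# registry URL is an assumption - adjust to however you reach the cluster
SCHEMA_REGISTRY_URL=${SCHEMA_REGISTRY_URL:-http://localhost:3082}

# the REST API expects the Avro schema as an escaped JSON string under "schema"
PAYLOAD='{"schema": "{\"type\":\"record\",\"name\":\"transaction\",\"namespace\":\"pl.touk\",\"fields\":[{\"name\":\"clientId\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"int\"},{\"name\":\"isLast\",\"type\":\"boolean\",\"default\":false}]}"}'

if command -v curl >/dev/null 2>&1; then
  curl -fsS -X POST \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data "$PAYLOAD" \
    "$SCHEMA_REGISTRY_URL/subjects/transactions-value/versions" \
    || echo "could not reach Schema Registry at $SCHEMA_REGISTRY_URL"
fi
```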
- transactions-value:

```json
{
  "type": "record",
  "name": "transaction",
  "namespace": "pl.touk",
  "fields": [
    { "name": "clientId", "type": "string" },
    { "name": "amount", "type": "int" },
    { "name": "isLast", "type": "boolean", "default": false }
  ]
}
```

- processedEvents-value:

```json
{
  "type": "record",
  "name": "transaction",
  "namespace": "pl.touk",
  "fields": [
    { "name": "clientId", "type": "string" },
    { "name": "amount", "type": "int" },
    { "name": "isLast", "type": "boolean", "default": false }
  ]
}
```

- alerts-value:

```json
{
  "type": "record",
  "name": "transaction",
  "namespace": "pl.touk",
  "fields": [
    { "name": "message", "type": "string" }
  ]
}
```
Defining a new scenario
- Go to Nussknacker
- Click the Create new scenario button and name it DetectLargeTransactions
- You'll see an empty workspace
- Click Import on the right panel and upload k8s-helm/scenarios/DetectLargeTransactionsLite.json
This scenario reads transaction data from Kafka, filters only those with an amount greater than some threshold, and writes the filtered events back to Kafka.
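To build intuition for what the filter node does, here is a toy, purely local illustration of the same logic - the sample events and the threshold of 100 are made up and need not match the values in the imported scenario:

```shell
# toy illustration only: sample events and the threshold are made up
THRESHOLD=100
RESULT=$(printf '%s\n' \
  '{"clientId": "client1", "amount": 42, "isLast": false}' \
  '{"clientId": "client2", "amount": 500, "isLast": false}' |
python3 -c '
import json, sys

threshold = int(sys.argv[1])
for line in sys.stdin:
    event = json.loads(line)
    if event["amount"] > threshold:   # the filter condition
        print(json.dumps(event))
' "$THRESHOLD")
# only the client2 event passes the filter
echo "$RESULT"
```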
- Double-click on the nodes to see the scenario logic
- Click Save
You have just created your first scenario!
Test scenario with data
- Click Deploy on the right panel
- Wait until the deployment status shows that the scenario is running
- Run the ./k8s-helm/scripts/sendTestTransactions.sh script a few times to generate some data
The first run may end with an error from Kafka - don't worry about it. The script will send some JSON data to the transactions Kafka topic.
- Go to the Metrics tab on the Nussknacker main panel - you should see the metrics change.
Your scenario just processed data from Kafka and saved filtered results!
Producing events manually
- For a more controlled way of producing events, you can use AKHQ, installed as part of this quickstart setup (it's not part of the default Nussknacker installation). Go to the Data tab
- Double-click on the transactions topic name and then Produce to topic
- In the form you can set the desired parameters of the event, e.g. the event key and value
- Please note that the events have to adhere to the schema, otherwise you'll get runtime errors.
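For example, an event matching the transactions-value schema above could look like this (the concrete values are arbitrary); you can check that the value is valid JSON locally before pasting it into the AKHQ form:

```shell
# a hypothetical event for the transactions-value schema; values are arbitrary
EVENT='{"clientId": "client1", "amount": 500, "isLast": false}'

# optional local check that the value is valid JSON before pasting it into AKHQ
if command -v python3 >/dev/null 2>&1; then
  echo "$EVENT" | python3 -m json.tool
fi
```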
See results
- Go to AKHQ - you should see the processed events in the processedEvents topic.
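If you prefer the command line, you could also read the topic with a Kafka client such as kcat - the broker address is an assumption, and since the values are Avro-encoded you may need a kcat build with Schema Registry support (`-s value=avro -r <registry URL>`) to see them decoded:

```shell
# broker address is an assumption; kcat (and, for decoded Avro values,
# a kcat build with Schema Registry support) may need to be installed first
KAFKA_ADDRESS=${KAFKA_ADDRESS:-localhost:3032}
TOPIC=processedEvents
echo "kcat -b $KAFKA_ADDRESS -t $TOPIC -C -o beginning -e"
if command -v kcat >/dev/null 2>&1; then
  timeout 15 kcat -b "$KAFKA_ADDRESS" -t "$TOPIC" -C -o beginning -e || true
fi
```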
Test your scenario in a sandbox
- Click the generate button in the right panel of the application
If you followed the quickstart from the beginning, you should have some data in Kafka by now. The most recent records from Kafka will be downloaded to a file.
- Click the from file button and upload the file generated in the last step
- After a while you will see the test results - how many records passed the filters, and what the variable values were
Add more behaviour to the scenario
After creating and running a basic scenario, it's time to add more sophisticated behaviour - in this (optional) section you'll see how to use somewhat more complex components. After each step you can deploy, test, and see the modified results.
Integration with external system
In the next step we'll see how to enrich data using an external system that exposes customer data via OpenAPI. We implemented a sample service in Python (see the customerservice folder) to show that you don't have to use Java to integrate with Nussknacker.
You can look at the k8s-helm/values.yaml file (look for the components.openAPI setting) to see how easy it is to configure additional OpenAPI services.
Click Import on the right panel and upload k8s-helm/scenarios/DetectLargeTransactionsWithEnricherLite.json.
You can see how we use the new getcustomer enricher to retrieve additional customer data, which lets us categorize suspicious situations based on the customer category.
In the video you can see how Nussknacker detects fields from the external service and how to view metrics for the OpenAPI integration.
Correcting errors
It turns out that the integration added in the last step does not handle some cases well - namely, a transaction may come from an unknown customer, in which case the customer service returns an empty message.
Watch the video to see how to use tests to detect and correct such situations, and also how to detect unexpected problems with the external service (our sample implementation throws an error from time to time):
Clean up
When you are done and want to clean up your environment, you can use the k8s-helm/uninstall-streaming.sh script.
It will remove the Kubernetes deployments, pods and services created while you were experimenting with the Lite engine.
Moreover, if you used our script to bootstrap the k3d cluster, you can use the .k3d/remove-k3d-cluster.sh script to remove the cluster.