Low-code solution for enrichments and time windows in marketing promotion
Some time ago a potential customer approached us, needing help with translating their business requirements to a streaming data algorithm. While the problem itself was far from complex, the folks were not sure how it fits into concepts of enrichments, window aggregations etc.
In this post, we’ll try to have a look at their case (anonymized a bit), and map the requirements to operations and respective nodes in Nussknacker.
We have a stream of customer events - let’s assume these are information about various payment transactions. Each event has an id of the transaction and of the customer, type (purchase, withdrawal, deposit, etc.) and amount. A typical event will look like this:
From time to time, the business wants to open a promotion - the first 50 users in a given hour who do a certain thing (in our case - make a purchase) will receive a bonus. Each customer can receive the bonus only if their current transaction exceeds the amount of their previous transaction by more than $10.
We can see three requirements we have to handle:
- Determining whether there is a promotion at the moment
- Selecting only the first 50 users each hour
- Check the condition on the amount of transaction compared to the previous one
In the next sections, we’ll look at how to approach each of those (and similar) problems with standard Nussknacker components. The end scenario diagram will look like this:
Is there an active promotion?
The first thing that we want to check is - which promotion is active. Most likely, we don’t have this information in the event itself. This is one of the examples of enrichment - getting additional information in the scenario. Another classical example is enriching events with data from the customer profile.
Generally speaking, there are two ways of doing this:
- Invoking external service - e.g. a database or REST API
- Left outer join with a stream containing versions of promotions (e.g. each status update will result in a ‘promotion’ event)
Both of them are supported by Nussknacker.
In Nussknacker, invoking an external database as an enricher is pretty simple (of course when it's configured to be used). Then you’ll get a lookup component which is ready to use:
Are there any drawbacks? Well, actually there are two. The first is performance - if you have a stream of 10k events/s then probably looking up data in a standard database is not a good idea.
In the case described in this blog, we can use the TTL setting in the enricher, to cache things a bit, but it’s not always feasible (e.g. you cannot afford to access stale data).
Of course, you can use Ignite / Redis (actually at one of Nussknacker deployments a single Redis instance serves ~300k read/s with latency below 1ms) - but it definitely needs care and tuning!
The other problem with enrichment is more conceptual. In streaming mode, it’s perfectly possible to process data a lot later than they appeared (after planned maintenance, failure, or a spike in traffic). If we just read the current data from the database, they may be out of sync with the data from the event. Is it a problem? Sometimes it can be - it may happen that during processing data from yesterday, it’s vital to use customer data as of yesterday - not the current state. To do this, we’d have to retain all versions of customer data - which is sometimes not feasible and can be more difficult to use (especially when it comes to fast-changing data).
Note: the rest of this paragraph describes a bit more complex concepts. If you don’t feel comfortable with streaming yet, feel free to skip it.
If enrichment with a DB/OpenAPI is unfeasible in your use case, you can use a bit more complex, yet powerful technique - enrichment using single-side-join. To do this, we’ll need an additional stream of data, containing current versions of enrichment data - in our case, this will be a stream of promotions. Each event contains the current version of the promotion (it’s important that the event contains all fields of a given promotion). Each event would look like this:
Of course, you’ll need to get such a stream first, but in modern data architectures, using CDC techniques, this shouldn’t be much of a problem.
So, let’s look at the enrichment with a join in Nussknacker. The start of the scenario diagram will look like this - for the Promotions node, we have two incoming branches - one with transactions, and one for promotion change events:
Let’s think about how to process those two streams. Events starting with T will denote transaction events while starting with P - events in the promotions change stream.
- T1, transaction_type = ‘PURCHASE’ - no promotions are known
- P1, transaction_type = ‘PURCHASE’ - we save P1 in the stream’s ‘internal store’ under ‘PURCHASE’ key
- T2, transaction_type = ‘PURCHASE’, there is P1 object in the store, so it’s returned
- T3, transaction_type = ‘DEPOSIT’, no object for ‘DEPOSIT’ key
- P2, transaction_type = ‘PURCHASE’, we replace P1 in the store with P2
- T4, transaction_type = ‘PURCHASE’, we return P2, as it’s the current version of promotion for ‘PURCHASE’ key
While the configuration of ‘Promotions’ node will be as follows:
What do all parameters mean? It goes like this:
- We have a ‘transactions’ branch - which contains our main events, that we want to enrich, and a ‘promotions’ branch - containing the enrichment data
- We want to join them on the ‘transaction_type’ field - present in both streams (we assume it’s the primary key for promotions)
- There are also two more advanced parameters (please see the documentation for the details):
- We define how we want to process the enrichment data - for each key we keep the last version (the ‘aggregator’ parameter), we keep the whole enrichment element (this is ‘#input’ in the ‘aggregateBy’ parameter)
- We assume that after 10 days the promotion is ‘outdated’ and can be removed (this is the ‘windowLength’ parameter)
Of course, there are a few caveats here - e.g. if the asynchronous processing of promotions change stream is delayed, it may happen that users won’t get their bonus, as enrichment is based on the state of enrichment in the moment of processing transaction event - so careful consideration and monitoring is required.
Regardless of the type of enrichment used, we can finally filter transactions occurring when promotion for a given transaction type was active:
To wrap up this part:
Enriching input data is one of the core features of Nussknacker. For less demanding scenarios (or in Request-Response mode) you can use Database or OpenAPI. For streaming cases when you have critical performance or correctness requirements - look at SingleSideJoin.
Picking only eligible transactions
Of course, active clients will generate quite a few transaction events. We want them to be eligible for any bonus, only if they increased their transaction value by more than $10.
For this, we’ll need a bit simpler stateful transformation - all we need to do is to get the amount value from the previous event of a given customer. Again, we need a grouping key - in our case, it will be the customer id and the value that we’ll store - the transaction amount.
Fortunately, Nussknacker provides us with a ready component for such an operation:
Next, we can easily filter out unwanted events:
What are other uses of the previousValue component? Actually, there are quite a few, for example in the case described here we want to determine how far is the location of the current transaction from the previous one.
Only the first 50, please!
Okay, so we know the promotion coded XYZ is currently active and we are sure the amount condition is satisfied for a given transaction. How can we make sure only the first 50 ones will get the bonus? Or to make it more precise - only the first 50 in a given hour.
We’ll use the concept of a tumbling-window - we will aggregate certain values for each event type (which determine the promotion and its code) over a window spanning 1 hour.
Let’s see what are the important pieces of the configuration of such a window:
- How do we group events (or in other words - what is the grouping key)? In our case, it will be a promotion code, as we want to count the number of events for a given promotion (remember that we enriched the transaction stream with promotion data).
- How and what do we want to aggregate? We want to compute the count of events, in other words - we sum a constant value of 1 for each event. In other cases, we could e.g. compute the maximum value of the amount, etc.
- Last but not least - how long is our window? There are some nuances in this question - e.g. when we have a window spanning one calendar day - how do we deal with the timezones (currently Flink and Nussknacker can handle only Zulu time)?
- There is one more nuance (feel free to skip it is not clear to you). When do we want to use result of the aggregation (in our terms - when do we emit it?) Here we want to do filtering based on this result, so we want to use partial aggregations for each event, but in other cases, we may want to use the result only after the window completes.
All those parameters can be easily configured with Nussknacker:
Sometimes you’ll want an answer to a slightly different question - how many things happened in the last hour before the event (it can be especially useful for fraud detection scenarios). In this case, you can use the sliding-window component - it’s configured similarly, check the documentation for the details.
There are a few more window types and a few knobs you can use to alter their behaviour, but the ones described in this section should be enough to get you started in many cases.
Translating the business case to streaming concepts is not always obvious - especially if one is used to thinking in “batch” SQL terms.
However, once you grasp a few general ideas, they become pretty natural to use.
Nussknacker has a comprehensive set of general-use components, which allow handling most of the cases pretty easily. If you had a look at our documentation but are still not sure how to map the requirements to streaming building blocks - contact us and we’ll help.