Version: Next

Flink components

Sources, sinks and custom transformations are based on the Flink API. To implement any of them, you need to provide:

  • a Flink function or Flink DataStream transformation
  • a Nussknacker specification

Sources

Implementing a fully functional source (BasicFlinkSource) is more complicated than implementing a sink, since the following things have to be provided:

  • a Flink SourceFunction
  • Flink type information for serializing/deserializing emitted data (e.g. #input)
  • a timestamp and watermark handler so that events are correctly processed downstream, for example to avoid (or force!) dropping of late events by aggregates. Read more about the notion of time and watermarks
  • (optionally) support for generating test data (trait FlinkSourceTestSupport) to ease scenario authoring
  • (optionally) a custom context initializer to emit more variables than #input. For example, the built-in Kafka sources emit an #inputMeta variable with Kafka record metadata such as partition, topic and offset. Another example could be a file source that emits the current line number as a new variable along with the content (as the #input variable)
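To illustrate why the watermark handler matters for late events, here is a minimal plain-Java sketch of a bounded-out-of-orderness watermark. It is a simplified model, not Flink's or Nussknacker's API: the class WatermarkSketch and its methods are hypothetical names, and Flink's own watermark generators differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a bounded-out-of-orderness watermark (hypothetical, not Flink's API).
final class WatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // The watermark trails the highest timestamp seen so far by the allowed disorder.
    long currentWatermark() {
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing observed yet
        }
        return maxSeenTimestamp - maxOutOfOrdernessMs;
    }

    // Returns true when the event arrives behind the current watermark, i.e. it is late.
    boolean observe(long eventTimestampMs) {
        boolean late = eventTimestampMs < currentWatermark();
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestampMs);
        return late;
    }

    // Collects the timestamps that would be treated as late for the given bound.
    static List<Long> lateEvents(long maxOutOfOrdernessMs, long... timestamps) {
        WatermarkSketch tracker = new WatermarkSketch(maxOutOfOrdernessMs);
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            if (tracker.observe(ts)) {
                late.add(ts);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // With a bound of 5, only the event at 14 (arriving after 20) is late.
        System.out.println(lateEvents(5, 10, 12, 8, 20, 14, 16)); // prints [14]
    }
}
```

A larger bound tolerates more disorder at the cost of later results; a bound of 0 treats every out-of-order event as late.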

Nussknacker also provides a more generic FlinkSource for implementing sources. The difference is that instead of implementing a Flink SourceFunction, you can return an arbitrary DataStream[Context]. However, you then have to remember to assign timestamps and watermarks and to initialize the context yourself.

When using the Flink engine, all sources returned by SourceFactory have to implement FlinkSource (or its subtrait BasicFlinkSource).

Examples

Sources for various systems like RabbitMQ, JDBC, etc. do not necessarily have to be implemented from scratch. Flink comes with simple predefined sources and with connectors to third-party systems. All of them can be used to implement a Nussknacker source.

Sinks

Sinks are easier to implement than sources. Nussknacker provides a factory for sinks that take only one parameter. The only thing that has to be provided is a Flink SinkFunction.

Sinks with multiple parameters can be implemented using FlinkSink. The following things are required:

  • prepareValue - a method that turns DataStream[Context] into DataStream[ValueWithContext[Value]] containing a final, evaluated value for the sink
  • registerSink - a method that turns DataStream[ValueWithContext[Value]] into DataStreamSink. It's the place where a Flink SinkFunction should be registered
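The two steps above can be modelled in plain Java as follows. This is only a sketch of the data flow: ordinary lists stand in for Flink's DataStream, the Context and ValueWithContext classes are simplified hypothetical stand-ins for the Nussknacker types of the same name, and the method signatures are assumptions rather than the actual FlinkSink interface.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java model of a two-step sink; lists stand in for Flink's DataStream.
final class TwoStepSinkSketch {

    // Hypothetical stand-in for a scenario context: variable name -> value.
    static final class Context {
        final Map<String, Object> variables;
        Context(Map<String, Object> variables) { this.variables = variables; }
    }

    // Hypothetical stand-in pairing an evaluated value with the context it came from.
    static final class ValueWithContext<V> {
        final V value;
        final Context context;
        ValueWithContext(V value, Context context) {
            this.value = value;
            this.context = context;
        }
    }

    // Step 1: evaluate the sink parameters against each context.
    static <V> List<ValueWithContext<V>> prepareValue(List<Context> stream,
                                                      Function<Context, V> evaluate) {
        List<ValueWithContext<V>> prepared = new ArrayList<>();
        for (Context ctx : stream) {
            prepared.add(new ValueWithContext<>(evaluate.apply(ctx), ctx));
        }
        return prepared;
    }

    // Step 2: hand every evaluated value to the writer (the role a SinkFunction plays).
    static <V> void registerSink(List<ValueWithContext<V>> prepared, Consumer<V> writer) {
        for (ValueWithContext<V> item : prepared) {
            writer.accept(item.value);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> vars = new HashMap<>();
        vars.put("topic", "alerts");
        vars.put("input", 7);
        List<Context> stream = new ArrayList<>();
        stream.add(new Context(vars));

        // Two parameters (topic and input) are combined into one final value.
        List<ValueWithContext<String>> prepared =
            prepareValue(stream, ctx -> ctx.variables.get("topic") + ":" + ctx.variables.get("input"));
        registerSink(prepared, System.out::println); // prints alerts:7
    }
}
```

Separating evaluation from writing keeps the writer unaware of scenario expressions: it only ever sees final values.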

Similarly to sources, all sinks returned by SinkFactory have to implement FlinkSink (or its subtrait BasicFlinkSink).

Again, Flink provides basic sinks and connectors which can be used when implementing your own Nussknacker sinks.

Examples:

Custom stream transformations

A custom transformation can arbitrarily change DataStream[Context]; it is implemented with FlinkCustomStreamTransformation. Aggregates are good examples of custom transformers. See here how components like previousValue, delay and aggregates are implemented.

Common details

Access to metadata like node id or scenario name and various helpers is provided by FlinkCustomNodeContext.

Special care should be taken to handle:

  • lifecycle - preparing the operator (source, sink or functions registered by custom transformers), closing resources, handling failures and restoring the state. See Flink's operator lifecycle for more details
  • exceptions, e.g. during deserialization, since any unhandled exception thrown by the source causes the Flink job to restart.
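One way to keep a malformed record from restarting the job is to catch the failure at the deserialization boundary and skip the record. The plain-Java sketch below shows the idea. The record format ("<id>,<amount>" in UTF-8), the Payment class and the tryDeserialize method are hypothetical; a real source would probably also log or count the skipped record.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Sketch of defensive deserialization; the record format and names are hypothetical.
final class SafeDeserializationSketch {

    static final class Payment {
        final String id;
        final long amount;
        Payment(String id, long amount) {
            this.id = id;
            this.amount = amount;
        }
    }

    // Returns an empty Optional for malformed input instead of throwing.
    static Optional<Payment> tryDeserialize(byte[] raw) {
        try {
            String[] parts = new String(raw, StandardCharsets.UTF_8).split(",", -1);
            if (parts.length != 2 || parts[0].isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(new Payment(parts[0], Long.parseLong(parts[1].trim())));
        } catch (RuntimeException e) {
            // Covers null input and non-numeric amounts; the caller skips the record.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        byte[] good = "p1,42".getBytes(StandardCharsets.UTF_8);
        byte[] bad = "not-a-payment".getBytes(StandardCharsets.UTF_8);
        System.out.println(tryDeserialize(good).isPresent()); // prints true
        System.out.println(tryDeserialize(bad).isPresent());  // prints false
    }
}
```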

⚠️ Flink components should not extend Lifecycle - it won't be handled properly