Flink components
Sources, sinks and custom transformations are based on the Flink API. To implement any of them, you need to provide:
- a Flink function or a Flink DataStream transformation
- a Nussknacker specification
Sources
Implementing a fully functional source (BasicFlinkSource) is more complicated than implementing a sink, since the following things have to be provided:
- a Flink SourceFunction
- Flink type information for serializing/deserializing emitted data (e.g. #input)
- a timestamp watermark handler so that events are correctly processed downstream, for example to avoid (or force!) dropping late events by aggregates. Read more about the notion of time and watermarks
- (optionally) test data generation support (trait FlinkSourceTestSupport) to ease scenario authoring
- (optionally) a custom context initializer to emit more variables than #input. For example, the built-in Kafka sources emit the #inputMeta variable with Kafka record metadata such as partition, topic, offset, etc. Another example could be a file source that emits the current line number as a new variable along with the content (as the #input variable)
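To illustrate why the watermark handler decides whether late events are kept or dropped, here is a minimal sketch in plain Scala. It does not use the Flink or Nussknacker API: Event, WatermarkSketch and classify are hypothetical names, and the rule that the watermark trails the highest timestamp seen so far by a fixed tolerance is an assumption loosely modelled on a bounded-out-of-orderness strategy.

```scala
// Hypothetical, dependency-free model; not the Flink or Nussknacker API.
final case class Event(id: String, timestampMillis: Long)

object WatermarkSketch {
  // Splits events into (on time, late), processing them in arrival order.
  // The watermark trails the highest timestamp seen so far by
  // maxOutOfOrdernessMillis; an event older than the watermark is late.
  def classify(events: Seq[Event], maxOutOfOrdernessMillis: Long): (Seq[Event], Seq[Event]) = {
    var maxSeen = Long.MinValue
    val onTime = Seq.newBuilder[Event]
    val late = Seq.newBuilder[Event]
    events.foreach { e =>
      // Before the first event there is no watermark yet.
      val watermark =
        if (maxSeen == Long.MinValue) Long.MinValue
        else maxSeen - maxOutOfOrdernessMillis
      if (e.timestampMillis < watermark) late += e else onTime += e
      maxSeen = math.max(maxSeen, e.timestampMillis)
    }
    (onTime.result(), late.result())
  }
}
```

For events arriving with timestamps 1000, 5000, 4500 and 1500, a tolerance of 1000 ms marks only the last event as late, while a tolerance of 0 also marks the 4500 event as late. Choosing the tolerance is therefore how a source avoids (or forces) dropping late events downstream.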
Nussknacker also provides a more generic FlinkSource for implementing sources. The difference is that instead of implementing a Flink SourceFunction, an arbitrary DataStream[Context] can be returned; however, you have to remember to assign timestamps and watermarks and to initialize the context.
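As a rough illustration of what initializing the context means, the sketch below wraps raw records into contexts carrying named variables, following the file source example mentioned earlier (content plus line number). It is plain Scala with stand-in types: SketchContext, ContextInitSketch and the lineNumber variable name are hypothetical and are not the Nussknacker API, and a Seq stands in for a DataStream.

```scala
// Hypothetical stand-in for a Nussknacker context: an id plus named variables.
final case class SketchContext(id: String, variables: Map[String, Any])

object ContextInitSketch {
  // Wraps each line of a file into a context exposing the content as "input"
  // and the 1-based line number as an extra "lineNumber" variable.
  def initialize(sourceId: String, lines: Seq[String]): Seq[SketchContext] =
    lines.zipWithIndex.map { case (line, idx) =>
      SketchContext(
        id = s"$sourceId-$idx",
        variables = Map("input" -> line, "lineNumber" -> (idx + 1))
      )
    }
}
```

In a real source the same mapping would be applied to the stream of raw records, after timestamps and watermarks have been assigned.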
When using the Flink engine, all sources returned by SourceFactory have to implement FlinkSource (or its subtrait BasicFlinkSource).
Examples
- Periodic source and its implementation
- FlinkKafkaSource and its factories, which return the source implementation along with a specification: KafkaSourceFactory with a fixed specification (e.g. based on a Scala case class), or the generic UniversalKafkaSourceFactory, which reads Kafka in different formats (Avro or JSON) with schemas defined in Schema Registry.
Sources for various systems like RabbitMQ, JDBC, etc. do not necessarily have to be implemented from scratch. Flink comes with simple sources already predefined and connectors with third-party systems. All of them can be used to implement a Nussknacker source.
Sinks
Sinks are easier to implement than sources. Nussknacker provides a factory for sinks that take only one parameter. The only thing that has to be provided is a Flink SinkFunction.
Sinks with multiple parameters can be implemented using FlinkSink. The following things are required:
- prepareValue - a method that turns DataStream[Context] into DataStream[ValueWithContext[Value]] containing a final, evaluated value for the sink
- registerSink - a method that turns DataStream[ValueWithContext[Value]] into DataStreamSink. It's the place where a Flink SinkFunction should be registered
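The two-step shape described above can be sketched in plain Scala. This is only a model of the data flow, not the real FlinkSink signatures: a Seq stands in for a DataStream, SinkCtx and SinkValueWithCtx are hypothetical stand-ins for Context and ValueWithContext, and the "key" and "input" variable names are assumptions made for the example.

```scala
// Hypothetical stand-ins: Seq plays the role of DataStream, and these case
// classes play the roles of Context and ValueWithContext.
final case class SinkCtx(id: String, variables: Map[String, Any])
final case class SinkValueWithCtx[V](value: V, context: SinkCtx)

object TwoParamSinkSketch {
  // Step 1 (the prepareValue role): evaluate both sink parameters per context.
  def prepareValue(contexts: Seq[SinkCtx]): Seq[SinkValueWithCtx[(String, String)]] =
    contexts.map { ctx =>
      val key = ctx.variables.getOrElse("key", "").toString
      val payload = ctx.variables.getOrElse("input", "").toString
      SinkValueWithCtx((key, payload), ctx)
    }

  // Step 2 (the registerSink role): hand every evaluated value to the writer.
  def registerSink(
      values: Seq[SinkValueWithCtx[(String, String)]],
      write: ((String, String)) => Unit
  ): Unit =
    values.foreach(v => write(v.value))
}
```

Keeping evaluation separate from writing means the writer only ever sees final values, while the original context stays attached to each value for the step in between.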
Similarly to sources, all sinks returned by SinkFactory have to implement FlinkSink (or its subtrait BasicFlinkSink).
Again, Flink provides basic sinks and connectors which can be used when implementing your own Nussknacker sinks.
Examples:
- FlinkKafkaUniversalSink and its factory UniversalKafkaSinkFactory
Custom stream transformations
A custom transformation can arbitrarily change DataStream[Context]; it is implemented with FlinkCustomStreamTransformation.
Great examples of custom transformers are aggregates. See here how components like previousValue, delay and aggregates are implemented.
Common details
Access to metadata like node id or scenario name and various helpers is provided by FlinkCustomNodeContext.
Special care should be taken to handle:
- lifecycle - preparing the operator (source, sink or functions registered by custom transformers), closing resources, handling failures and restoring the state. See Flink's operators lifecycle for more details here
- exceptions, e.g. during deserialization, since any unhandled exception thrown by the source causes the Flink job to restart.
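One way to keep a malformed record from escaping as an unhandled exception is to wrap deserialization in scala.util.Try and route failures aside. The sketch below is plain Scala under stated assumptions: the "name,amount" record format, Payment and SafeDeserializationSketch are hypothetical, and what to do with the rejected inputs (log, count, send elsewhere) is left open.

```scala
import scala.util.{Failure, Success, Try}

object SafeDeserializationSketch {
  // Hypothetical record format: "<name>,<integer amount>".
  final case class Payment(name: String, amount: Int)

  // Any non-fatal error (missing field, non-numeric amount) becomes a Failure.
  def parse(raw: String): Try[Payment] = Try {
    val parts = raw.split(",")
    Payment(parts(0).trim, parts(1).trim.toInt)
  }

  // Returns successfully parsed records plus the raw inputs that failed,
  // so a single malformed record never escapes as an exception.
  def deserializeAll(raws: Seq[String]): (Seq[Payment], Seq[String]) = {
    val results = raws.map(r => r -> parse(r))
    val ok = results.collect { case (_, Success(p)) => p }
    val bad = results.collect { case (r, Failure(_)) => r }
    (ok, bad)
  }
}
```

Inside a real source the same idea applies per record: catch the failure where it occurs and decide explicitly what happens to the bad input, rather than letting the exception propagate and restart the job.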
⚠️ Flink components should not extend Lifecycle - it won't be handled properly