Flink components
Sources, sinks and custom transformations are based on the Flink API. In order to implement any of those you need to provide:
- a Flink `DataStreamSource` in case of sources, and a `DataStream` into `DataStreamSink` transformation in case of sinks
- a Nussknacker specification
Sources
Standard implementation
The recommended way to implement a source is through the `StandardFlinkSource` interface. Your source only has to implement a `sourceStream` method that provides a `DataStreamSource` based on a `StreamExecutionEnvironment` (see the sketch after the list below). This approach provides a standard transformation of the input value into a Nussknacker `Context` and allows the implementation to customize these steps:
- Timestamp watermark handler so that events are correctly processed downstream, for example to avoid (or force!) dropping late events by aggregates. Read more about the notion of time and watermarks
- Context initializer to emit more variables than `#input`. For example, the built-in Kafka sources emit an `#inputMeta` variable with Kafka record metadata like partition, topic, offset, etc. Another example could be a file source that emits the current line number as a new variable along with the content (as the `#input` variable)
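For illustration, here is a minimal sketch of a source built on `StandardFlinkSource` that emits a fixed list of strings. The package paths and the exact `sourceStream` signature are assumptions and may differ between Nussknacker versions:

```scala
import scala.jdk.CollectionConverters._

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import pl.touk.nussknacker.engine.flink.api.process.{FlinkCustomNodeContext, StandardFlinkSource}

// Hypothetical source emitting a fixed list of strings as #input.
// Context creation is handled by the standard machinery
// around StandardFlinkSource.
class StaticStringSource(elements: List[String]) extends StandardFlinkSource[String] {

  override def sourceStream(
      env: StreamExecutionEnvironment,
      flinkNodeContext: FlinkCustomNodeContext
  ): DataStreamSource[String] =
    env.fromCollection(elements.asJava, TypeInformation.of(classOf[String]))
}
```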
Generic implementation
Nussknacker also provides a more generic interface for implementing sources - `FlinkSource`. Instead of providing a Flink `DataStreamSource`, you can provide an arbitrary `DataStream[Context]` directly. However, you then have to remember to assign timestamps and watermarks, and to initialize the context.
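A rough sketch of the generic approach might look as follows, assuming `FlinkSource` exposes a single method producing a `DataStream[Context]` (the method name and helper fields are assumptions):

```scala
import scala.jdk.CollectionConverters._

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import pl.touk.nussknacker.engine.api.Context
import pl.touk.nussknacker.engine.flink.api.process.{FlinkCustomNodeContext, FlinkSource}

// Hypothetical generic source: we must build the Context ourselves and
// would also have to assign timestamps/watermarks explicitly here.
class ManualContextSource(elements: List[String]) extends FlinkSource {

  override def sourceStream(
      env: StreamExecutionEnvironment,
      flinkNodeContext: FlinkCustomNodeContext
  ): DataStream[Context] =
    env
      .fromCollection(elements.asJava, TypeInformation.of(classOf[String]))
      .map(
        // a real implementation should generate unique context ids
        (value: String) => Context(flinkNodeContext.nodeId).withVariable("input", value),
        TypeInformation.of(classOf[Context])
      )
}
```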
Test support
To enable testing functionality in scenarios using your source implementation, your source needs to implement certain test-specific interfaces:
- Basic test support - `FlinkSourceTestSupport` - besides the more general `SourceTestSupport`, the implementation:
  - has to provide a Flink `TypeInformation` for serializing/deserializing the data emitted from the source (e.g. `#input`)
  - optionally can provide a `TimestampWatermarkHandler` that will be used only for tests
- Test data generation - `TestDataGenerator`
- Ad hoc test support - `TestWithParametersSupport`
Read more about testing functionality in this section.
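As an illustration, basic test support for the string source sketched earlier could look roughly like this; the method names follow the interfaces listed above, but treat the exact signatures as assumptions:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import pl.touk.nussknacker.engine.api.test.TestRecordParser
import pl.touk.nussknacker.engine.flink.api.process.FlinkSourceTestSupport
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler

class TestableStaticStringSource(elements: List[String])
    extends StaticStringSource(elements)
    with FlinkSourceTestSupport[String] {

  // how raw test records (e.g. generated or pasted in the designer) are parsed
  override def testRecordParser: TestRecordParser[String] = ???

  // TypeInformation used to serialize/deserialize data emitted from the source
  override def typeInformation: TypeInformation[String] =
    TypeInformation.of(classOf[String])

  // no special timestamp handling during tests
  override def timestampAssignerForTest: Option[TimestampWatermarkHandler[String]] = None
}
```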
Specification
Your Nussknacker source component specification should be a `SourceFactory` returning your source implementation.
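For example, a specification for the source sketched earlier could be a hypothetical factory like the one below; `@MethodToInvoke` is the standard Nussknacker way to declare component parameters:

```scala
import pl.touk.nussknacker.engine.api.MethodToInvoke
import pl.touk.nussknacker.engine.api.process.SourceFactory

// Hypothetical specification returning the StaticStringSource sketched above
object StaticStringSourceFactory extends SourceFactory {

  @MethodToInvoke
  def create(): StaticStringSource =
    new StaticStringSource(List("first", "second", "third"))
}
```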
Examples
- Periodic source and its implementation
- FlinkKafkaSource and its factories returning the source implementation: either KafkaSourceFactory with a fixed specification (e.g. based on a Scala case class), or the generic UniversalKafkaSourceFactory, which reads Kafka in different formats: Avro or JSON with schemas defined in the Schema Registry.
Sources for various systems like RabbitMQ, JDBC, etc. do not necessarily have to be implemented from scratch. Flink comes with simple predefined sources as well as connectors to third-party systems. All of them can be used to implement a Nussknacker source.
Sinks
Implementation
Sinks are easier to implement than sources. Nussknacker provides a factory for sinks that take only one parameter. The only thing that has to be provided is a Flink `SinkFunction`.
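For instance, a one-parameter sink that just prints its value could be declared as below; `SingleValueSinkFactory` is the helper we assume Nussknacker provides for this case, and `PrintSinkFunction` is a ready-made Flink `SinkFunction`:

```scala
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction
import pl.touk.nussknacker.engine.flink.util.sink.SingleValueSinkFactory

// Hypothetical one-parameter sink specification printing each value to stdout
val printSink = new SingleValueSinkFactory(new PrintSinkFunction[AnyRef])
```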
Sinks with multiple parameters can be implemented using `FlinkSink`. The following things are required:
- `prepareValue` - a method that turns `DataStream[Context]` into `DataStream[ValueWithContext[Value]]` containing the final, evaluated value for the sink
- `registerSink` - a method that turns `DataStream[ValueWithContext[Value]]` into `DataStreamSink`. It's the place where a Flink `SinkFunction` should be registered
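Putting the two methods together, a multi-parameter sink might be sketched as below. The `lazyParameterHelper` used to evaluate the lazy value parameter is part of `FlinkCustomNodeContext`; treat the exact signatures as assumptions:

```scala
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import pl.touk.nussknacker.engine.api.{Context, LazyParameter, ValueWithContext}
import pl.touk.nussknacker.engine.flink.api.process.{FlinkCustomNodeContext, FlinkSink}

// Hypothetical sink evaluating a single lazy "value" parameter
// and printing the result for every record
class PrintingFlinkSink(value: LazyParameter[AnyRef]) extends FlinkSink {

  override type Value = AnyRef

  // evaluate the lazy parameter for each incoming Context
  override def prepareValue(
      dataStream: DataStream[Context],
      flinkNodeContext: FlinkCustomNodeContext
  ): DataStream[ValueWithContext[Value]] =
    dataStream.flatMap(
      flinkNodeContext.lazyParameterHelper.lazyMapFunction(value),
      flinkNodeContext.valueWithContextInfo.forType(value.returnType)
    )

  // register the actual Flink SinkFunction
  override def registerSink(
      dataStream: DataStream[ValueWithContext[Value]],
      flinkNodeContext: FlinkCustomNodeContext
  ): DataStreamSink[_] =
    dataStream.addSink(new SinkFunction[ValueWithContext[Value]] {
      override def invoke(v: ValueWithContext[Value], ctx: SinkFunction.Context): Unit =
        println(v.value)
    })
}
```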
Specification
Similarly to sources, all sinks returned by a `SinkFactory` have to implement `FlinkSink` (or its subtrait `BasicFlinkSink`).
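A matching specification for the sink above might be this hypothetical factory with a single lazily-evaluated "Value" parameter:

```scala
import pl.touk.nussknacker.engine.api.{LazyParameter, MethodToInvoke, ParamName}
import pl.touk.nussknacker.engine.api.process.SinkFactory

// Hypothetical specification exposing PrintingFlinkSink with one parameter
object PrintingSinkFactory extends SinkFactory {

  @MethodToInvoke
  def create(@ParamName("Value") value: LazyParameter[AnyRef]): PrintingFlinkSink =
    new PrintingFlinkSink(value)
}
```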
Examples
- FlinkKafkaUniversalSink and its factory UniversalKafkaSinkFactory
Flink provides basic sinks and connectors which can be used while implementing your own Nussknacker sinks.
Custom stream transformations
A custom transformation can arbitrarily change the `DataStream[Context]`; it is implemented with `FlinkCustomStreamTransformation`. Great examples of custom transformers are aggregates. See here how components like `previousValue`, `delay` and aggregates are implemented.
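A minimal sketch, assuming `FlinkCustomStreamTransformation.apply` accepts a function from the incoming stream and node context to a stream of `ValueWithContext` (check the helper in your Nussknacker version):

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStream
import pl.touk.nussknacker.engine.api.{Context, CustomStreamTransformer, MethodToInvoke, ValueWithContext}
import pl.touk.nussknacker.engine.flink.api.process.{FlinkCustomNodeContext, FlinkCustomStreamTransformation}

// Hypothetical pass-through transformer: emits each Context unchanged with
// a null value; a real transformer would compute e.g. an aggregate here
object PassThroughTransformer extends CustomStreamTransformer {

  @MethodToInvoke
  def execute(): FlinkCustomStreamTransformation =
    FlinkCustomStreamTransformation {
      (stream: DataStream[Context], nodeCtx: FlinkCustomNodeContext) =>
        stream.map(
          (c: Context) => ValueWithContext[AnyRef](null, c),
          TypeInformation.of(classOf[ValueWithContext[AnyRef]])
        )
    }
}
```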
Common details
Access to metadata like the node id or scenario name, as well as various helpers, is provided by `FlinkCustomNodeContext`.
Special care should be taken to handle:
- lifecycle - preparing the operator (source, sink or functions registered by custom transformers), closing resources, handling failures and restoring state. See Flink's operator lifecycle for more details here
- exceptions, e.g. during deserialization, since any exception thrown by the source and left unhandled causes the Flink job to restart
⚠️ Flink components should not extend `Lifecycle` - it won't be handled properly