Migration guide
To see the biggest differences please consult the changelog.
In version 1.8.0
Scenario authoring changes
- #3924
- Fixup:
{}
is now interpreted as "allow everything schema", not as "object schema". Objects schemas have to have declared"type": "object"
. - Unknown is now allowed on sinks in both validation modes if output schema is "everything allowed schema".
- Fixup:
Code API changes
- #3924 - changes to
SwaggerTyped
hierarchySwaggerMap(valuesType)
->SwaggerObject(Map.empty, additionalProperties = AdditionalPropertiesEnabled(valuesType))
AdditionalPropertiesSwaggerTyped
->AdditionalPropertiesEnabled
AdditionalPropertiesWithoutType
->AdditionalPropertiesEnabled(SwaggerAny)
SwaggerRecursiveSchema/SwaggerUnknownFallback
->SwaggerAny
Other changes
- #3835 Removed Signals and QueryableState. This change affects:
- Configuration
- Components and DeploymentManager API
- REST API
- #3823, #3836, #3843 -
scenarios with multiple sources can be tested from file
TestDataGenerator#generateTestData
returns JSON test records instead of raw bytes. Test records are serialized to a file by designer Test record can optionally contain timestamp which is used to sort records generated by many sourcesTestDataParser
was replaced withTestRecordParser
that turns a single JSON test record into a source recordTestData.newLineSeparated
helper was removed. Scenario test records have to be created explicitly. Each scenario test record has assigned sourceDeploymentManager#test
takesScenarioTestData
instead ofTestData
- Designer configuration
testDataSettings.testDataMaxBytes
renamed totestDataMaxLength
- #3916 Designer configuration
environmentAlert.cssClass
renamed toenvironmentAlert.color
- #3922 Bumps: jwks: 0.19.0 -> 0.21.3, jackson: 2.11.3 -> 2.13.4
- #3929 From now,
SchemaId
value class is used in every place where schema id was represented as an Int. For conversion betweenSchemaId
andInt
useSchemaId.fromInt
andSchemaId.asInt
. UseConfluentUtils.toSchemaWithMetadata
instead ofSchemaWithMetadata.apply
for conversion between Confluent'sSchemaMetadata
and oursSchemaWithMetadata
. - #3948 Now, we are less dependent from Confluent schema registry.
To make it possible, some kafka universal/avro components refactors were done. Most important changes in public API:
- ConfluentSchemaBasedSerdeProvider.universal was replaced by UniversalSchemaBasedSerdeProvider.create
- Non-confluent classes renamed and moved to desired packages
- Extracted new class: SchemaIdFromMessageExtractor to make Confluent logic explicit and moved to top level
- Extracted SchemaValidator to make Confluent logic explicit and be able to compose
- Some renames: ConsumerRecordUtils -> KafkaRecordUtils
- RecordDeserializer -> AvroRecordDeserializer (also inheritance replaced by composition)
- (De)SerializerFactory - easier abstractions
- ConfluentSchemaRegistryFactory is not necessary now - removed
In version 1.7.0
Scenario authoring changes
- #3701 Right now access in SpEL to not existing field on TypedMap won't throw exception, just will return
null
- #3727 Improvements: Change RR Sink validation way:
- Added param
Value validation mode
at RR response component - We no longer support
nullable
param from Everit schema. Nullable schema are supported by union with null e.g. `["null", "string"]
- Added param
Configuration changes
- #3768
request-response-embedded
andstreaming-lite-embedded
DeploymentManager types where replaced by onelite-embedded
DeploymentManager type with two modes:streaming
andrequest-response
like it is done inlite-k8s
case
Code API changes
- #3560, #3595
Remove dependency on
flink-scala
. In particular:- Switched from using
scala.DataStream
todatastream.DataStream
. Some tools exclusive to scala datastreams are available inengine.flink.api.datastream
- Scala based
TypeInformation
derivation is no longer used, for remaining casesflink-scala-utils
module is provided (probably will be removed in the future)
- Switched from using
- #3680
SubprocessRef::outputVariableNames
type is changed fromOption[Map[String,String]]
with default None, toMap[String,String]
with defaultMap.empty
- #3692 Rename
mockedResult
toexternalInvocation
in test results collectors. - #3606 Removed nussknacker-request-response-app. As a replacement you can use:
- nussknacker-request-response-app in version <= 1.6
- Lite K8s engine with request-response processing mode
lite-embedded
Deployment Manager with request-response processing mode
- #3610 Removed deprecated code. For details see changes in pull request.
- #3607 Request-response jsonSchema based encoder:
- ValidationMode moved to package
pl.touk.nussknacker.engine.api.validation
innussknacker-components-api
- BestEffortJsonSchemaEncoder moved to package
pl.touk.nussknacker.engine.json.encode
innussknacker-json-utils
- ValidationMode moved to package
- #3738 Kafka client libraries upgraded to 3.2.3. If using older Flink version, make sure to use 2.8.x client libraries. For Flink versions 1.15.0-1.15.2 include also fixed KafkaMetricWrapper
- #3668 Method
runWithRequests
ofRequestResponseTestScenarioRunner
(returned byTestScenarioRunner.requestResponseBased()
) now returnsValidatedNel
with scenario compilation errors instead of throwing exception in that case
REST API changes
- #3576
/processes
endpoint without query parameters returns all scenarios - the previous behaviour was to return only unarchived ones. To fetch only unarchived scenariosisArchived=false
query parameter has to be passed.
Other changes
- #3824 Due to data serialization fix, Flink scenarios using Kafka sources with schemas may be incompatible and may need to be restarted with clean state.
In version 1.6.0
- #3440 Feature: allow to define fragment's outputs
- Right now using fragments in scenario is changed. We have to provide each outputName for outputs defined in fragment.
Scenario authoring changes
- #3370 Feature: scenario node category verification on validation From now import scenario with nodes from other categories than scenario category will be not allowed.
- #3436 Division by zero will cause validation error. Tests that rely on
1/0
to generate exceptions should have it changed to code like1/{0, 1}[0]
- #3473 JsonRequestResponseSinkFactory provides also 'raw editor', to turn on 'raw editor' add
SinkRawEditorParamName -> "true"
- #3608 Use
ZonedDateTime
fordate-time
JsonSchema format,OffsetTime
fortime
format.
Code API changes
- #3406 Migration from Scalatest 3.0.8 to Scalatest 3.2.10 - if necessary, see the Scalatest migration guides, https://www.scalatest.org/release_notes/3.1.0 and https://www.scalatest.org/release_notes/3.2.0
- #3431 Renamed
helper-utils
todefault-helpers
, separatedMathUtils
fromcomponents-utils
tomath-utils
, removed dependencies fromhelper-utils
- #3420
DeploymentManagerProvider.typeSpecificInitialData
takes deploymentConfigConfig
now - #3493, #3582 Added methods
DeploymentManagerProvider.additionalPropertiesConfig
,DeploymentManagerProvider.additionalValidators
- #3506 Changed
LocalDateTime
toInstant
inOnDeployActionSuccess
inlistener-api
- #3513 Replace
EspProcess
withCanonicalProcess
in all parts of the API except for the compiler. - #3545
TestScenarioRunner.flinkBased
should be used instead ofNuTestScenarioRunner.flinkBased
. Before this, you need toimport pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._
- #3386 Changed
CustomProcessValidator
validate
method. It now receivesCanonicalProcess
instead ofDisplayableProcess
and returnsValidatedNel[ProcessCompilationError, Unit]
instead ofValidationResult
. MovedCustomProcessValidator
from modulenussknacker-restmodel
in packagevalidation
tonussknacker-extensions-api
. - #3586 Module
nussknacker-ui
was renamed tonussknacker-designer
,ui.conf
was renamed todesigner.conf
,defaultUiConfing.conf
renamed todefaultDesignerConfig.conf
REST API changes
- #3506 Dates returned by REST API (createdAt, modifiedAt, createDate) are now returned in Zulu time, with timezone indication. This affects e.g.
/api/procecesses
,/api/processes/{scenarioId}
,/api/processes/{scenarioId}/activity
- #3542 Node additional info path renamed from
nodes/{scenarioId}/additionalData
tonodes/{scenarioId}/additionalInfo
Scenario API changes
- #3471, #3553
RequestResponseMetaData(path)
is changed toRequestResponseMetaData(slug)
.V1_033__RequestResponseUrlToSlug
migration is ready for that, the change also applies to Scenario DSL. - #3513 Scenario DSL returns
CanonicalProcess
instead ofEspProcess
. - #3630
SubprocessOutput
changed toSubprocessUsageOutput
, changes inOutputVar
definition
Configuration changes
- #3425 Deployment Manager for
request-response-embedded
configuration parameters changed:interface
->http.interface
port
->http.port
definitionMetadata
->request-response.definitionMetadata
- #3502 Refactor of
KafkaProperties
:kafkaAddress
property has been deprecated. Please providekafkaProperties."bootstrap.servers"
instead
Other changes
- #3441 Updated Flink 1.14.5 -> 1.15.2. Some Flink artefacts no longer have Scala version. Test using Flink may need to disable checkpointing or reduce time between checkpoints to prevent timeouts or long waits for tasks to finish.
In version 1.5.0
Configuration changes
- #2992 deploySettings changed to deploymentCommentSettings, now when specified require you to also specify field validationPattern, specifying exampleComment is optional.
- commentSettings fields modified. matchExpression changed to substitutionPattern, link changed to substitutionLink.
- #3165 Config is not exposed over http (GET /api/app/config/) by default. To enable it set configuration
enableConfigEndpoint
totrue
. - #3223 OAuth2 configuration
defaultTokenExpirationTime
changed todefaultTokenExpirationDuration
- #3263 Batch periodic scenarios carry processing type to distinguish scenarios with different categories.
For existing scenarios processing type is migrated to
default
. SetdeploymentManager.processingType
todefault
or update periodic scenarios table with actual processing type value - ideally it should be same value as the periodic engine key inscenarioTypes
.
Code API changes
- #2992 OnDeployActionSuccess in ProcessChangeEvent now requires instance of Option[Comment] instead of Option[String] as parameter with deploymentComment information. Added abstract class Comment in listener-api.
- #3136 Improvements: Lite Kafka testkit
ConfluentUtils.serializeRecordToBytesArray
replaced byConfluentUtils.serializeDataToBytesArray
ConfluentUtils.deserializeSchemaIdAndRecord
replaced byConfluentUtils.deserializeSchemaIdAndData
- #3178 Improvements: more complex test scenario runner result:
- Right now each method from
TestScenarioRunner
should returnValidatedNel[ProcessCompilationError, RunResult[R]]
where:- Invalid is representation of process compilation errors
- Valid is representation of positive and negative scenario running result
- Right now each method from
- #3255
TestReporter
util class is safer to use in parallel tests, methods require passing scenario name - #3265 #3288 #3297 #3299#3309
#3316 #3322 #3328 #3330 Changes related with UniversalKafkaSource/Sink:
RuntimeSchemaData
is generic - parametrized byParsedSchema
(AvroSchema and JsonSchema is supported).NkSerializableAvroSchema
renamed toNkSerializableParsedSchema
SchemaWithMetadata
wrapsParsedSchema
instead of AvroSchema
.SchemaRegistryProvider
refactoring:- rename
SchemaRegistryProvider
toSchemaBasedSerdeProvider
- decouple
SchemaRegistryClientFactory
fromSchemaBasedSerdeProvider
- rename
KafkaAvroKeyValueDeserializationSchemaFactory
renamed toKafkaSchemaBasedKeyValueDeserializationSchemaFactory
KafkaAvroValueSerializationSchemaFactory
renamed toKafkaSchemaBasedValueSerializationSchemaFactory
KafkaAvroKeyValueSerializationSchemaFactory
renamed toKafkaSchemaBasedKeyValueSerializationSchemaFactory
- #3253
DeploymentManager
has separatevalidate
method, which should perform initial scenario validation and return reasonably quickly (while deploy can e.g. make Flink savepoint etc.) - #3313 Generic types handling changes:
Typed.typedClass(Class[_], List[TypingResult])
is not available anymore. You should use more explicitTyped.genericTypeClass
instead- We check count of generic parameters in
Typed.genericTypeClass
- wrong number will cause throwing exception now - We populate generic parameters by correct number of
Unknown
in non-generic aware versions ofTyped
factory methods likeTyped.apply
orTyped.typedClass
- #3071 More strict Avro schema validation:
ValidationMode.allowOptional
was removed, instead of it please useValidationMode.lax
ValidationMode.allowRedundantAndOptional
was removed, instead of it please useValidationMode.lax
- Changes of
ValidationMode
, fields:acceptUnfilledOptional
andacceptRedundant
were removed
- #3376
FlinkKafkaSource.flinkSourceFunction
,FlinkKafkaSource.createFlinkSource
andDelayedFlinkKafkaConsumer.apply
takes additional argument,FlinkCustomNodeContext
now - #3272
KafkaZookeeperServer
renamed toEmbeddedKafkaServer
,zooKeeperServer
field changed type toOption
and is hidden now. - #3365 Numerous renames:
- module
nussknacker-avro-components-utils
->nussknacker-schemed-kafka-components-utils
- module
nussknacker-flink-avro-components-utils
->nussknacker-flink-schemed-kafka-components-utils
- package
pl.touk.nussknacker.engine.avro
->pl.touk.nussknacker.engine.schemedkafka
- object
KafkaAvroBaseComponentTransformer
->KafkaUniversalComponentTransformer
- module
- #3412 More strict filtering method types. Methods with parameters or result like
Collection[IllegalType]
are no longer available in SpEl. - #3542 Numerous renames:
- trait
NodeAdditionalInfo
->AdditionalInfo
, - class
MarkdownNodeAdditionalInfo
->MarkdownAdditionalInfo
- trait
NodeAdditionalInfoProvider
->AdditionalInfoProvider
- the SPI provider's configuration files must be renamed frompl.touk.nussknacker.engine.additionalInfo.NodeAdditionalInfoProvider
topl.touk.nussknacker.engine.additionalInfo.AdditionalInfoProvider
- method
AdditionalInfoProvider.additionalInfo
renamed tonodeAdditionalInfo
and new method addedpropertiesAdditionalInfo
- trait
REST API changes
- #3169 API endpoint
/api/app/healthCheck
returning short JSON answer with "OK" status is now not secured - before change it required to be an authenticated user with "read" permission.
Scenario authoring changes
- #3187 #3224 Choice component replaces Switch component. "Default" choice edge type, exprVal and expression are now deprecated. For existing usages, you don't need to change anything. For new usages, if you want extract value e.g. to simplify choice conditions, you need to define new local variable before choice using variable component. "Default" choice edge type can be replaced by adding "true" condition at the end of list of conditions
Breaking changes
- #3328 Due to addition of support for different schema type (AvroSchema and JsonSchema for now) serialization format of
NkSerializableParsedSchema
has changed. Flink state compatibility of scenarios which use Avro sources or sinks has been broken. - #3365 Due to renames (see section
Code API changes
) Flink state compatibility of scenarios which use Avro sources or sinks has been broken.
Other changes
- #3249#3250 Some kafka related libraries were bumped: Confluent 5.5->7.2, avro 1.9->1.11, kafka 2.4 -> 3.2.
It may have influence on your custom components if you depend on
kafka-components-utils
oravro-components-utils
module - #3376 Behavior of Flink's Kafka deserialization errors handling was changed - now instead of job failure, invalid message is omitted and configured
exceptionHandler
mechanism is used.
In version 1.4.0
Configuration changes
security.rolesClaim
changed tosecurity.rolesClaims
, type changed to list of stringskafka.schemaRegistryCacheConfig
configuration entry was added - it was hardcoded before. Default value ofkafka.schemaRegistryCacheConfig.availableSchemasExpirationTime
was changed from 1 minute to 10 seconds which will cause more often schema cache invalidation- #3031 Attachments are now stored in database (see more in section
Other changes
).attachmentsPath
was removed. Optional configattachments.maxSizeInBytes
was introduced with default value of 10mb
Code API changes
- #2983 Extract Permission to extensions-api
- Moved
pl.touk.nussknacker.ui.security.api.Permission
(security module) topl.touk.nussknacker.security.Permission
(extension-api module)
- Moved
- #3040 Deprecated
pl.touk.nussknacker.engine.api.ProcessListener.sinkInvoked
method. Switch to more generalendEncountered
method. - #3076 new implicit parameter
componentUseCase: ComponentUseCase
was added toinvoke()
method of all services extendingEagerServiceWithStaticParameters
Other changes
- #3031 Attachments are now stored in database. As this feature was rarely used, automatic migration of attachments from disk to db is not provided. To stay consistent db table
process_attachments
had to be truncated.
Breaking changes
- #3029
KafkaConfig
has new fieldschemaRegistryCacheConfig: SchemaRegistryCacheConfig
. Flink state compatibility has been broken. - #3116 Refactor
SchemaRegistryClientFactory
so it takes dedicated config object instead of KafkaConfig. This change minimizes chance of future Flink state compatibility break.SchemaIdBasedAvroGenericRecordSerializer
is serialized in Flink state, so we provide it now with as little dependencies as necessary. Flink state compatibility has been broken again. - #3363 Kafka consumer no longer set
auto.offset.reset
toearliest
by default. For default configuration files, you can useKAFKA_AUTO_OFFSET_RESET
env variable to easily change this setting.
In version 1.3.0
Code API changes
- #2741 #2841 Remove custom scenario provides some changes on API:
- Replace ProcessDeploymentData by CanonicalProcess (as VO)
- Replace scenario jsonString by CanonicalProcess at DeploymentManager, ProcessConfigEnricherInputData
- #2773 Using VersionId / ProcessId / ProcessName instead of Long or String:
PullProcessRepository
API was changed, right now we use VersionId instead of Long
- #2830
RunMode
is renamed toComponanteUseCase
andNormal
value is split into: EngineRuntime, Validation, ServiceQuery, TestDataGeneration.RunMode.Test
becomesComponanteUseCase.TestRuntime
- #2825, #2868 #2912 API modules changes:
- Extracted new modules:
nussknacker-scenario-api
with all scenario API parts fromapi
andinterpreter
nussknacker-components-api
(andnussknacker-lite-components-api
,nussknacker-flink-components-api
etc.), which contain API for creating componentsnussknacker-common-api
- base value classes shared betweenscenario-api
andcomponents-api
likeNodeId
,Metadata
etc.nussknacker-extensions-api
- API of extensions other than components
- Because of that, some changes in code were also introduced:
NodeId
moved frompl.touk.nussknacker.engine.api.context.ProcessCompilationError
topl.touk.nussknacker.engine.api
NodeExpressionId
,DefaultExpressionId
andbranchParameterExpressionId
moved frompl.touk.nussknacker.engine.api.context.ProcessCompilationError
topl.touk.nussknacker.engine.graph.expression
JobData
no longer containsDeploymentData
, which is not accessible for components anymoreDisplayJson
,WithJobData
,MultiMap
moved toutils
- Some methods from API classes (e.g.
Parameter.validate
) and classes (InterpretationResult
) moved to interpreter DeploymentManagerProvider.createDeploymentManager
takes nowBaseModelData
as an argument instead ofModelData
. If you want to use this data to invoke scenario, you should cast it to invokable representation via:import ModelData._; modelData.asInvokableModelData
- Extracted new modules:
- #2878 2898 #2924 Cleaning up of
-utils
modules- Extracted internal classes, not intended to be used in extensions to nussknacker-internal-utils module
- Extracted component classes, not used directly by runtime/designer to nussknacker-components-utils module
- Extracted kafka component classes, not used directly by lite-kafka-runtime/kafka-test-utils to nussknacker-kafka-components-utils
- Moved some classes that are in fact part of API to -api modules (e.g.
ToJsonEncoder
) - Module renames:
- nussknacker-avro-util to nussknacker-avro-components-utils
- nussknacker-flink-avro-util to nussknacker-flink-avro-components-utils
- nussknacker-flink-kafka-util to nussknacker-flink-kafka-components-utils
- nussknacker-flink-util to nussknacker-flink-components-utils
- nussknacker-request-response-util to nussknacker-request-response-components-utils
- nussknacker-model-util to nussknacker-helpers-utils
- Minor changes in code:
- Use
val docsConfig = new DocsConfig(config); import docsConfig._
instead ofimplicit val docsConfig = (...); import DocsConfig._
- Some components specific methods are not available from
KafkaUtils
. Instead, they are available fromKafkaComponentsUtils
ToJsonEncoder.encoder
takesAny => Json
function instead ofBestEffortJsonEncoder
as a parameter
- Use
- #2907 Hide some details of metrics to
utils-internal
(InstantRateMeter
,InstantRateMeterWithCount
), use method added toMetricsProviderForScenario
- #2916 Changes in
ProcessState
API.- Six similar methods creating
ProcessState
based onStateStatus
and some other details merged to one.- Methods removed:
- Two variants of
ProcessState.apply
takingProcessStateDefinitionManager
as a parameter SimpleProcessState.apply
- Two variants of
ProcessStatus.simple
ProcessStatus.createState
takingProcessStateDefinitionManager
as a parameter
- Two variants of
- Method added:
ProcessStateDefinitionManager.processState
with some default parameter values
- Methods removed:
ProcessStatus
class is removed at all. All methods returningProcessState
by it moved toSimpleProcessStateDefinitionManager
and removedpreviousState: Option[ProcessState]
from it. If you want to keep previous state's deployment details and only change "status details" just useprocessState.withStatusDetails
methodProcessState
,CustomAction
and its dependencies moved fromnussknacker-deployment-manager-api
tonussknacker-scenario-deployment-api
,restmodel
module not depend ondeployment-manager-api
anymore- #2969 Action
ProcessActionType.Deploy
is now available by default for scenarios inSimpleStateStatus.DuringDeploy
state. Mind this if you depend onOverridingProcessStateDefinitionManager
orSimpleProcessStateDefinitionManager
, and specifically on theirsstatusActions
method. As an exception, implementation for FlinkFlinkProcessStateDefinitionManager
stays the same as before (onlyProcessActionType.Cancel
is possible in this state), but this may be unified in the future.
- Six similar methods creating
Other changes
- #2886 This change can break previous flink snapshot compatibility. Restoring state from previous snapshot asserts that restored serializer UID matches current serializer UID. This change ensures that in further release deployments UIDs persisted within snapshots are not re-generated in runtime.
- #2950 Remove
MATH
helper, useNUMERIC
methods (they work better with some number types conversions)
In version 1.2.0
Configuration changes
- #2483
COUNTS_URL
environment variable is notINFLUXDB_URL
, withoutquery
path part. - #2493 kafka configuration should be moved to components provider configuration - look at
components.kafka
in dev-application.conf for example - #2624 Default name for
process
tag is nowscenario
. This affects metrics and count functionalities. Please update you Flink/Telegraf setup accordingly (see nussknacker-quickstart for details). If you still want to useprocess
tag (e.g. you have a lot of dashboards), please setcountsSettings.metricsConfig.scenarioTag
setting toprocess
Also, dashboard links format changed, see documentation for the details. - #2645 Default models:
genericModel.jar
,liteModel.jar
. were merged todefaultModel.jar
,managementSample.jar
was renamed todevModel.jar
. If you usedefaultModel.jar
it's important to includeflinkExecutor.jar
explicitly on model classpath.
Scenario authoring changes
- #2564 Flink union now takes only 'Output expression' parameters for branches (previously 'value' parameter), output variable must be of the same type, if you want to distinguish source branch in output variable please use map variable, example in Basic Nodes docs.
Other changes
- #2554 Maven artifact
nussknacker-kafka-flink-util
becomenussknacker-flink-kafka-util
andnussknacker-avro-flink-util
becomenussknacker-flink-avro-util
. General naming convention isnussknacker-$runtimeType-$moduleName
. Components inside distribution changed layout tocomponents(/$runtimeType)/componentName.jar
e.g.components/flink/kafka.jar
orcomponents/openapi.jar
KafkaSource
becomeFlinkKafkaSource
,ConsumerRecordBasedKafkaSource
becomeFlinkConsumerRecordBasedKafkaSource
,KafkaSink
becomeFlinkKafkaSink
,KafkaAvroSink
becomeFlinkKafkaAvroSink
- #2535, #2625, #2645 Rename
standalone
torequest-response
:- Renamed modules and artifacts
StandaloneMetaData
is nowRequestResponseMetaData
- Move
request-response
modules tobase
dir. standalone
in package names changed torequestresponse
Standalone
in class/variable names changed toRequestResponse
DeploymentManager/Service
uses dedicated format of status DTO, instead of the ones fromdeployment-manager-api
- Removed old, deprecated
jarPath
settings, in favour ofclassPath
used in other places - Extracted
nussknacker-lite-request-response-components
module
- #2582
KafkaUtils.toProducerProperties
setup only basic properties now (bootstrap.servers
and serializers) - before the change it was setting options which are not always good choice (for transactional producers wasn't) - #2600
ScenarioInterpreter
,ScenarioInterpreterWithLifecycle
now takes additional generic parameter:Input
.ScenarioInterpreter.invoke
takesScenarioInputBatch
which now contains list ofSourceId -> Input
instead ofSourceId -> Context
. Logic ofContext
preparation should be done inLiteSource
instead of beforeScenarioInterpreter.invoke
. invocation It means thatLiteSource
also takes this parameter and have a new methodcreateTransformation
. - #2635
ContextInitializer.initContext
now takesContextIdGenerator
instead ofnodeId
and returns just a function with strategy of context initialization instead of serializable function withLifecycle
. To use it with Flink engine, useFlinkContextInitializingFunction
wrapper. - #2649
DeploymentManagerProvider
takes newProcessingTypeDeploymentService
class as an implicit parameter - #2564 'UnionParametersMigration' available to migrate parameter name from 'value' to 'Output expression' - please turn it on you are using 'union' like component
- #2645 Simplify structure of available models (implementations of
ProcessConfigCreator
).defaultModel.jar
and components should be used instead of custom implementations ofProcessConfigCreator
, the only exception is when one wants to customizeExpressionConfig
. Also,nussknacker-flink-engine
module becamenussknacker-flink-executor
. - #2651
ValidationContext.clearVariables
now clears also parent reference. Important when invoked inside fragments. - #2673
KafkaZookeeperUtils
renamed toKafkaTestUtils
, it doesn't depend on ZooKeeper anymore. - #2686
ServiceWithStaticParameters
renamed toEagerServiceWithStaticParameters
. - #2695
nodeId
replaced withNodeComponentInfo
inNuExceptionInfo
. Simple wrapper class which holds the samenodeId
and alsocomponentInfo
. Migration is straightforward, just putnodeId
into the new case class:NuExceptionInfo(None, exception, context)
=> stays the sameNuExceptionInfo(Some(nodeId), exception, context)
=>NuExceptionInfo(Some(NodeComponentInfo(nodeId, None)), exception, context)
- if an exception is thrown inside the component, additional information can be provided:
- for base component (like
filter
orsplit
):NodeComponentInfo.forBaseNode("nodeId", ComponentType.Filter)
- for other:
NodeComponentInfo("nodeId", ComponentInfo("kafka-avro-source", ComponentType.Source))
- for base component (like
- if an exception is thrown inside the component, additional information can be provided:
- The same migration has to be applied to
ExceptionHandler.handling()
method.
- #2824 'ProcessSplitterMigration' available to migrate node name from 'split' to 'for-each' (see #2781)- please turn it on if you are using 'split' component
In version 1.1.0
Summary:
- A lot of internal refactoring was made to separate code/API specific for Flink.
If your deployment has custom components pay special attention to:
Lifecycle
management- Kafka components
- Differences in artifacts and packages
- Some of the core dependencies: cats, cats-effect and circe were upgraded. It affects mainly code, but it may also have impact on state compatibility and performance.
- Default Flink version was bumped do 1.14 - see https://github.com/TouK/nussknacker-flink-compatibility on how to run Nu on older Flink versions.
- Execution of SpEL expressions is now checked more strictly, due to security considerations. These checks can be overridden with custom
ExpressionConfig
.
- Apart from that:
- minor configuration naming changes
- removal of a few of minor, not documented features (e.g. SQL Variable)
- #2208 Upgrade, cats, cats-effects, circe. An important nuisance: we didn't upgrade sttp, so we cannot depend on
"com.softwaremill.sttp.client" %% "circe"
. Instead, the code is copied. Make sure you don't include sttp-circe integration as transitive dependency, but use class from http-utils instead. - #2176
EnrichDeploymentWithJarDataFactory
was replaced withProcessConfigEnricher
. - #2278 SQL Variable is removed
- #2280 Added optional
defaultValue
field toParameter
. InGenericNodeTransformation
can be set toNone
- values will be determined automatically. - #2289 Savepoint path in
/api/adminProcessManagement/deploy
endpoint is passed as asavepointPath
parameter instead of path segment. - #2293 Enhancement: change
nodeCategoryMapping
configuration tocomponentsGroupMapping
- #2301 #2620
GenericNodeTransformation.initialParameters
was removed - nowGenericNodeTransformation.contextTransformation
is used instead. To make Admin tab -> Invoke service form working, useWithLegacyStaticParameters
trait - #2409
JsonValidator
is now not determined by default based onJsonParameterEditor
but must be explicitly defined by@JsonValidator
annotation - #2304 Upgrade to Flink 1.14. Pay attention to Flink dependencies - in some (e.g. runtime) there is no longer scala version.
- #2295
FlinkLazyParameterFunctionHelper
allows (and sometimes requires) correct exception handling - #2307 Changed
nussknacker-kafka
module name tonussknacker-kafka-util
- #2310 Changed
nussknacker-process
module name tonussknacker-flink-engine
- #2300 #2343 Enhancement: refactor and improvements at components group:
- Provided
ComponentGroupName
as VO SingleNodeConfig
was renamed toSingleComponentConfig
and moved frompl.touk.nussknacker.engine.api.process
package topl.touk.nussknacker.engine.api.component
- Configuration
category
in node configuration was replaced bycomponentGroup
- Configuration
nodes
in model configuration was replaced bycomponentsUiConfig
- Additional refactor:
ProcessToolbarService
moved frompl.touk.nussknacker.ui.service
package topl.touk.nussknacker.ui.process
- Additional refactor:
ProcessToolbarService
moved frompl.touk.nussknacker.ui.service
package topl.touk.nussknacker.ui.process
DefinitionPreparer
was renamed toComponentDefinitionPreparer
NodesConfigCombiner
was removed- REST API /api/processDefinitionData/* response JSON was changed:
nodesToAdd
was renamed tocomponentGroups
posibleNode
was renamed tocomponents
nodesConfig
was renamed tocomponentsConfig
- config
icon
property fromcomponentsConfig
right now should be relative tohttp.publicPath
e.g./assets/components/Filter.svg
(before was justFilter.svg
) or url (withhttp
/https
)
- Provided
- #2346 Remove
endResult
fromSink
in graph.Sink
no longer definestestOutput
method - they should be handled by respective implementations- Change in definition of
StandaloneSink
previouslyStandaloneSinkWithParameters
, as output always has to be computed with sink parameters now - Changes in definition of
FlinkSink
, to better handle capturing test data - Removal of
.sink
method inGraphBuilder
- use.emptySink
if suitable
- #2331
KafkaAvroBaseTransformer
companion object renamed toKafkaAvroBaseComponentTransformer
KryoGenericRecordSchemaIdSerializationSupport
renamed toGenericRecordSchemaIdSerializationSupport
- #2305 Enhancement: change
processingTypeToDashboard
configuration toscenarioTypeToDashboard
- #2296 Scenarios & Fragments have separate TypeSpecificData implementations. Also, we remove
isSubprocess
field from process JSON, and respectively from MetaData constructor. See corresponding db migrationV1_031__FragmentSpecificData.scala
- #2368
WithCategories
now takes categories as anOption[List[String]]
instead ofList[String]
. You should wrap given list of categories withSome(...)
.None
mean that component will be available in all categories. - #2360
union
,union-memo
anddead-end
components were extracted frommodel/genericModel.jar
tocomponents/baseComponents.jar
If you have your ownapplication.conf
which changesscenarioTypes
, you should add"components/baseComponents.jar"
entry intoclassPath
array - #2337 Extract base engine from standalone
- Common functionality of base engine (i.e. microservice based, without Flink) is extracted to
base-api
andbase-runtime
- new API for custom components (
pl.touk.nussknacker.engine.baseengine.api.customComponentTypes
) StandaloneProcessInterpreter
becomesStandaloneScenarioEngine
- Replace
Either[NonEmptyList[Error], _]
withValidatedNel[Error, _]
as return type StandaloneContext
becomesEngineRuntimeContext
- Common functionality of base engine (i.e. microservice based, without Flink) is extracted to
- #2349
queryable-state
module was removed,FlinkQueryableClient
was moved tonussknacker-flink-manager
.PrettyValidationErrors
,CustomActionRequest
andCustomActionResponse
moved fromnussknacker-ui
tonussknacker-restmodel
. - #2361 Removed
security
dependency fromlistener-api
.LoggedUser
replaced with dedicated class inlistener-api
. - #2385 Deprecated
CustomStreamTransformer.clearsContext
was removed. Use
@MethodToInvoke
def execute(...) =
ContextTransformation
.definedBy(ctx => Valid(ctx.clearVariables ...))
.implementedBy(...)
}
instead.
- #2348 #2459 #2486
#2490 #2496 #2536
Introduce
KafkaDeserializationSchema
andKafkaSerializationSchema
traits to decouple from flink dependency. moveKeyedValue
tonussknacker-util
, moveSchemaRegistryProvider
toutils/avro-util
To move between nussknacker's/flink's Kafka(De)serializationSchema usewrapToFlink(De)serializatioinSchema
fromFlinkSerializationSchemaConversions
.SchemaRegistryProvider
andConfluentSchemaRegistryProvider
is now innussknacker-avro-util
module.FlinkSourceFactory
is gone - useSourceFactory
instead.KafkaSourceFactory
,KafkaAvroSourceFactory
,KafkaSinkFactory
,KafkaAvroSinkFactory
, andContextIdGenerator
not depends on flink. ExtractedKafkaSourceImplFactory
,KafkaSinkImplFactory
andKafkaAvroSinkImplFactory
which deliver implementation of component (after all validations and parameters evaluation). Use respectively:FlinkKafkaSourceImplFactory
,FlinkKafkaSinkImplFactory
andFlinkKafkaAvroSinkImplFactory
to deliver flink implementations. Moved non-flink specific serializers, deserializers,BestEffortAvroEncoder
,ContextIdGenerator
s andRecordFormatter
s to kafka-util/avro-utilKafkaDelayedSourceFactory
is nowDelayedKafkaSourceFactory
.FixedRecordFormatterFactoryWrapper
moved toRecordFormatterFactory
- #2477
FlinkContextInitializer
andFlinkGenericContextInitializer
merged toContextInitializer
,BasicFlinkContextInitializer
andBasicFlinkGenericContextInitializer
merged toBasicContextInitializer
. All of them moved topl.touk.nussknacker.engine.api.process
package.ContextInitializer.validationContext
returnsValidatedNel
- before this change errors during context initialization weren't accumulated.ContextInitializingFunction
now is a scala's function instead of Flink's MapFunction. You should wrap it withRichLifecycleMapFunction
to make sure that it will be opened correctly by Flink.InputMeta
was moved tokafka-util
module. - #2389 #2391
deployment-manager-api
module was extracted andDeploymentManagerProvider
,ProcessingTypeData
andQueryableClient
was moved frominterpreter
into it.DeploymentManager
,CustomAction
andProcessState
was moved fromapi
todeployment-manager-api
.ProcessingType
was moved torest-model
package. - #2393 Added
ActorSystem
,ExecutionContext
andSttpBackend
intoDeploymentManagerProvider.createDeploymentManager
. During clean ups also was removednussknacker-http-utils
dependency toasync-http-client-backend-future
and addedSttpBackend
toCountsReporterCreator.createReporter
arguments. - #2397 Common
EngineRuntimeContext
lifecycle andMetricsProvider
. This may cause runtime consequences - make sure your custom services/listeners invokeopen
/close
correctly - especially in complex inheritance scenarios.Lifecycle
has nowEngineRuntimeContext
as parameter,JobData
is embedded in it.TimeMeasuringService
replacesGenericTimeMeasuringService
, Flink/Standalone flavours ofTimeMeasuringService
are removedEngineRuntimeContext
andMetricsProvider
moved to base API,RuntimeContextLifecycle
moved to base API asLifecycle
GenericInstantRateMeter
is nowInstantRateMeter
- Flink
RuntimeContextLifecycle
should be replaced in most cases byLifecycle
- In Flink engine
MetricsProvider
(obtained withEngineRuntimeContext
) should be used in most places instead ofMetricUtils
- #2486
Context.withInitialId
is deprecated now - useEngineRuntimeContext.contextIdGenerator
instead.EngineRuntimeContext
can be accessible viaFlinkCustomNodeContext.convertToEngineRuntimeContext
- #2377 #2534 Removed
clazz
fromSourceFactory
. Remove generic parameter fromSource
andSourceFactory
. Return type of source should be returned either by:returnType
field of@MethodToInvoke
ContextTransformation
APIGenericNodeTransformer
APISourceFactory.noParam
- #2453 Custom actions for
PeriodicDeploymentManager
now can be defined and implemented outside this class, inPeriodicCustomActionsProvider
created byPeriodicCustomActionsProviderFactory
. If you do not need them, just passPeriodicCustomActionsProviderFactory.noOp
to object'sPeriodicDeploymentManager
factory method. - #2501
nussknacker-baseengine-components
module renamed tonussknacker-lite-base-components
- #2221 ReflectUtils
fixedClassSimpleNameWithoutParentModule
renamed tosimpleNameWithoutSuffix
- #2495 TypeSpecificDataInitializer trait change to TypeSpecificDataInitializ
- 2245
FailedEvent
has been specified inFailedOnDeployEvent
andFailedOnRunEvent
In version 1.0.0
- #1439 #2090 Upgrade do Flink 1.13.
setTimeCharacteristic
is deprecated, and should be handled automatically by Flink.UserClassLoader
was removed, use appropriate Flink objects or context ClassLoader.- RocksDB configuration is turned on by
rocksdb.enable
instead ofrocksdb.checkpointDataUri
which is not used now.
- #2133 SQL Variable is hidden in generic model, please look at comment in
defaultModelConfig.conf
- #2152
schedulePropertyExtractor
parameter ofPeriodicDeploymentManagerProvider
was changed to a factory, replace with a lambda creating the original property extractor. - #2159
useTypingResultTypeInformation
option is now enabled by default - #2108 Changes in
ClassExtractionSettings
:- Refactor of classes defining extraction rules,
TypedClass
has privateapply
method, please useTyped.typedClass
- Fewer classes/methods are accessible in SpEL, in particular Scala collections, internal time API, methods returning or having parameters from excluded classes
- Changes in
OAuth2
security components:- refactoring of
OpenIdConnectService
, now it's namedGenericOidcService
(it's best to useOidcService
, which can handle most of the configuration automatically)
- refactoring of
- New security settings, in particular new flags in
ExpressionConfig
:strictMethodsChecking
staticMethodInvocationsChecking
methodExecutionForUnknownAllowed
dynamicPropertyAccessAllowed
spelExpressionExcludeList
- #2101 Global permissions can be arbitrary string, for admin user it's not necessary to return global permissions
- #2182 To avoid classloader leaks during SQL
DriverManager
registration, HSQLDB (used e.g. for SQL Variable) is no longer included in model jars, it should be added in Flinklib
dir
In version 0.4.0
-
#1479
ProcessId
andVersionId
moved to API included inProcessVersion
, remove spuriousProcessId
andProcessVersionId
in restmodel. -
#1422 Removed
ServiceReturningType
andWithExplicitMethod
, useEagerServiceWithStaticParameters
,EnricherContextTransformation
orSingleInputGenericNodeTransformation
-
#1845
AuthenticatorData
has been renamed toAuthenticationResources
and changed into a trait,apply
construction has been preserved.AuthenticatorFactory
and itscreateAuthenticator
method has been renamed toAuthenticationProvider
andcreateAuthenticationResources
. It is recommended to rename the main class of any custom authentication module to<Something>AuthenticationProvider
accordingly. -
#1542
KafkaConfig
now has new parametertopicsExistenceValidationConfig
. WhentopicsExistenceValidationConfig.enabled = true
Kafka sources/sinks do not validate if provided topic does not exist and cluster is configured withauto.create.topics.enable=false
-
#1416
OAuth2Service
has changed. You can still use your old implementation by importingOAuth2OldService
with an alias.OAuth2ServiceFactory.create
method now accepts implicit parameters for anExecutionContext
andsttp.HttpBackend
. You can ignore them to maintain previous behaviour, but it is always better to use them instead of locally defined ones. -
#1346
AggregatorFunction
now takes type of stored state that can beimmutable.SortedMap
(previous behaviour) orjava.util.Map
(using Flink's serialization) andvalidatedStoredType
parameter for providing betterTypeInformation
for aggregated values -
#1343
FirstAggregator
changed serialized state, it is not compatible,Aggregator
trait has new methodcomputeStoredType
-
#1352 and #1568 AvroStringSettings class has been introduced, which allows control whether Avro type
string
is represented byjava.lang.String
(also in runtime) orjava.lang.CharSequence
(implemented in runtime byorg.apache.avro.util.Utf8
). This setting is available through environment variableAVRO_USE_STRING_FOR_STRING_TYPE
- default istrue
. Please mind that this setting is global - it applies to all processes running on Flink and also requires restarting TaskManager when changing the value. -
#1361 Lazy variables are removed, you should use standard enrichers for those cases. Their handling has been source of many problems and they made it harder to reason about the exeuction of process.
-
#1373 Creating
ClassLoaderModelData
directly is not allowed, useModelData.apply
with plain config, wrapping with ModelConfigToLoad by yourself is not needed. -
#1406
ServiceReturningType
is deprecated in favour ofEagerService
-
#1445
RecordFormatter
now handlesTestDataSplit
for Kafka sources. It is required inKafkaSource
creation, instead ofTestDataSplit
-
#1433 Pass DeploymentData to process via JobData, additional parameters to deployment methods are needed. Separate
ExternalDeploymentId
fromDeploymentId
(generated by NK) -
#1466
ProcessManager.deploy
can returnExternalDeploymentId
-
- Slight change of API of
StringKeyedValueMapper
- Change of semantics of some parameters of
AggregatorFunction
,AggregatorFunctionMixin
(storedAggregateType becomes aggregateElementType)
- Slight change of API of
-
#1405 'KafkaAvroSink' requires more generic 'AvroSinkValue' as value parameter
-
- Change of
FlinkSource
API: sourceStream produces stream of initializedContext
(DataStream[Context]
) This initialization step was previously performed withinFlinkProcessRegistrar.registerSourcePart
. Now it happens explicitly within the flink source. FlinkIntermediateRawSource
is used as an extension to flink sources, it prepares source with typical stream transformations (add source function, set uid, assign timestamp, initializeContext
)FlinkContextInitializer
is used to initializeContext
. It provides map function that transforms raw event (produced by flink source function) intoContext
variable. Default implementation ofFlinkContextInitializer
, seeBasicFlinkContextInitializer
, sets raw event value to singe "input" variable.- For sources based on
GenericNodeTransformation
it allows to initializeContext
with more than one variable. Default implementation of initializer, seeBasicFlinkGenericContextInitializer
, provides default definition of variables as aValidationContext
with single "input" variable. The implementation requires to provide separately the definition of "input" variable type (TypingResult
). SeeGenericSourceWithCustomVariablesSample
. - To enable "test source" functionality, a source needs to be extended with
SourceTestSupport
. - For flink sources that use
TestDataParserProvider
switch toFlinkSourceTestSupport
(which is used to provide "test source" functionality for flink sources). - Old
TestDataParserProvider
is renamed toSourceTestSupport
- To enable test data generator for "test source" , a source needs to be extended with both
SourceTestSupport
andTestDataGenerator
. What was related to "test source" functionality and was obsolete inFlinkSource
now is excluded toFlinkSourceTestSupport
. FlinkCustomNodeContext
has access toTypeInformationDetection
, it allows to get TypeInformation for the node stream mapping from ValidationContext.- For kafka sources
RecordFormatter
parses raw test data toConsumerRecord
which fits into deserializer (instead ofProducerRecord
that required another transformation). - Definitions of names of common
Context
variables are moved toVariableConstants
(instead ofInterpreter
).
- Change of
-
#1497 Changes in
PeriodicProcessManager
, changePeriodicProperty
toScheduleProperty
-
- trait
KafkaAvroDeserializationSchemaFactory
uses both key and value ClassTags and schemas (instead of value-only), check the order of parameters. - ClassTag is provided in params in avro key-value deserialization schema factory:
KafkaAvroKeyValueDeserializationSchemaFactory
BaseKafkaAvroSourceFactory
is able to read both key and value schema determiner to build proper DeserializationSchema (support for keys is not fully introduced in this change)
- trait
-
#1514
ExecutionConfigPreparer
has different method parameter -JobData
, which has more info than previous parameters -
#1532
TypedObjectTypingResult#fields
uses nowscala.collection.immutable.ListMap
to keep fields order -
#1546
StandaloneCustomTransformer
now takes a list ofContext
objects, to process them in one go -
#1557 Some classes from standalone engine were moved to standalone api to remove engine to (model) utils dependency:
StandaloneContext
,StandaloneContextLifecycle
,MetricsProvider
-
#1558
FlinkProcessRegistrar
takes configuration directly fromFlinkProcessCompiler
(this can affect some tests setup) -
#1631 Introduction of
nussknacker.config.locations
property, drop use of standardconfig.file
property. Model configuration no longer has direct access to root UI config. -
- Replaced
KafkaSourceFactory
with source based onGenericNodeTransformation
, which gives access to setup ofValidationContext
andContext
initialization. To migrateKafkaSourceFactory
:- provide deserializer factory (source factory requires deserialization to
ConsumerRecord
):- use
ConsumerRecordDeserializationSchemaFactory
with currentDeserializationSchema
as a value deserializer, add key deserializer (e.g. org.apache.kafka.common.serialization.StringDeserializer) - or use
FixedValueDeserializationSchemaFactory
with simple key-as-string deserializer
- use
- provide RecordFormatterFactory
- use
ConsumerRecordToJsonFormatterFactory
for whole key-value-and-metadata serialization - or, for value-only-and-without-metadata scenario, you can use current
RecordFormater
wrapped inFixedRecordFormatterFactoryWrapper
- use
- provide timestampAssigner that is able to extract time from
ConsumerRecord[K, V]
- provide deserializer factory (source factory requires deserialization to
- Removed
BaseKafkaSourceFactory
with multiple topics support: useKafkaSourceFactory
instead, see "source with two input topics" test case - Removed
SingleTopicKafkaSourceFactory
: useKafkaSourceFactory
with customprepareInitialParameters
,contextTransformation
andextractTopics
to alter parameter list and provide constant topic value. TypingResultAwareTypeInformationCustomisation
is moved to package pl.touk.nussknacker.engine.flink.api.typeinformation
Example of source with value-only deserialization and custom timestampAssigner:
// provide new deserializer factory with old schema definition for event's value
val oldSchema = new EspDeserializationSchema[SampleValue](bytes => io.circe.parser.decode[SampleValue](new String(bytes)).toOption.get)
val schemaFactory: KafkaDeserializationSchemaFactory[ConsumerRecord[String, SampleValue]] = new FixedValueDeserializationSchemaFactory(oldSchema)
// ... provide timestampAssigner that extracts timestamp from SampleValue.customTimestampField
// ... or use event's metadata: record.timestamp()
def timestampExtractor(record: ConsumerRecord[String, SampleValue]): Long = record.value().customTimestampField
val watermarkHandler = StandardTimestampWatermarkHandler.boundedOutOfOrderness[ConsumerRecord[String, SampleValue]](timestampExtractor, java.time.Duration.ofMinutes(10L))
val timestampAssigner: Option[TimestampWatermarkHandler[ConsumerRecord[String, SampleValue]]] = Some(watermarkHandler)
// ... provide RecordFormatterFactory that allows to generate and parse test data with key, headers and other metadata
val formatterFactory: RecordFormatterFactory = new ConsumerRecordToJsonFormatterFactory[String, SampleValue]
// ... and finally
val sourceFactory = new KafkaSourceFactory[String, SampleValue](schemaFactory, timestampAssigner, formatterFactory, dummyProcessObjectDependencies) - Replaced
-
#1651
KafkaAvroSourceFactory
provides additional #inputMeta variable with event's metadata.- That source now has key and value type parameters. That parameters are relevant for sources handling
SpecificRecord
s. ForGenericRecords
use explicitlyKafkaAvroSourceFactory[Any, Any]
. SpecificRecordKafkaAvroSourceFactory
extends wholeKafkaAvroSourceFactory
with context validation and initialization- New flag in
KafkaConfig
:useStringForKey
determines if event's key should be intepreted as ordinary String (which is default scenario). It is used in deserialization and for generating/parsing test data. SchemaRegistryProvider
now provides factories to produce SchemaRegistryClient and RecordFormatter.- For
ConfluentSchemaRegistryProvider
KafkaConfig and ProcessObjectDependencies (that contains KafkaConfig data) are no longer required. That configuration is required by factories in the moment the creation of requested objects that happens inKafkaAvroSourceFactory
(and that makes that all objects withinKafkaAvroSourceFactory
see the same kafka configuration). - Removed:
BaseKafkaAvroSourceFactory
, the class is incorporated intoKafkaAvroSourceFactory
to provide elastic approach to create KafkaSourcewith ReturningType
for generic types (this is defined by ValidationContext, see alsoKafkaContextInitializer
that allows to return more than one variable)KafkaAvroValueDeserializationSchemaFactory
(source requires deserialization toConsumerRecord[K, V]
, there are only deserializers based onKafkaAvroKeyValueDeserializationSchemaFactory
)ConfluentKafkaAvroDeserializationSchemaFactory
, useConfluentKeyValueKafkaAvroDeserializationFactory
TupleAvroKeyValueKafkaAvroDeserializerSchemaFactory
, this approach is deprecated due to #inputMeta variable that contains key data
To migrate
KafkaAvroSourceFactory
:- Provide
KafkaConfig
with correctuseStringForKey
flag value. By default we want to EvictableStatehandle keys as ordinary String and all topics related to such config require only value schema definitions (key schemas are ignored). For specific scenario, when complex key with its own schema is provided, this flag is false and all topics related to this config require both key and value schema definitions. Example of default KafkaConfig override:override protected def prepareKafkaConfig: KafkaConfig = super.prepareKafkaConfig.copy(useStringForKey = false)
- provide your own
SchemaRegistryProvider
(or useConfluentSchemaRegistryProvider
) - custom RecordFormatter can be wrapped in
FixedRecordFormatterFactoryWrapper
(or keepConfluentAvroToJsonFormatterFactory
) - provide timestampAssigner that is able to extract time from
ConsumerRecord[K, V]
(see example above)
- That source now has key and value type parameters. That parameters are relevant for sources handling
-
#1741 Minor changes in
KafkaUtils
,NonTransientException
usesInstant
instead ofLocalDateTime
-
#1806 Remove old, deprecated API:
EvictableState
,RichEvictableState
- useEvictableStateFunction
checkpointInterval
- usecheckpointConfig.checkpointInterval
- old versions of
sampleTransformers
- use newer ones MiniClusterExecutionEnvironment.runningJobs()
- useflinkMiniClusterHolder.runningJobs()
-
#1807 Removed
jdbcServer
, please use Postgres for production-ready setups -
- RecordFormatterFactory instead of one, uses two type parameters: K, V
- ConfluentAvroToJsonFormatter is produced by ConfluentAvroToJsonFormatterFactory
- ConfluentAvroToJsonFormatter produces test data in valid JSON format, does not use Separator
- ConfluentAvroMessageFormatter has asJson method instead of writeTo
- ConfluentAvroMessageReader has readJson method instead of readMessage Example test data object:
{"keySchemaId":null,"valueSchemaId":1,"consumerRecord":{"key":null,"value":{"first":"Jan","last":"Kowalski"},"topic":"testAvroRecordTopic1","partition":0,"offset":0,"timestamp":1624279687756,"timestampType":"CreateTime","headers":{},"leaderEpoch":0}}
-
#1663 Default
FlinkExceptionHandler
implementations are deprecated, useConfigurableExceptionHandler
instead. -
#1731 RockDB config's flag
incrementalCheckpoints
is turned on by default. -
#1825 Default dashboard renamed from
flink-esp
tonussknacker-scenario
-
#1836 Change default
kafka.consumerGroupNamingStrategy
toprocessId-nodeId
. -
#1357 Run mode added to nodes.
ServiceInvoker
interface was extended with new, implicitrunMode
parameter. -
#1836 Change default
kafka.consumerGroupNamingStrategy
toprocessId-nodeId
. -
#1886 aggregate-sliding with emitWhenEventLeft = true, aggregate-tumbling and aggregate-session components now doesn't emit full context of variables that were before node (because of performance reasons and because that wasn't obvious which one context is emitted). If you want to emit some information other than aggregated value and key (availabled via new
#key
variable), you should use#AGG.map
expression inaggregateBy
. -
#1910
processTypes
renamed toscenarioTypes
. You can still use oldprocessTypes
configuration. Old configuration will be removed in version0.5.0
. -
Various naming changes:
In version 0.3.0
-
#1313 Kafka Avro API passes
KafkaConfig
duringTypeInformation
determining -
#1305 Kafka Avro API passes
RuntimeSchemaData
instead ofSchema
in various places -
#1304
SerializerWithSpecifiedClass
was moved toflink-api
module. -
#1044 Upgrade to Flink 1.11. Current watermark/timestamp mechanisms are deprectated in Flink 1.11, new API
TimestampWatermarkHandler
is introduced, withLegacyTimestampWatermarkHandler
as wrapper for previous mechanisms. -
#1244
Parameter
has new parameter 'variablesToHide' withSet
of variable names that will be hidden before parameter's evaluation -
#1159 #1170 Changes in
GenericNodeTransformation
API:- Now
implementation
takes additional parameter with final state value determined duringcontextTransformation
DefinedLazyParameter
andDefinedEagerParameter
holdsexpression: TypedExpression
instead ofreturnType: TypingResult
DefinedLazyBranchParameter
andDefinedEagerBranchParameter
holdsexpressionByBranchId: Map[String, TypedExpression]
instead ofreturnTypeByBranchId: Map[String, TypingResult]
- Now
-
- Now
SimpleSlidingAggregateTransformerV2
andSlidingAggregateTransformer
is deprecated in favour ofSlidingAggregateTransformerV2
- Now
SimpleTumblingAggregateTransformer
is deprecated in favour ofTumblingAggregateTransformer
- Now
SumAggregator
,MaxAggregator
andMinAggregator
doesn't change type of aggregated value (previously was changed to Double) - Now
SumAggregator
,MaxAggregator
andMinAggregator
return null instead of0D
/Double.MaxValue
/Double.MinValue
for case when there was no element added beforegetResult
- Now
-
#1149 FlinkProcessRegistrar refactor (can affect test code)
-
#1166
model.conf
should be renamed todefaultModelConfig.conf
-
#1218 FlinkProcessManager is no longer bundled in ui uber-jar. In docker/tgz distribution
-
#1255 Moved displaying
Metrics tab
tocustomTabs
-
#1257 Improvements: Flink test util package
- Added methods:
cancelJob
,submitJob
,listJobs
,runningJobs
toFlinkMiniClusterHolder
- Deprecated:
runningJobs
, fromMiniClusterExecutionEnvironment
- Removed:
getClusterClient
fromFlinkMiniClusterHolder
interface, because of flink compatibility at Flink 1.9 - Renamed:
FlinkStreamingProcessRegistrar
toFlinkProcessManager
- Added methods:
-
#1303 TypedObjectTypingResult has a new field: additionalInfo
In version 0.2.0
-
#1104 Creation of
FlinkMiniCluster
is now extracted fromStoppableExecutionEnvironment
. You should create it using e.g.:val flinkMiniClusterHolder = FlinkMiniClusterHolder(FlinkTestConfiguration.configuration(parallelism))
flinkMiniClusterHolder.start()and then create environment using:
flinkMiniClusterHolder.createExecutionEnvironment()
. At the end you should cleanup
flinkMiniClusterHolder
by:flinkMiniClusterHolder.stop()
.
FlinkMiniClusterHolder
should be created once for test class - it is thread safe and resource expensive.MiniClusterExecutionEnvironment
in the other hand should be created for each process. It is not thread safe because underlyingStreamExecutionEnvironment
is not. You can useFlinkSpec
to achieve that. -
pl.touk.nussknacker.engine.queryablestate.QueryableClient
was moved fromqueryableState
module topl.touk.nussknacker.engine.api.queryablestate
package inapi
modulepl.touk.nussknacker.engine.queryablestate.QueryableState
was moved topl.touk.nussknacker.engine.api.queryablestate
- CustomTransformers from
pl.touk.nussknacker.engine.flink.util.transformer
inflinkUtil
module were moved to newflinkModelUtil
module. pl.touk.nussknacker.engine.testing.EmptyProcessConfigCreator
was moved frominterpreter
module topl.touk.nussknacker.engine.util.process
package inutil
module
-
#1039 Generic parameter of
LazyParameter[T]
has upper bound AnyRef now to avoid problems with bad type extraction. It caused changesAny
toAnyRef
in a few places - mainlyFlinkLazyParameterFunctionHelper
andFlinkCustomStreamTransformation
-
#1039
FlinkStreamingProcessRegistrar.apply
has a new parameter of typeExecutionConfigPreparer
. In production code you should passExecutionConfigPreparer.defaultChain()
there and in test code you should passExecutionConfigPreparer.unOptimizedChain()
. See scaladocs for more info. If you already have done some Flink'sExecutionConfig
set up before you've registered process, you should consider create your own chain usingExecutionConfigPreparer.chain()
. -
#1039
FlinkSourceFactory
doesn't takeTypeInformation
type class as a generic parameter now. Instead of this, it takesClassTag
.TypeInformation
is determined during source creation.typeInformation[T]
method was moved fromBasicFlinkSource
toFlinkSource
because still must be some place to determine it for tests purpose. -
#965 'aggregate' node in generic model was renamed to 'aggregate-sliding'
-
#922 HealthCheck API has new structure, naming and JSON responses:
- old
/healthCheck
is moved to/healthCheck/process/deployment
- old
/sanityCheck
is moved to/healthCheck/process/validation
- top level
/healthCheck
indicates general "app-is-running" state
- old
-
#879 Metrics use variables by default, see docs to enable old mode, suitable for graphite protocol. To use old way of sending:
- put
globalParameters.useLegacyMetrics = true
in each model configuration (to configure metrics sending in Flink) - put:
countsSettings {
user: ...
password: ...
influxUrl: ...
metricsConfig {
nodeCountMetric: "nodeCount.count"
sourceCountMetric: "source.count"
nodeIdTag: "action"
countField: "value"
}
} - put
-
Introduction to KafkaAvro API: #871, #881, #903, #981, #989, #998, #1007, #1014, #1034, #1041
API for KafkaAvroSourceFactory
was changed:
KafkaAvroSourceFactory
old way:
val clientFactory = new SchemaRegistryClientFactory
val source = new KafkaAvroSourceFactory(
new AvroDeserializationSchemaFactory[GenericData.Record](clientFactory, useSpecificAvroReader = false),
clientFactory,
None,
processObjectDependencies = processObjectDependencies
)
KafkaAvroSourceFactory
new way :
val schemaRegistryProvider = ConfluentSchemaRegistryProvider(processObjectDependencies)
val source = new KafkaAvroSourceFactory(schemaRegistryProvider, processObjectDependencies, None)
Provided new API for Kafka Avro Sink:
val kafkaAvroSinkFactory = new KafkaAvroSinkFactory(schemaRegistryProvider, processObjectDependencies)
Additional changes:
- Bump up confluent package to 5.5.0
- (Refactor Kafka API) Moved
KafkaSourceFactory
topl.touk.nussknacker.engine.kafka.sink
package - (Refactor Kafka API) Changed
BaseKafkaSourceFactory
, now it requiresdeserializationSchemaFactory: KafkaDeserializationSchemaFactory[T]
- (Refactor Kafka API) Moved
KafkaSinkFactory
topl.touk.nussknacker.engine.kafka.source
package - (Refactor Kafka API) Renamed
SerializationSchemaFactory
toKafkaSerializationSchemaFactory
- (Refactor Kafka API) Renamed
DeserializationSchemaFactory
toKafkaDeserializationSchemaFactory
- (Refactor Kafka API) Renamed
FixedDeserializationSchemaFactory
toFixedKafkaDeserializationSchemaFactory
- (Refactor Kafka API) Renamed
FixedSerializationSchemaFactory
toFixedKafkaSerializationSchemaFactory
- (Refactor Kafka API) Removed
KafkaSerializationSchemaFactoryBase
- (Refactor Kafka API) Replaced
KafkaKeyValueSerializationSchemaFactoryBase
byKafkaAvroKeyValueSerializationSchemaFactory
(it handles only avro case now) - (Refactor Kafka API) Removed
KafkaDeserializationSchemaFactoryBase
- (Refactor Kafka API) Replaced
KafkaKeyValueDeserializationSchemaFactoryBase
byKafkaAvroKeyValueDeserializationSchemaFactory
(it handles only avro case now) - (Refactor KafkaAvro API) Renamed
AvroDeserializationSchemaFactory
toConfluentKafkaAvroDeserializationSchemaFactory
and moved toavro.schemaregistry.confluent
package - (Refactor KafkaAvro API) Renamed
AvroKeyValueDeserializationSchemaFactory
toConfluentKafkaAvroDeserializationSchemaFactory
and moved toavro.schemaregistry.confluent
package - (Refactor KafkaAvro API) Renamed
AvroSerializationSchemaFactory
toConfluentAvroSerializationSchemaFactory
and moved toavro.schemaregistry.confluent
package - (Refactor KafkaAvro API) Renamed
AvroKeyValueSerializationSchemaFactory
toConfluentAvroKeyValueSerializationSchemaFactory
and moved toavro.schemaregistry.confluent
package - (Refactor KafkaAvro API) Removed
FixedKafkaAvroSourceFactory
andFixedKafkaAvroSinkFactory
(now we don't support fixed schema) - (Refactor Kafka API) Replaced
topics: List[String]
byList[PreparedKafkaTopic]
and removedprocessObjectDependencies
inKafkaSource
Be aware that we are using Avro 1.9.2 instead of default Flink's 1.8.2 (for Java time logical types conversions purpose).
- #1013 Expression evaluation is synchronous now. It shouldn't cause any problems (all languages were synchronous anyway), but some internal code may have to change.
In version 0.1.2
- #957 Custom node
aggregate
fromgeneric
model has changed parameter fromwindowLengthInSeconds
towindowLength
with human friendly duration input. If you have used it in process, you need to insert correct value again. - #954
TypedMap
is not a case class wrapping scala Map anymore. If you have done some pattern matching on it, you should usecase typedMap: TypedMap => typedMap.asScala
instead.
In version 0.1.1
- #930
DeeplyCheckingExceptionExtractor
was moved fromnussknacker-flink-util
module tonussknacker-util
module. - #919
KafkaSource
constructor now doesn't takeconsumerGroup
. Instead of this it computesconsumerGroup
on their own based onkafka.consumerGroupNamingStrategy
inmodelConfig
(default set toprocessId
). You can also override it byoverriddenConsumerGroup
optional parameter. Regards to this changes,KafkaConfig
has new, optional parameterconsumerGroupNamingStrategy
. - #920
KafkaSource
constructor now takesKafkaConfig
instead of using one that was parsed byBaseKafkaSourceFactory.kafkaConfig
. Also if you parse Typesafe Config toKafkaSource
on your own, now you should use dedicated methodKafkaConfig.parseConfig
to avoid further problems when parsing strategy will be changed. - #914
pl.touk.nussknacker.engine.api.definition.Parameter
has deprecated main factory method withruntimeClass
parameter. Now should be passedisLazyParameter
instead. Also were removedruntimeClass
from variances of factory methods prepared for easy testing (optional
method and so on).
In version 0.1.0
- #755 Default async execution context does not depend on parallelism.
asyncExecutionConfig.parallelismMultiplier
has been deprecated and should be replaced withasyncExecutionConfig.workers
. 8 should be sane default value. - #722 Old way of configuring Flink and model (via
flinkConfig
andprocessConfig
) is removed.processTypes
configuration should be used from now on. Example:becomes:flinkConfig {...}
processConfig {...}processTypes {
"type e.g. streaming" {
deploymentConfig {
type: "flinkStreaming"
PUT HERE PROPERTIES OF flinkConfig FROM OLD CONFIG
}
modelConfig {
classPath: PUT HERE VALUE OF flinkConfig.classPath FROM OLD CONFIG
PUT HERE PROPERTIES OF processConfig FROM OLD CONFIG
}
}
} - #763 Some API traits (ProcessManager, DictRegistry DictQueryService, CountsReporter) now extend
java.lang.AutoCloseable
. - Old way of additional properties configuration should be replaced by the new one, which is now mapped to
Map[String, AdditionalPropertyConfig]
. Example in your config:becomes:additionalFieldsConfig: {
mySelectProperty {
label: "Description"
type: "select"
isRequired: false
values: ["true", "false"]
}
}additionalPropertiesConfig {
mySelectProperty {
label: "Description"
defaultValue: "false"
editor: {
type: "FixedValuesParameterEditor",
possibleValues: [
{"label": "Yes", "expression": "true"},
{"label": "No", "expression": "false"}
]
}
}
} - #588 #882
FlinkSource
API changed, current implementation is nowBasicFlinkSource
- #839 #882
FlinkSink
API changed, current implementation is nowBasicFlinkSink
- #841
ProcessConfigCreator
API changed; note that currently all process objects are invoked withProcessObjectDependencies
as a parameter. The APIs ofKafkaSinkFactory
,KafkaSourceFactory
, and all their implementations were changed.Config
is available as property ofProcessObjectDependencies
instance. - #863
restUrl
indeploymentConfig
need to be preceded with protocol. Host with port only is not allowed anymore. - Rename
grafanaSettings
tometricsSettings
in configuration.
In version 0.0.12
- Upgrade to Flink 1.7
- Refactor of custom transformations, dictionaries, unions, please look at samples in example or generic to see API changes
- Considerable changes to authorization configuration, please look at sample config to see changes
- Circe is now used by default instead of Argonaut, but still can use Argonaut in Displayable
In version 0.0.11
- Changes in CustomStreamTransformer implementation, LazyInterpreter became LazyParameter, please look at samples to see changes in API
In version 0.0.8
- Upgrade to Flink 1.4
- Change of format of Flink cluster configuration
- Parameters of sources and sinks are expressions now - automatic update of DB is available
- Change of configuration of Grafana dashboards
- Custom processes are defined in main configuration file now