Version: 1.6

Flink specific model configuration

| Parameter name | Importance | Type | Default value | Description |
|---|---|---|---|---|
| timeout | Medium | duration | 10s | Timeout for invocation of a scenario part (including enrichers) |
| asyncExecutionConfig.bufferSize | Low | int | 200 | Buffer size used for async I/O |
| asyncExecutionConfig.workers | Low | int | 8 | Number of workers in the thread pool used with async I/O |
| asyncExecutionConfig.defaultUseAsyncInterpretation | Medium | boolean | true | Whether scenarios should use async I/O by default. If you don't use many enrichers etc., you may consider setting this flag to false |
| checkpointConfig.checkpointInterval | Medium | duration | 10m | How often checkpoints should be performed by default |
| checkpointConfig.minPauseBetweenCheckpoints | Low | duration | checkpointInterval / 2 | Minimal pause between checkpoints |
| checkpointConfig.maxConcurrentCheckpoints | Low | int | 1 | Maximum number of concurrent checkpoints |
| checkpointConfig.tolerableCheckpointFailureNumber | Low | int | | Tolerable number of failed checkpoints |
| rocksDB.enable | Medium | boolean | true | Enable RocksDB state backend support |
| rocksDB.incrementalCheckpoints | Medium | boolean | true | Whether incremental checkpoints should be used |
| rocksDB.dbStoragePath | Low | string | | Allows overriding the RocksDB local data storage path |
| enableObjectReuse | Low | boolean | true | Whether object reuse should be allowed |
| globalParameters.explicitUidInStatefulOperators | Low | boolean | true | Whether consistent operator uuids should be used |
| globalParameters.useTypingResultTypeInformation | Low | boolean | true | Enables using Nussknacker's additional typing information for state serialization. It makes serialization much faster; currently it should be considered experimental |
| globalParameters.forceSyncInterpretationForSyncScenarioPart | Low | boolean | true | Forces synchronous interpretation for scenario parts that do not contain any services (enrichers, processors). Applies to scenarios with async enabled |
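
As an illustration of how the dotted parameter names in the table map onto nested configuration, the sketch below overrides a few of them. The values are arbitrary examples, not recommendations, and the enclosing section of the model configuration where this fragment belongs is not shown here; that placement is an assumption to check against your own deployment.

```hocon
// Illustrative overrides only; keys come from the table above, values are made up.
timeout: 20s

asyncExecutionConfig {
  // Few enrichers in use, so async I/O is switched off by default.
  defaultUseAsyncInterpretation: false
}

checkpointConfig {
  checkpointInterval: 5m
  // If omitted, this defaults to checkpointInterval / 2.
  minPauseBetweenCheckpoints: 2m
  maxConcurrentCheckpoints: 1
}

rocksDB {
  enable: true
  incrementalCheckpoints: true
}
```

Parameters left out of the fragment keep the defaults listed in the table.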

TODO: KAFKA

Configuring exception handling

Exception handling can be customized using provided EspExceptionConsumer. By default, there are two available:

  • BrieflyLogging
  • VerboselyLogging

More of them can be added with custom extensions.

By default, basic error metrics are collected. If this is not desirable, the metrics collector can be turned off with the withRateMeter: false setting.

When an exception is raised within a scenario, the handler uses WithExceptionExtractor to determine whether it should be consumed (via EspExceptionConsumer) or rethrown. A custom extractor can be provided and selected with the optional exceptionExtractor setting. When no exceptionExtractor is set, the handler uses DefaultWithExceptionExtractor (same as exceptionExtractor: Default).

Some handlers can have additional properties, e.g. the built-in logging handlers can add custom parameters to the log. See the example below.

exceptionHandler {
  type: BrieflyLogging
  withRateMeter: false
  exceptionExtractor: SomeCustomExtractor
  params: {
    additional: "value1"
  }
}

Out of the box, Nussknacker provides the following ExceptionHandler types:

  • BrieflyLogging - log error to Flink logs (on info level, with stacktrace on debug level)
  • VerboselyLogging - log error to Flink logs on error level, together with all variables (should be used mainly for debugging)
  • Kafka - send errors to Kafka topic, see common config for the details.
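
For comparison with the BrieflyLogging example earlier, a debugging-oriented setup could be sketched as below. It uses only the settings described in this section. The explicit exceptionExtractor: Default line is redundant with the default behaviour and is included purely for illustration, and it is an assumption that withRateMeter defaults to true when it is not set to false.

```hocon
// Sketch: verbose handler for debugging, with default extractor and metrics left on.
exceptionHandler {
  type: VerboselyLogging
  // Same as leaving exceptionExtractor unset.
  exceptionExtractor: Default
  // Keeps the basic error metrics enabled (assumed to be the default).
  withRateMeter: true
}
```

Because VerboselyLogging logs all variables at error level, a configuration like this is better suited to debugging than to long-running production scenarios.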

Configuring restart strategies

We rely on Flink restart strategies, as described in the Flink documentation. It's also possible to configure restart strategies per scenario, using additional properties.

restartStrategy {
  // If scenarioProperty is configured, the strategy name is taken from that scenario property, e.g. restartType = for-important.
  // scenarioProperty should probably be configured with FixedValuesEditor.
  // scenarioProperty: restartType
  // For simple cases, only the default strategy needs to be configured.
  default: {
    strategy: fixed-delay
    attempts: 10
    delay: 10s
  }
  for-important {
    strategy: fixed-delay
    attempts: 30
  }
  fail-fast {
    strategy: none
  }
}
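
A per-scenario setup, with the commented-out scenarioProperty line enabled, might look like the sketch below. It assumes that scenarioProperty is set as a key directly under restartStrategy, as the commented line suggests, and that a scenario property named restartType has been defined separately (for example with a FixedValuesEditor offering the values fail-fast and an empty default). Both points should be verified against your Nussknacker version.

```hocon
// Sketch: choose the restart strategy per scenario via the restartType property.
restartStrategy {
  // Name of the scenario property whose value selects one of the strategies below (assumed placement).
  scenarioProperty: restartType

  // Used when the scenario does not select any named strategy.
  default: {
    strategy: fixed-delay
    attempts: 10
    delay: 10s
  }

  // Selected by scenarios with restartType = fail-fast.
  fail-fast {
    strategy: none
  }
}
```

Removing scenarioProperty and the named strategies, and keeping only default, gives the simple case in which every scenario shares one strategy.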