Version: 0.4

Monitoring

One of the crucial aspects of running production streaming processes is monitoring. In this section we explain which metrics a Nussknacker process running on a Flink cluster produces, and how to process them and display them in Grafana.

For each process, the following metrics are collected:

  • number of events consumed
  • number of events that passed the whole process
  • number of events filtered out
  • http services' invocation times, errors and throughput
  • event processing delays

Metrics technical details

We recommend using InfluxDB or Prometheus for storing metrics. In the default (e.g. demo) setup we use InfluxDB.

Metric types

We use the following standard metric types, which are reported according to the configured metric reporter:

  • gauge
  • histogram
  • counter
  • meter

In the descriptions below we also use composite metrics, each of which translates to more than one Flink/Dropwizard metric:

  • instantRate - gauge measuring the instant rate (that is, without smoothing). TODO: also add a 'normal' meter for this metric

  • instantRateWithCount - as above, plus a counter. TODO: after a meter is added to instantRate this will become obsolete

    • instantRate
    • count - counter
  • espTimer - this metric type is used to track invocation times together with their rate (e.g. how long a service invocation took)

    • histogram
    • instantRate - gauge
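To make the composition concrete, here is a minimal sketch of how an instantRateWithCount-style metric could pair a counter with an unsmoothed rate gauge. This is not Nussknacker's implementation: the class name, the method names and the "rate since the previous read" definition are assumptions made for illustration only.

```python
import time


class InstantRateWithCount:
    """Hypothetical sketch: a counter plus an unsmoothed rate gauge."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self.count = 0                 # the "count" counter part
        self._last_count = 0           # counter value at the previous gauge read
        self._last_read = clock()      # time of the previous gauge read

    def mark(self, n=1):
        """Record n events."""
        self.count += n

    def instant_rate(self):
        """Events per second since the previous read, with no smoothing."""
        now = self._clock()
        elapsed = now - self._last_read
        if elapsed <= 0:
            return 0.0
        rate = (self.count - self._last_count) / elapsed
        self._last_count = self.count
        self._last_read = now
        return rate
```

Unlike a Dropwizard meter, which reports exponentially weighted moving averages, the gauge in this sketch reflects only what happened since the last report, so short spikes are not averaged away.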

Common metrics

| Measurement | Additional tags | Metric type | Notes |
|---|---|---|---|
| nodeCount | nodeId | counter | used e.g. by count functionality |
| error.instantRate | - | instantRate | |
| error.instantRateByNode | nodeId | instantRate | nodeId is unknown if we fail to detect exact place |
| service.OK | serviceName | espTimer | see below |
| service.FAIL | serviceName | espTimer | see below |

The service metric is not added automatically. It can be used via GenericTimeMeasuringService to measure arbitrary code returning a Future; the invocation is classified as OK or FAIL depending on whether the Future completes successfully.
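The idea of timing asynchronous code and classifying the outcome can be sketched as follows. This is an analogy in Python using asyncio, not the GenericTimeMeasuringService API; the `measure` function and the `record` callback are hypothetical names introduced for illustration.

```python
import asyncio
import time


async def measure(service_name, make_call, record):
    """Await make_call() and report its duration as service.OK or service.FAIL.

    record(measurement, service_name, seconds) is a hypothetical callback
    standing in for the real metric sink.
    """
    start = time.monotonic()
    try:
        result = await make_call()
    except Exception:
        # The call failed: report the elapsed time under FAIL and re-raise.
        record("service.FAIL", service_name, time.monotonic() - start)
        raise
    # The call succeeded: report the elapsed time under OK.
    record("service.OK", service_name, time.monotonic() - start)
    return result
```

The wrapper does not swallow the failure; it only records it, so the caller still sees the original exception.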

| Measurement | Additional tags | Metric type | Description |
|---|---|---|---|
| source | nodeId | instantRateWithCount | |
| eventtimedelay.histogram | nodeId | histogram | only for sources with eventTime, measures delay from event time to system time |
| eventtimedelay.minimalDelay | nodeId | gauge | time from last event (eventTime) to system time |
| end | nodeId | instantRateWithCount | for sinks and end processors |
| dead_end | nodeId | instantRateWithCount | for events filtered out on filters, switches etc. |
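The two eventtimedelay metrics can be pictured with the sketch below. It assumes timestamps are in seconds and uses a plain list in place of a real histogram; the class and method names are hypothetical and only illustrate the definitions in the table.

```python
import time


class EventTimeDelay:
    """Hypothetical sketch of the eventtimedelay measurements."""

    def __init__(self, clock=time.time):
        self._clock = clock
        self.delays = []               # stands in for eventtimedelay.histogram
        self._last_event_time = None   # event time of the most recent event

    def on_event(self, event_time):
        """Record the delay from the event's time to the current system time."""
        self.delays.append(self._clock() - event_time)
        self._last_event_time = event_time

    def minimal_delay(self):
        """Time from the last event's event time to now (the gauge value)."""
        if self._last_event_time is None:
            return None
        return self._clock() - self._last_event_time
```

Note that in this sketch the gauge keeps growing while no events arrive, which is what makes it useful for spotting a stalled source.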

Metrics in standalone mode

In standalone mode we use Dropwizard metrics. However, because the Dropwizard metrics project sees little activity, we are considering switching to Micrometer in the future.

| Measurement | Additional tags | Metric type | Description |
|---|---|---|---|
| invocation.success | - | espTimer | |
| invocation.failure | nodeId | espTimer | |

Reporting metrics to InfluxDB and Grafana

We provide a sample configuration and dashboard for InfluxDB & Grafana. Please check the Docker demo. The dashboard can be found here.

While Flink provides an InfluxDB reporter, it lacks a few capabilities we need. That's why in the Docker demo setup we send metrics through Telegraf, to:

  • Add new tag env for distinguishing environments
  • Remove some internal Flink tags, which can cause uncontrolled growth of number of series in InfluxDB
  • Change some tags to more meaningful names (e.g. job_name to process)
  • Remove tag names from measurements

Please see the provided telegraf.conf for the details.
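The adjustments listed above amount to a small transformation applied to every metric point. The sketch below shows that transformation on a plain dictionary; it is not the Telegraf configuration itself. The dropped tag names (`tm_id`, `task_attempt_id`) and the measurement prefix are assumptions chosen for illustration, and only the `job_name` to `process` rename and the `env` tag come from the list above.

```python
def transform(point, env,
              drop_tags=("tm_id", "task_attempt_id"),          # assumed internal tags
              prefix="taskmanager_job_task_operator_"):        # assumed name prefix
    """Return a cleaned copy of a metric point; the input is not modified."""
    # Remove internal tags that would multiply the number of series.
    tags = {k: v for k, v in point["tags"].items() if k not in drop_tags}
    # Rename job_name to the more meaningful "process".
    if "job_name" in tags:
        tags["process"] = tags.pop("job_name")
    # Add a tag distinguishing environments.
    tags["env"] = env
    # Strip the tag names embedded in the measurement name.
    name = point["measurement"]
    if name.startswith(prefix):
        name = name[len(prefix):]
    return {"measurement": name, "tags": tags, "fields": dict(point["fields"])}
```

For example, a point named `taskmanager_job_task_operator_nodeCount` with tags `job_name` and `nodeId` would come out as `nodeCount` with tags `process`, `nodeId` and `env`.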

Legacy mode

In the past we used the Graphite protocol to send metrics to InfluxDB. It was difficult to use and extend. The old way of sending metrics can still be enabled; check the migration guide.

Counts

When a process is running, it's often useful to analyze how many events passed through each node in a given time range. This can be done with the Counts functionality. The nodeCount metric is displayed by each node.

Of course, retrieving the results depends on the metric setup. Results are retrieved using the CountsReporterCreator trait. Nussknacker provides InfluxCountsReporterCreator (details below); a different implementation may be provided via the ServiceLoader mechanism. It's important to remember that the implementation has to be on the main Nussknacker classpath (not in model jars).

Please note that in most cases the results will not be exact, as Flink reports metrics every couple of seconds. Also, when a Flink job is restarted, the metrics are reset.

InfluxCountsReporter

This CountsReporter implementation is based on InfluxDB metrics, stored in the nodeCount measurement by default. The QueryMode setting can be used to choose the metric computation algorithm:

  • OnlySingleDifference - subtracts the value at the beginning of the requested range from the value at its end. This is a fast method, but if a restart is detected within the requested time range, an error is returned. We assume the job was restarted when the event counter at the source decreases.
  • OnlySumOfDifferences - the result is computed by summing the differences between consecutive measurements in the requested time range. This method works a bit better in restart situations, but can be slow for large diagrams and wide time ranges.
  • SumOfDifferencesForRestarts - if a restart is detected, the metrics are computed with OnlySumOfDifferences, otherwise with OnlySingleDifference.
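The three modes can be sketched over a list of counter samples ordered by time. This is an illustration of the arithmetic, not the actual InfluxDB queries. It makes two assumptions: the restart check runs on the same series being counted (the text above uses the source counter), and negative differences caused by a counter reset are ignored when summing. All function names are hypothetical.

```python
class RestartDetected(Exception):
    """Raised when the counter decreased within the requested range."""


def has_restart(samples):
    # A decreasing counter is taken as the sign of a job restart.
    return any(later < earlier for earlier, later in zip(samples, samples[1:]))


def only_single_difference(samples):
    """Last value minus first value; fails if a restart is detected."""
    if len(samples) < 2:
        return 0
    if has_restart(samples):
        raise RestartDetected("counter decreased within the requested range")
    return samples[-1] - samples[0]


def only_sum_of_differences(samples):
    """Sum of consecutive differences, skipping negative ones (assumption)."""
    return sum(max(later - earlier, 0)
               for earlier, later in zip(samples, samples[1:]))


def sum_of_differences_for_restarts(samples):
    """Use the fast method unless a restart forces the slower one."""
    if has_restart(samples):
        return only_sum_of_differences(samples)
    return only_single_difference(samples)
```

For samples `[10, 25, 3, 8]` the counter drops from 25 to 3, so the single-difference mode raises an error, while the sum of differences yields 15 + 0 + 5 = 20. Events counted between the last sample before the restart and the restart itself are lost in either case, which is one reason the results are approximate.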