metrics

package
v1.2.0-rc4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 18, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	LabelPipeline           = "pipeline"
	LabelVertex             = "vertex"
	LabelVertexReplicaIndex = "replica"
	LabelVertexType         = "vertex_type"
	LabelPartitionName      = "partition_name"

	LabelReason = "reason"
)
View Source
const (
	VertexPendingMessages = "vertex_pending_messages"
	LabelPeriod           = "period"
)

Variables

View Source
var (
	// ReadMessagesCount is used to indicate the number of total messages read
	ReadMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "forwarder",
		Name:      "read_total",
		Help:      "Total number of Messages Read",
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})

	// ReadDataMessagesCount is used to indicate the number of data messages read
	ReadDataMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "forwarder",
		Name:      "data_read_total",
		Help:      "Total number of Data Messages Read",
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})

	// ReadBytesCount is to indicate the number of bytes read
	ReadBytesCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "forwarder",
		Name:      "read_bytes_total",
		Help:      "Total number of bytes read",
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})

	// ReadMessagesError is used to indicate the number of errors messages read
	ReadMessagesError = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "forwarder",
		Name:      "read_error_total",
		Help:      "Total number of Read Errors",
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})

	// WriteMessagesCount is used to indicate the number of messages written
	WriteMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "forwarder",
		Name:      "write_total",
		Help:      "Total number of Messages Written",
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})

	// WriteBytesCount is to indicate the number of bytes written
	WriteBytesCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "forwarder",
		Name:      "write_bytes_total",
		Help:      "Total number of bytes written",
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})

	// WriteMessagesError is used to indicate the number of errors messages written
	WriteMessagesError = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "forwarder",
		Name:      "write_error_total",
		Help:      "Total number of Write Errors",
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})

	// DropMessagesCount is used to indicate the number of messages dropped
	DropMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "forwarder",
		Name:      "drop_total",
		Help:      "Total number of Messages Dropped",
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName, LabelReason})

	// DropBytesCount is to indicate the number of bytes dropped
	DropBytesCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "forwarder",
		Name:      "drop_bytes_total",
		Help:      "Total number of Bytes Dropped",
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName, LabelReason})

	// AckMessagesCount is used to indicate the number of  messages acknowledged
	AckMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "forwarder",
		Name:      "ack_total",
		Help:      "Total number of Messages Acknowledged",
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})

	// AckMessageError is used to indicate the errors in the number of  messages acknowledged
	AckMessageError = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "forwarder",
		Name:      "ack_error_total",
		Help:      "Total number of Acknowledged Errors",
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})

	// UDFError is used to indicate the number of UDF errors
	UDFError = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "forwarder",
		Name:      "udf_error_total",
		Help:      "Total number of UDF Errors",
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex})

	// PlatformError is used to indicate the number of Internal/Platform errors
	PlatformError = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "forwarder",
		Name:      "platform_error_total",
		Help:      "Total number of platform Errors",
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex})

	// ForwardAChunkProcessingTime is a histogram to Observe forwardAChunk Processing times as a whole
	ForwardAChunkProcessingTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Subsystem: "forwarder",
		Name:      "forward_chunk_processing_time",
		Help:      "Processing times of the entire forward a chunk (100 microseconds to 20 minutes)",
		Buckets:   prometheus.ExponentialBucketsRange(100, 60000000*20, 10),
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex})

	// UDFProcessingTime is a histogram to Observe UDF Processing times as a whole
	UDFProcessingTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Subsystem: "forwarder",
		Name:      "udf_processing_time",
		Help:      "Processing times of UDF (100 microseconds to 15 minutes)",
		Buckets:   prometheus.ExponentialBucketsRange(100, 60000000*15, 10),
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex})

	// ConcurrentUDFProcessingTime is a histogram to Observe UDF Processing times as a whole
	ConcurrentUDFProcessingTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Subsystem: "forwarder",
		Name:      "concurrent_udf_processing_time",
		Help:      "Processing times of Concurrent UDF (100 microseconds to 20 minutes)",
		Buckets:   prometheus.ExponentialBucketsRange(100, 60000000*20, 10),
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex})

	// UDFReadMessagesCount is used to indicate the number of messages read by UDF
	UDFReadMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "forwarder",
		Name:      "udf_read_total",
		Help:      "Total number of Messages Read by UDF",
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})

	// UDFWriteMessagesCount is used to indicate the number of messages written by UDF
	UDFWriteMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "forwarder",
		Name:      "udf_write_total",
		Help:      "Total number of Messages Written by UDF",
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})
)

Generic forwarder metrics

View Source
var (
	// SourceTransformerError is used to indicate the number of source transformer errors
	SourceTransformerError = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "source_forwarder",
		Name:      "transformer_error_total",
		Help:      "Total number of source transformer Errors",
	}, []string{LabelVertex, LabelPipeline, LabelVertexReplicaIndex, LabelPartitionName})

	// SourceTransformerProcessingTime is a histogram to Observe Source Transformer Processing times as a whole
	SourceTransformerProcessingTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Subsystem: "source_forwarder",
		Name:      "transformer_processing_time",
		Help:      "Processing times of source transformer (100 microseconds to 15 minutes)",
		Buckets:   prometheus.ExponentialBucketsRange(100, 60000000*15, 10),
	}, []string{LabelVertex, LabelPipeline, LabelVertexReplicaIndex, LabelPartitionName})

	// SourceTransformerConcurrentProcessingTime is a histogram to Observe Source Transformer Processing times as a whole
	SourceTransformerConcurrentProcessingTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Subsystem: "source_forwarder",
		Name:      "concurrent_transformer_processing_time",
		Help:      "Processing times of Concurrent source transformer (100 microseconds to 20 minutes)",
		Buckets:   prometheus.ExponentialBucketsRange(100, 60000000*20, 10),
	}, []string{LabelVertex, LabelPipeline, LabelVertexReplicaIndex, LabelPartitionName})

	// SourceTransformerReadMessagesCount is used to indicate the number of messages read by source transformer
	SourceTransformerReadMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "source_forwarder",
		Name:      "transformer_read_total",
		Help:      "Total number of Messages Read by source transformer",
	}, []string{LabelVertex, LabelPipeline, LabelVertexReplicaIndex, LabelPartitionName})

	// SourceTransformerWriteMessagesCount is used to indicate the number of messages written by source transformer
	SourceTransformerWriteMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "source_forwarder",
		Name:      "transformer_write_total",
		Help:      "Total number of Messages Written by source transformer",
	}, []string{LabelVertex, LabelPipeline, LabelVertexReplicaIndex, LabelPartitionName})
)

Source forwarder specific metrics

View Source
var (
	// ReduceDroppedMessagesCount is used to indicate the number of messages dropped
	ReduceDroppedMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "reduce_data_forward",
		Name:      "dropped_total",
		Help:      "Total number of Messages Dropped",
	}, []string{LabelVertex, LabelPipeline, LabelVertexReplicaIndex, LabelReason})

	// PBQWriteErrorCount is used to indicate the number of errors while writing to pbq
	PBQWriteErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "reduce_pbq",
		Name:      "write_error_total",
		Help:      "Total number of PBQ Write Errors",
	}, []string{LabelVertex, LabelPipeline, LabelVertexReplicaIndex})

	// PBQWriteMessagesCount is used to indicate the number of messages written to pbq
	PBQWriteMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "reduce_pbq",
		Name:      "write_total",
		Help:      "Total number of Messages Written to PBQ",
	}, []string{LabelVertex, LabelPipeline, LabelVertexReplicaIndex})

	// PBQWriteTime pbq write latency
	PBQWriteTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Subsystem: "reduce_pbq",
		Name:      "write_time",
		Help:      "Entry write time (1 to 5000 microseconds)",
		Buckets:   prometheus.ExponentialBucketsRange(1, 5000, 5),
	}, []string{LabelPipeline, LabelVertex, LabelVertexReplicaIndex})

	// ReduceProcessTime reduce ForwardTask processing latency
	ReduceProcessTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Subsystem: "reduce_pnf",
		Name:      "process_time",
		Help:      "Reduce process time (1 to 1200000 milliseconds)",
		Buckets:   prometheus.ExponentialBucketsRange(1, 1200000, 5),
	}, []string{LabelVertex, LabelPipeline, LabelVertexReplicaIndex})

	// ReduceForwardTime is used to indicate the time it took to forward the writeMessages
	ReduceForwardTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Subsystem: "reduce_pnf",
		Name:      "forward_time",
		Help:      "Reduce forward time (1 to 100000 microseconds)",
		Buckets:   prometheus.ExponentialBucketsRange(1, 100000, 5),
	}, []string{LabelPipeline, LabelVertex, LabelVertexReplicaIndex})

	// ReducePartitionsInFlight is used to indicate the partitions in flight
	ReducePartitionsInFlight = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: "reduce_pnf",
		Name:      "partitions_inflight",
		Help:      "Total number of partitions in flight",
	}, []string{LabelVertex, LabelPipeline, LabelVertexReplicaIndex})

	// ActiveWindowsCount is used to indicate the number of active windows
	ActiveWindowsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: "reduce",
		Name:      "active_windows",
		Help:      "Total number of active windows",
	}, []string{LabelVertex, LabelPipeline, LabelVertexReplicaIndex})

	// ClosedWindowsCount is used to indicate the number of closed windows
	ClosedWindowsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: "reduce",
		Name:      "closed_windows",
		Help:      "Total number of closed windows",
	}, []string{LabelVertex, LabelPipeline, LabelVertexReplicaIndex})
)

Reduce forwarder specific metrics

View Source
var (
	// CtrlMessagesCount is used to indicate the number of total ctrl messages sent.
	CtrlMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "idlemanager",
		Name:      "ctrl_msg_total",
		Help:      "Total number of ctrl Messages sent",
	}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})
)

Ctrl Message Metric

Functions

func NewMetricsServer added in v0.5.3

func NewMetricsServer(vertex *dfv1.Vertex, opts ...Option) *metricsServer

NewMetricsServer returns a Prometheus metrics server instance, which can be used to start an HTTPS service to expose Prometheus metrics.

Types

type HealthChecker added in v0.7.1

type HealthChecker interface {
	// IsHealthy checks if the user defined container is healthy
	IsHealthy(ctx context.Context) error
}

HealthChecker is the interface to check if the user defined container is connected and ready to use

type Option added in v0.5.3

type Option func(*metricsServer)

func NewMetricsOptions added in v0.7.1

func NewMetricsOptions(ctx context.Context, vertex *dfv1.Vertex, healthCheckers []HealthChecker, readers []isb.LagReader) []Option

NewMetricsOptions returns a metrics option list.

func WithHealthCheckExecutor added in v0.5.6

func WithHealthCheckExecutor(f func() error) Option

WithHealthCheckExecutor appends a health check executor

func WithLagReaders added in v0.9.0

func WithLagReaders(r map[string]isb.LagReader) Option

WithLagReaders sets the lag readers

func WithLookbackSeconds added in v0.5.3

func WithLookbackSeconds(seconds int64) Option

WithLookbackSeconds sets lookback seconds for pending calculation

func WithRefreshInterval added in v0.5.3

func WithRefreshInterval(d time.Duration) Option

WithRefreshInterval sets how often to refresh the pending information

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL