channelmetrics

package
v3.81.5 Latest
Warning: This package is not in the latest version of its module.
Published: Aug 5, 2024 License: AGPL-3.0 Imports: 3 Imported by: 0

Documentation

Overview

Package channelmetrics provides a flexible way to wrap Go channels with additional metrics collection capabilities. This allows for monitoring and tracking of channel usage and performance using different metrics backends.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type MetricsCollector

type MetricsCollector interface {
	RecordProduceDuration(duration time.Duration)
	RecordConsumeDuration(duration time.Duration)
	RecordChannelLen(size int)
	RecordChannelCap(capacity int)
}

MetricsCollector is an interface for collecting metrics. Implementations of this interface can be used to record various channel metrics.

type ObservableChan

type ObservableChan[T any] struct {
	// contains filtered or unexported fields
}

ObservableChan wraps a Go channel and collects metrics about its usage. It supports any type of channel and records metrics using a provided MetricsCollector implementation.

func NewObservableChan

func NewObservableChan[T any](ch chan T, metrics MetricsCollector) *ObservableChan[T]

NewObservableChan creates a new ObservableChan wrapping the provided channel. It records the channel's capacity immediately and sets up metrics collection using the provided MetricsCollector. The original description also mentions a chanName used to distinguish metrics for different channels by incorporating it into the metric names, but the signature shown here takes no such parameter; any per-channel naming therefore has to come from the MetricsCollector that is passed in.

func (*ObservableChan[T]) Close

func (oc *ObservableChan[T]) Close()

Close closes the channel and records the current size of the channel buffer.

func (*ObservableChan[T]) RecordChannelCapacity

func (oc *ObservableChan[T]) RecordChannelCapacity()

RecordChannelCapacity records the capacity of the channel buffer.

func (*ObservableChan[T]) RecordChannelLen

func (oc *ObservableChan[T]) RecordChannelLen()

RecordChannelLen records the current size of the channel buffer.

func (*ObservableChan[T]) Recv

func (oc *ObservableChan[T]) Recv() T

Recv receives an item from the channel and records the duration taken to do so. It also updates the current size of the channel buffer. This method blocks until an item is available.

func (*ObservableChan[T]) RecvCtx

func (oc *ObservableChan[T]) RecvCtx(ctx context.Context) (T, error)

RecvCtx receives an item from the channel, honoring cancellation of the supplied context, and records the duration taken to do so. It also updates the current size of the channel buffer. If an error occurs, RecvCtx logs it.

func (*ObservableChan[T]) Send

func (oc *ObservableChan[T]) Send(item T)

Send sends an item into the channel and records the duration taken to do so. It also updates the current size of the channel buffer. This method blocks until the item is sent.
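The blocking Send and Recv behavior described above can be pictured with a small stand-in wrapper. This is a sketch of the general timing pattern, not the package's actual implementation: `timedChan` and `recorder` are hypothetical names, and `recorder` is deliberately not safe for concurrent use.

```go
package main

import (
	"fmt"
	"time"
)

// recorder is a hypothetical, single-goroutine stand-in for a
// MetricsCollector. It keeps only what this sketch needs.
type recorder struct {
	produce []time.Duration
	consume []time.Duration
	lens    []int
}

// timedChan is an illustrative wrapper, not the package's ObservableChan.
type timedChan[T any] struct {
	ch  chan T
	rec *recorder
}

// Send blocks until the item is accepted, then records how long the send
// took and the buffer length observed afterwards.
func (tc *timedChan[T]) Send(item T) {
	start := time.Now()
	tc.ch <- item
	tc.rec.produce = append(tc.rec.produce, time.Since(start))
	tc.rec.lens = append(tc.rec.lens, len(tc.ch))
}

// Recv blocks until an item is available, then records how long the
// receive took and the buffer length observed afterwards.
func (tc *timedChan[T]) Recv() T {
	start := time.Now()
	item := <-tc.ch
	tc.rec.consume = append(tc.rec.consume, time.Since(start))
	tc.rec.lens = append(tc.rec.lens, len(tc.ch))
	return item
}

func main() {
	tc := &timedChan[string]{ch: make(chan string, 2), rec: &recorder{}}
	tc.Send("a")
	tc.Send("b")
	got := tc.Recv()
	fmt.Println(got, tc.rec.lens)
}
```

A long recorded send duration in this pattern suggests the consumer is the bottleneck (the buffer was full), while a long receive duration suggests the producer is.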

func (*ObservableChan[T]) SendCtx

func (oc *ObservableChan[T]) SendCtx(ctx context.Context, item T) error

SendCtx sends an item into the channel, honoring cancellation of the supplied context, and records the duration taken to do so. It also updates the current size of the channel buffer.

Directories

Path Synopsis
metrics_collector
