Documentation ¶
Overview ¶
Package channelmetrics provides a flexible way to wrap Go channels with additional metrics collection capabilities. This allows for monitoring and tracking of channel usage and performance using different metrics backends.
Index ¶
- type MetricsCollector
- type ObservableChan
- func (oc *ObservableChan[T]) Close()
- func (oc *ObservableChan[T]) RecordChannelCapacity()
- func (oc *ObservableChan[T]) RecordChannelLen()
- func (oc *ObservableChan[T]) Recv() T
- func (oc *ObservableChan[T]) RecvCtx(ctx context.Context) (T, error)
- func (oc *ObservableChan[T]) Send(item T)
- func (oc *ObservableChan[T]) SendCtx(ctx context.Context, item T) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type MetricsCollector ¶
type MetricsCollector interface { RecordProduceDuration(duration time.Duration) RecordConsumeDuration(duration time.Duration) RecordChannelLen(size int) RecordChannelCap(capacity int) }
MetricsCollector is an interface for collecting metrics. Implementations of this interface can be used to record various channel metrics.
type ObservableChan ¶
type ObservableChan[T any] struct { // contains filtered or unexported fields }
ObservableChan wraps a Go channel and collects metrics about its usage. It supports any type of channel and records metrics using a provided MetricsCollector implementation.
func NewObservableChan ¶
func NewObservableChan[T any](ch chan T, metrics MetricsCollector) *ObservableChan[T]
NewObservableChan creates a new ObservableChan wrapping the provided channel. It records the channel's capacity immediately and sets up metrics collection using the provided MetricsCollector and channel name. The chanName is used to distinguish between metrics for different channels by incorporating it into the metric names.
func (*ObservableChan[T]) Close ¶
func (oc *ObservableChan[T]) Close()
Close closes the channel and records the current size of the channel buffer.
func (*ObservableChan[T]) RecordChannelCapacity ¶
func (oc *ObservableChan[T]) RecordChannelCapacity()
RecordChannelCapacity records the capacity of the channel buffer.
func (*ObservableChan[T]) RecordChannelLen ¶
func (oc *ObservableChan[T]) RecordChannelLen()
RecordChannelLen records the current size of the channel buffer.
func (*ObservableChan[T]) Recv ¶
func (oc *ObservableChan[T]) Recv() T
Recv receives an item from the channel and records the duration taken to do so. It also updates the current size of the channel buffer. This method blocks until an item is available.
func (*ObservableChan[T]) RecvCtx ¶
func (oc *ObservableChan[T]) RecvCtx(ctx context.Context) (T, error)
RecvCtx receives an item from the channel with context and records the duration taken to do so. It also updates the current size of the channel buffer and supports context cancellation. If an error occurs, it logs the error.
func (*ObservableChan[T]) Send ¶
func (oc *ObservableChan[T]) Send(item T)
Send sends an item into the channel and records the duration taken to do so. It also updates the current size of the channel buffer. This method blocks until the item is sent.