Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var ErrBufferClosed = errors.New("fanout buffer closed")
ErrBufferClosed is an error indicating that the event buffer as a whole has been closed.
var ErrGracePeriodExceeded = errors.New("failed to process fanout buffer backlog within grace period")
ErrGracePeriodExceeded is an error returned by Cursor.Read indicating that the cursor fell too far behind. Observing this error indicates that either the reader is too slow, or the buffer has been configured with insufficient capacity/backlog for the use case.
var ErrUseOfClosedCursor = errors.New("use of closed fanout buffer cursor (this is a bug)")
ErrUseOfClosedCursor is an error indicating that Cursor.Read was called after the cursor had either been explicitly closed, or had previously returned an error.
Functions ¶
This section is empty.
Types ¶
type Buffer ¶
type Buffer[T any] struct {
	// contains filtered or unexported fields
}
Buffer is a circular buffer that keeps track of how many cursors exist, and how many have seen each item, so that it knows when items can be cleared. If one or more cursors fall behind, the items they have yet to see go into a temporary backlog of unbounded size. If the backlog persists for longer than the allowed grace period, it is cleared and all cursors still in the backlog fail on their next attempt to read.
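The bookkeeping described above can be illustrated with a deliberately simplified toy model. This is not the package's implementation: it is single-threaded, has no fixed capacity, no backlog and no grace period, and it tracks the lowest cursor position instead of a per-item count of readers. All names in it (toyBuffer, addCursor, read, trim) are hypothetical.

```go
package main

import "fmt"

// toyBuffer retains items only until every registered cursor has read them.
type toyBuffer struct {
	items   []string    // items not yet seen by every cursor, oldest first
	base    int         // absolute position of items[0]
	cursors map[int]int // cursor id -> absolute position of its next unread item
}

func newToyBuffer() *toyBuffer {
	return &toyBuffer{cursors: make(map[int]int)}
}

// addCursor registers a cursor that starts at the current end of the buffer.
func (b *toyBuffer) addCursor(id int) {
	b.cursors[id] = b.base + len(b.items)
}

// append adds items, then drops anything no cursor is waiting for.
func (b *toyBuffer) append(items ...string) {
	b.items = append(b.items, items...)
	b.trim()
}

// read returns a copy of everything the cursor has not seen yet and advances
// it to the end of the buffer.
func (b *toyBuffer) read(id int) []string {
	pos := b.cursors[id]
	out := append([]string(nil), b.items[pos-b.base:]...)
	b.cursors[id] = b.base + len(b.items)
	b.trim()
	return out
}

// trim drops leading items that every cursor has already seen.
func (b *toyBuffer) trim() {
	lowest := b.base + len(b.items)
	for _, pos := range b.cursors {
		if pos < lowest {
			lowest = pos
		}
	}
	b.items = b.items[lowest-b.base:]
	b.base = lowest
}

func main() {
	b := newToyBuffer()
	b.addCursor(1)
	b.addCursor(2)
	b.append("a", "b")

	// Cursor 2 has not read yet, so both items must be retained.
	fmt.Println(b.read(1), len(b.items))
	// Once the last cursor catches up, the items can be cleared.
	fmt.Println(b.read(2), len(b.items))
}
```

In this model a slow cursor would pin items forever; the grace period described above is what bounds that cost in the real buffer.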
func (*Buffer[T]) Append ¶
func (b *Buffer[T]) Append(items ...T)
Append appends the given items to the buffer and wakes all dormant cursors.
type Config ¶
type Config struct {
	// Capacity is the capacity to allocate for the main circular buffer. Capacity should be selected s.t. cursors rarely
	// fall behind capacity during normal operation.
	Capacity uint64

	// GracePeriod is the amount of time a backlog (beyond the specified capacity) will be allowed to persist. Longer grace
	// periods give cursors more time to catch up in the event of spikes, at the cost of higher potential memory usage and
	// longer waits before unhealthy cursors are ejected.
	GracePeriod time.Duration

	// Clock is used to override default time-behaviors in tests.
	Clock clockwork.Clock
}
Config holds all configuration parameters for the fanout buffer. All parameters are optional.
func (*Config) SetDefaults ¶
func (c *Config) SetDefaults()
SetDefaults sets default config parameters.
type Cursor ¶
type Cursor[T any] struct {
	// contains filtered or unexported fields
}
Cursor is a cursor into a fanout buffer. Cursors *must* be closed if not being actively read, to avoid buffer performance degradation. Cursors are not intended for concurrent use (though they are "safe", concurrent calls may block longer than expected due to the lock being held across blocking reads).
func (*Cursor[T]) Close ¶
Close closes the cursor. Close is safe to double-call and should be called as soon as possible if the cursor is no longer in use.