Documentation ¶
Index ¶
- Constants
- func All[T any](xs []T, pred func(T) bool) bool
- func CopyAll[T Copyable[T]](xs []T) []T
- func CopyAllErr[T CopyableErr[T]](xs []T) ([]T, error)
- func RecvOrTimeout[T any](c <-chan T, timeout time.Duration) (*T, error)
- func Reduce[T any, V any, S []V](s S, f Reducer[T, V]) T
- func SendOrQuit[T any, Q any](c chan<- T, msg T, quit chan Q) bool
- type ConcurrentQueue
- type ContextGuard
- func (g *ContextGuard) CtxBlocking() (context.Context, func())
- func (g *ContextGuard) CtxBlockingCustomTimeout(timeout time.Duration) (context.Context, func())
- func (g *ContextGuard) WithCtxQuit() (context.Context, func())
- func (g *ContextGuard) WithCtxQuitCustomTimeout(timeout time.Duration) (context.Context, func())
- func (g *ContextGuard) WithCtxQuitNoTimeout() (context.Context, func())
- type Copyable
- type CopyableErr
- type EventPublisher
- type EventReceiver
- type Reducer
Constants ¶
const (
// DefaultQueueSize is the default size to use for concurrent queues.
DefaultQueueSize = 10
)
Variables ¶
This section is empty.
Functions ¶
func CopyAll ¶
func CopyAll[T Copyable[T]](xs []T) []T
CopyAll creates a new slice where each item is a deep copy of the corresponding element of the input slice.
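The deep-copy behaviour described above can be illustrated with a small, self-contained sketch. The `point` type and the lowercase `copyAll` helper are hypothetical stand-ins written for this example; they are not this package's source, and only the `Copyable` interface shape is taken from this page.

```go
package main

import "fmt"

// Copyable mirrors the interface documented on this page.
type Copyable[T any] interface {
	Copy() T
}

// point is a hypothetical type used only for this illustration.
type point struct {
	coords []int
}

// Copy returns a deep copy by cloning the backing slice.
func (p *point) Copy() *point {
	return &point{coords: append([]int(nil), p.coords...)}
}

// copyAll is a local sketch of the documented CopyAll behaviour.
func copyAll[T Copyable[T]](xs []T) []T {
	out := make([]T, len(xs))
	for i, x := range xs {
		out[i] = x.Copy()
	}
	return out
}

func main() {
	orig := []*point{{coords: []int{1, 2}}}
	dup := copyAll(orig)

	// Mutating the copy leaves the original untouched.
	dup[0].coords[0] = 99
	fmt.Println(orig[0].coords[0], dup[0].coords[0]) // prints "1 99"
}
```

Because each element is copied through its own Copy method, the quality of the deep copy depends entirely on how that method is implemented.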
func CopyAllErr ¶
func CopyAllErr[T CopyableErr[T]](xs []T) ([]T, error)
CopyAllErr creates a new slice where each item is a deep copy of the corresponding element of the input slice. This is identical to CopyAll, but should be used in cases where the copy method can return an error.
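A minimal sketch of the error-returning variant follows. The definition of CopyableErr is not shown on this page, so the `Copy() (T, error)` method shape is an assumption, as are the hypothetical `blob` type and the choice to stop at the first failing element.

```go
package main

import (
	"errors"
	"fmt"
)

// copyableErr is an ASSUMED shape for CopyableErr: Copy may fail.
type copyableErr[T any] interface {
	Copy() (T, error)
}

// blob is a hypothetical type whose copy fails when it holds no data.
type blob struct {
	data []byte
}

// Copy returns a deep copy, or an error if there is nothing to copy.
func (b *blob) Copy() (*blob, error) {
	if b.data == nil {
		return nil, errors.New("blob has no data")
	}
	return &blob{data: append([]byte(nil), b.data...)}, nil
}

// copyAllErr copies each element and returns the first error encountered.
// Stopping at the first error is an assumption made for this sketch.
func copyAllErr[T copyableErr[T]](xs []T) ([]T, error) {
	out := make([]T, 0, len(xs))
	for _, x := range xs {
		c, err := x.Copy()
		if err != nil {
			return nil, err
		}
		out = append(out, c)
	}
	return out, nil
}

func main() {
	copies, err := copyAllErr([]*blob{{data: []byte("a")}})
	fmt.Println(len(copies), err)

	_, err = copyAllErr([]*blob{{data: []byte("a")}, {}})
	fmt.Println(err)
}
```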
func RecvOrTimeout ¶
func RecvOrTimeout[T any](c <-chan T, timeout time.Duration) (*T, error)
RecvOrTimeout attempts to receive a value over channel c and returns it. If the timeout passes before the receive succeeds, an error is returned.
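One common way to get this behaviour is a `select` over the channel and a timer. The sketch below assumes that approach; the lowercase `recvOrTimeout`, the `errTimeout` value, and the `shortWait` constant are names invented for the example and may differ from the package's internals.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is a hypothetical error value for this sketch.
var errTimeout = errors.New("timeout hit")

// shortWait is an arbitrary timeout used by the demo below.
const shortWait = 20 * time.Millisecond

// recvOrTimeout waits for a value on c or gives up after timeout.
func recvOrTimeout[T any](c <-chan T, timeout time.Duration) (*T, error) {
	select {
	case m := <-c:
		return &m, nil
	case <-time.After(timeout):
		return nil, errTimeout
	}
}

func main() {
	// A value is already buffered, so the receive succeeds.
	c := make(chan int, 1)
	c <- 7
	v, err := recvOrTimeout[int](c, shortWait)
	fmt.Println(*v, err)

	// Nothing is ever sent here, so the timeout fires.
	empty := make(chan int)
	_, err = recvOrTimeout[int](empty, shortWait)
	fmt.Println(err)
}
```

Returning a pointer lets the caller distinguish "no value" (nil) from a received zero value.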
func Reduce ¶
func Reduce[T any, V any, S []V](s S, f Reducer[T, V]) T
Reduce takes a slice and a reducer, and produces a final accumulated value.
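As a rough illustration of the fold described above, the sketch below sums the lengths of a slice of strings. The Reducer type is not defined on this page, so the `func(accum T, value V) T` shape is an assumption, as is starting the accumulator from the zero value of T.

```go
package main

import "fmt"

// reducer is an ASSUMED shape for Reducer: it folds one value into the
// running accumulator and returns the new accumulator.
type reducer[T any, V any] func(accum T, value V) T

// reduce folds the slice left to right, starting from the zero value of T.
func reduce[T any, V any, S []V](s S, f reducer[T, V]) T {
	var accum T
	for _, x := range s {
		accum = f(accum, x)
	}
	return accum
}

func main() {
	words := []string{"a", "bb", "ccc"}

	// Accumulate the total number of bytes across all words.
	total := reduce(words, func(n int, w string) int {
		return n + len(w)
	})
	fmt.Println(total) // prints 6
}
```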
func SendOrQuit ¶
func SendOrQuit[T any, Q any](c chan<- T, msg T, quit chan Q) bool
SendOrQuit attempts to send a message through channel c. If this succeeds, then true is returned. Otherwise, if a quit signal is received first, then false is returned.
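The send-or-abort pattern can be sketched with a two-way `select`. The lowercase `sendOrQuit` below is a local re-implementation written for this example, assuming that closing the quit channel is how the quit signal is delivered.

```go
package main

import "fmt"

// sendOrQuit blocks until msg is sent on c or a quit signal arrives.
func sendOrQuit[T any, Q any](c chan<- T, msg T, quit chan Q) bool {
	select {
	case c <- msg:
		return true
	case <-quit:
		return false
	}
}

func main() {
	quit := make(chan struct{})
	c := make(chan string, 1)

	// The buffer has room and quit is still open, so the send succeeds.
	fmt.Println(sendOrQuit[string, struct{}](c, "hello", quit)) // prints true

	// The buffer is now full and quit is closed, so only the quit case
	// can proceed.
	close(quit)
	fmt.Println(sendOrQuit[string, struct{}](c, "again", quit)) // prints false
}
```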
Types ¶
type ConcurrentQueue ¶
type ConcurrentQueue[T any] struct {
	// contains filtered or unexported fields
}
ConcurrentQueue is a typed, concurrent-safe FIFO queue with unbounded capacity. Clients interact with the queue by pushing items into the in channel and popping items from the out channel. A goroutine, which must be started by calling Start(), moves items from the in channel to the out channel in the correct order.
func NewConcurrentQueue ¶
func NewConcurrentQueue[T any](bufferSize int) *ConcurrentQueue[T]
NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is the capacity of the output channel. When the size of the queue is below this threshold, pushes do not incur the overhead of the less efficient overflow structure.
func (*ConcurrentQueue[T]) ChanIn ¶
func (cq *ConcurrentQueue[T]) ChanIn() chan<- T
ChanIn returns a channel that can be used to push new items into the queue.
func (*ConcurrentQueue[T]) ChanOut ¶
func (cq *ConcurrentQueue[T]) ChanOut() <-chan T
ChanOut returns a channel that can be used to pop items from the queue.
func (*ConcurrentQueue[T]) Start ¶
func (cq *ConcurrentQueue[T]) Start()
Start begins a goroutine that manages moving items from the in channel to the out channel. The queue tries to move items directly to the out channel to minimize overhead, but if the out channel is full it pushes items to an overflow queue. This must be called before using the queue.
func (*ConcurrentQueue[T]) Stop ¶
func (cq *ConcurrentQueue[T]) Stop()
Stop ends the goroutine that moves items from the in channel to the out channel. This does not clear the queue state, so the queue can be restarted without dropping items.
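The in channel, buffered out channel, and overflow structure described above can be sketched as follows. This is a simplified stand-in, not the package's implementation: the `queue` type, its field names, and the use of `container/list` for the overflow are assumptions, and restarting after a stop is not modelled.

```go
package main

import (
	"container/list"
	"fmt"
)

// queue is a hypothetical stand-in for ConcurrentQueue.
type queue[T any] struct {
	in       chan T
	out      chan T
	overflow *list.List
	quit     chan struct{}
	done     chan struct{}
}

func newQueue[T any](bufferSize int) *queue[T] {
	return &queue[T]{
		in:       make(chan T),
		out:      make(chan T, bufferSize),
		overflow: list.New(),
		quit:     make(chan struct{}),
		done:     make(chan struct{}),
	}
}

// start launches the goroutine that shuttles items from in to out.
func (q *queue[T]) start() {
	go func() {
		defer close(q.done)
		for {
			front := q.overflow.Front()
			if front == nil {
				// Overflow is empty: try to deliver directly, and
				// spill to the overflow list only if out is full.
				select {
				case item := <-q.in:
					select {
					case q.out <- item:
					default:
						q.overflow.PushBack(item)
					}
				case <-q.quit:
					return
				}
				continue
			}

			// Overflow is non-empty: new items queue up behind it
			// while the oldest overflow item is offered to out.
			select {
			case item := <-q.in:
				q.overflow.PushBack(item)
			case q.out <- front.Value.(T):
				q.overflow.Remove(front)
			case <-q.quit:
				return
			}
		}
	}()
}

// stop signals the goroutine to exit and waits for it to finish.
func (q *queue[T]) stop() {
	close(q.quit)
	<-q.done
}

func main() {
	q := newQueue[int](2)
	q.start()
	defer q.stop()

	// Five pushes exceed the out buffer of two; the rest overflow.
	for i := 0; i < 5; i++ {
		q.in <- i
	}
	for i := 0; i < 5; i++ {
		fmt.Print(<-q.out, " ")
	}
	fmt.Println()
}
```

In this sketch, FIFO order is preserved because an item is only sent straight to the out channel when the overflow list is empty; otherwise it waits behind older items.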
type ContextGuard ¶
ContextGuard is an embeddable struct that provides a wait group and main quit channel that can be used to create guarded contexts.
func (*ContextGuard) CtxBlocking ¶
func (g *ContextGuard) CtxBlocking() (context.Context, func())
CtxBlocking is used to create a cancellable context that will NOT be cancelled if the main quit signal is triggered, to block shutdown of important tasks. The context will be cancelled if the timeout is reached.
func (*ContextGuard) CtxBlockingCustomTimeout ¶
func (g *ContextGuard) CtxBlockingCustomTimeout(timeout time.Duration) (context.Context, func())
CtxBlockingCustomTimeout is used to create a cancellable context with a custom timeout that will NOT be cancelled if the main quit signal is triggered, to block shutdown of important tasks. The context will be cancelled if the timeout is reached.
func (*ContextGuard) WithCtxQuit ¶
func (g *ContextGuard) WithCtxQuit() (context.Context, func())
WithCtxQuit is used to create a cancellable context that will be cancelled if the main quit signal is triggered or once the default timeout elapses.
func (*ContextGuard) WithCtxQuitCustomTimeout ¶
func (g *ContextGuard) WithCtxQuitCustomTimeout(timeout time.Duration) (context.Context, func())
WithCtxQuitCustomTimeout is used to create a cancellable context that will be cancelled if the main quit signal is triggered or once the given timeout elapses.
func (*ContextGuard) WithCtxQuitNoTimeout ¶
func (g *ContextGuard) WithCtxQuitNoTimeout() (context.Context, func())
WithCtxQuitNoTimeout is used to create a cancellable context that will be cancelled if the main quit signal is triggered.
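The difference between the quit-aware and blocking contexts can be sketched as below. The fields of ContextGuard are not listed on this page, so the `guard` type, its field names, the `demoTimeout` constant, and the way the wait group is used are all assumptions made for illustration.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// demoTimeout is an arbitrary default timeout for this sketch.
const demoTimeout = time.Second

// guard is a hypothetical stand-in for ContextGuard; field names are assumed.
type guard struct {
	defaultTimeout time.Duration
	wg             sync.WaitGroup
	quit           chan struct{}
}

// withCtxQuit returns a context that is cancelled by the default timeout
// or by the quit signal, whichever comes first.
func (g *guard) withCtxQuit() (context.Context, func()) {
	ctx, cancel := context.WithTimeout(
		context.Background(), g.defaultTimeout,
	)

	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		select {
		case <-g.quit:
			cancel()
		case <-ctx.Done():
		}
	}()

	return ctx, cancel
}

// ctxBlocking returns a context that ignores the quit signal. The wait
// group tracks it until the returned cancel function is called.
func (g *guard) ctxBlocking() (context.Context, func()) {
	ctx, cancel := context.WithTimeout(
		context.Background(), g.defaultTimeout,
	)

	g.wg.Add(1)
	var once sync.Once
	return ctx, func() {
		cancel()
		once.Do(g.wg.Done)
	}
}

func main() {
	g := &guard{defaultTimeout: demoTimeout, quit: make(chan struct{})}

	quitCtx, cancelQuit := g.withCtxQuit()
	defer cancelQuit()
	blockCtx, cancelBlock := g.ctxBlocking()

	// Trigger shutdown: only the quit-aware context is cancelled.
	close(g.quit)
	<-quitCtx.Done()
	fmt.Println(quitCtx.Err(), blockCtx.Err())

	// Shutdown can complete once the blocking task releases its context.
	cancelBlock()
	g.wg.Wait()
}
```

In this sketch, waiting on the wait group during shutdown is what lets a blocking context delay exit until its task has finished or timed out.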
type Copyable ¶
type Copyable[T any] interface { Copy() T }
Copyable is a generic interface for a type that's able to return a deep copy of itself.
type CopyableErr ¶
CopyableErr is a generic interface for a type that's able to return a deep copy of itself. This is identical to Copyable, but should be used in cases where the copy method can return an error.
type EventPublisher ¶
type EventPublisher[T any, Q any] interface {
	// RegisterSubscriber adds a new subscriber for receiving events. The
	// deliverExisting boolean indicates whether already existing items
	// should be sent to the NewItemCreated channel when the subscription is
	// started. An optional deliverFrom can be specified to indicate from
	// which timestamp/index/marker onward existing items should be
	// delivered on startup. If deliverFrom is nil/zero/empty then all
	// existing items will be delivered.
	RegisterSubscriber(receiver *EventReceiver[T], deliverExisting bool,
		deliverFrom Q) error

	// RemoveSubscriber removes the given subscriber and also stops it from
	// processing events.
	RemoveSubscriber(subscriber *EventReceiver[T]) error
}
EventPublisher is an interface type for a component that offers event-based subscriptions for publishing events.
type EventReceiver ¶
type EventReceiver[T any] struct {
	// NewItemCreated is sent to when a new item was created successfully.
	NewItemCreated *ConcurrentQueue[T]

	// ItemRemoved is sent to when an existing item was removed.
	ItemRemoved *ConcurrentQueue[T]
	// contains filtered or unexported fields
}
EventReceiver is a struct type that holds two queues for new and removed items respectively.
func NewEventReceiver ¶
func NewEventReceiver[T any](queueSize int) *EventReceiver[T]
NewEventReceiver creates a new event receiver with concurrent queues of the given size.
func (*EventReceiver[T]) ID ¶
func (e *EventReceiver[T]) ID() uint64
ID returns the internal process-unique ID of the subscription.
func (*EventReceiver[T]) Stop ¶
func (e *EventReceiver[T]) Stop()
Stop stops the receiver from processing events.