fn

package
v0.3.2-rc2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2023 License: MIT Imports: 12 Imported by: 2

Documentation

Index

Constants

View Source
const (
	// DefaultQueueSize is the default size to use for concurrent queues.
	DefaultQueueSize = 10
)

Variables

This section is empty.

Functions

func All

func All[T any](xs []T, pred func(T) bool) bool

All returns true if the passed predicate returns true for all items in the slice.

func Any

func Any[T any](xs []T, pred func(T) bool) bool

Any returns true if the passed predicate returns true for any item in the slice.

func ByteSlice

func ByteSlice[T ByteArray](v T) []byte

ByteSlice takes a byte array, and returns a slice. This is useful when a function returns an array, but a slice is wanted. Without this, then an intermediate variable is needed.

func Collect

func Collect[T any](c chan T) []T

Collect receives all values from a channel and returns them as a slice.

NOTE: This function closes the channel to be able to collect all items at once.

func CollectBatch added in v0.3.0

func CollectBatch[V any](ctx context.Context, values <-chan V,
	batchSize int, cb func(ctx context.Context, batch []V) error) error

CollectBatch reads from the given channel and returns batchSize items at a time and a boolean that indicates whether we expect more items to be sent on the channel. If the context is canceled, the function returns the items that have been read so far and the context's error.

NOTE: The channel MUST be closed for this function to return.

func CopyAll

func CopyAll[T Copyable[T]](xs []T) []T

CopyAll creates a new slice where each item of the slice is a deep copy of the elements of the input slice.

func CopyAllErr

func CopyAllErr[T CopyableErr[T]](xs []T) ([]T, error)

CopyAllErr creates a new slice where each item of the slice is a deep copy of the elements of the input slice. This is identical to CopyAll, but shuold be used in cases where the copy method can return an error.

func CopySlice

func CopySlice[T any](slice []T) []T

CopySlice returns a copy of the given slice. Does a shallow copy of the slice itself, not the underlying elements.

func Count

func Count[T any](xs []T, pred func(T) bool) int

Count returns the number of items in the slice that match the predicate.

func Enumerate

func Enumerate[T any](items []T, f func(int, T))

Enumerate is a generic enumeration function. The closure will be called for each item in the passed slice, receiving both the index number as well as the item itself.

func Filter

func Filter[T any](s []T, f func(T) bool) []T

Filter applies the given predicate function to each element of the given slice and generates a new slice containing only the elements for which the predicate returned true.

func FilterMap added in v0.3.1

func FilterMap[T any, K comparable](s map[K]T, f func(T) bool) []T

FilterMap applies the given predicate function to each element of the given map and generates a new slice containing only the elements for which the predicate returned true.

func First

func First[T any](xs []*T, pred func(*T) bool) (*T, error)

First returns the first item in the slice that matches the predicate, or an error if none matches.

func ForEach

func ForEach[T any](items []T, f func(T))

ForEach is a generic implementation of a for-each (map with side effects). This can be used to ensure that any normal for-loop don't run into bugs due to loop variable scoping.

func ForEachErr

func ForEachErr[T any](s []T, f func(T) error) error

ForEachErr will iterate through all items in the passed slice, calling the function f on each slice. If a call to f fails, then the function returns an error immediately.

This function can be used instead of the normal range loop to ensure that a loop scoping bug isn't introduced.

func IsCanceled

func IsCanceled(err error) bool

IsCanceled returns true if the passed error is a gRPC error with the context.Canceled error as the cause.

func MakeSlice added in v0.3.0

func MakeSlice[T any](items ...T) []T

MakeSlice is a generic function shorthand for making a slice out of a set of elements. This can be used to avoid having to specify the type of the slice as well as the types of the elements.

func Map

func Map[I, O any, S []I](s S, f func(I) O) []O

Map applies the given mapping function to each element of the given slice and generates a new slice.

func MapErr

func MapErr[I, O any, S []I](s S, f func(I) (O, error)) ([]O, error)

MapErr applies the given fallible mapping function to each element of the given slice and generates a new slice. This is identical to Map, but returns early if any single mapping fails.

func None

func None[T any](xs []T, pred func(T) bool) bool

None returns true if the passed predicate returns false for all items in the slice.

func ParSlice

func ParSlice[V any](ctx context.Context, s []V, f ErrFunc[V]) error

ParSlice can be used to execute a function on each element of a slice in parallel. This function is fully blocking and will wait for all goroutines to either succeed, or for the first to error out. Active goroutines limited with number of CPU. Context will be passed in executable func and canceled the first time a function passed returns a non-nil error. Returns the first non-nil error (if any).

func Ptr

func Ptr[T any](v T) *T

Ptr returns the pointer of the given value. This is useful in instances where a function returns the value, but a pointer is wanted. Without this, then an intermediate variable is needed.

func RecvOrTimeout

func RecvOrTimeout[T any](c <-chan T, timeout time.Duration) (*T, error)

RecvOrTimeout attempts to recv over chan c, returning the value. If the timeout passes before the recv succeeds, an error is returned

func RecvResp

func RecvResp[T any](r <-chan T, e <-chan error, q <-chan struct{}) (T, error)

RecvResp takes three channels: a response channel, an error channel and a quit channel. If either of these channels are sent on, then the function will exit with that response. This can be used to wait for a response, error, or a quit signal.

func Reduce

func Reduce[T any, V any, S []V](s S, f Reducer[T, V]) T

Reduce takes a slice of something, and a reducer, and produces a final accumulated value.

func SendAll

func SendAll[T any](c chan<- T, msgs ...T)

SendAll attempts to send all messages through channel c.

TODO(roasbeef): add non-blocking variant?

func SendOrQuit

func SendOrQuit[T any, Q any](c chan<- T, msg T, quit chan Q) bool

SendOrQuit attempts to and a message through channel c. If this succeeds, then bool is returned. Otherwise if a quit signal is received first, then false is returned.

func SetDiff

func SetDiff[T comparable](a, b []T) []T

SetDiff returns all the items that are in the first set but not in the second.

func ToArray

func ToArray[T ByteArray](v []byte) T

ToArray takes a byte slice, and returns an array. This is useful when a fixed sized array is needed and the byte slice is known to be of the correct size.

Types

type ByteArray

type ByteArray interface {
	~[32]byte
}

ByteArray is a type constraint for type that reduces down to a fixed sized array.

type ConcurrentQueue

type ConcurrentQueue[T any] struct {
	// contains filtered or unexported fields
}

ConcurrentQueue is a typed concurrent-safe FIFO queue with unbounded capacity. Clients interact with the queue by pushing items into the in channel and popping items from the out channel. There is a goroutine that manages moving items from the in channel to the out channel in the correct order that must be started by calling Start().

func NewConcurrentQueue

func NewConcurrentQueue[T any](bufferSize int) *ConcurrentQueue[T]

NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is the capacity of the output channel. When the size of the queue is below this threshold, pushes do not incur the overhead of the less efficient overflow structure.

func (*ConcurrentQueue[T]) ChanIn

func (cq *ConcurrentQueue[T]) ChanIn() chan<- T

ChanIn returns a channel that can be used to push new items into the queue.

func (*ConcurrentQueue[T]) ChanOut

func (cq *ConcurrentQueue[T]) ChanOut() <-chan T

ChanOut returns a channel that can be used to pop items from the queue.

func (*ConcurrentQueue[T]) Start

func (cq *ConcurrentQueue[T]) Start()

Start begins a goroutine that manages moving items from the in channel to the out channel. The queue tries to move items directly to the out channel minimize overhead, but if the out channel is full it pushes items to an overflow queue. This must be called before using the queue.

func (*ConcurrentQueue[T]) Stop

func (cq *ConcurrentQueue[T]) Stop()

Stop ends the goroutine that moves items from the in channel to the out channel. This does not clear the queue state, so the queue can be restarted without dropping items.

type ContextGuard

type ContextGuard struct {
	DefaultTimeout time.Duration
	Wg             sync.WaitGroup
	Quit           chan struct{}
}

ContextGuard is an embeddable struct that provides a wait group and main quit channel that can be used to create guarded contexts.

func (*ContextGuard) CtxBlocking

func (g *ContextGuard) CtxBlocking() (context.Context, func())

CtxBlocking is used to create a cancellable context that will NOT be cancelled if the main quit signal is triggered, to block shutdown of important tasks. The context will be cancelled if the timeout is reached.

func (*ContextGuard) CtxBlockingCustomTimeout

func (g *ContextGuard) CtxBlockingCustomTimeout(
	timeout time.Duration) (context.Context, func())

CtxBlockingCustomTimeout is used to create a cancellable context with a custom timeout that will NOT be cancelled if the main quit signal is triggered, to block shutdown of important tasks. The context will be cancelled if the timeout is reached.

func (*ContextGuard) WithCtxQuit

func (g *ContextGuard) WithCtxQuit() (context.Context, func())

WithCtxQuit is used to create a cancellable context that will be cancelled if the main quit signal is triggered or after the default timeout occurred.

func (*ContextGuard) WithCtxQuitCustomTimeout

func (g *ContextGuard) WithCtxQuitCustomTimeout(
	timeout time.Duration) (context.Context, func())

WithCtxQuitCustomTimeout is used to create a cancellable context that will be cancelled if the main quit signal is triggered or after the given timeout occurred.

func (*ContextGuard) WithCtxQuitNoTimeout

func (g *ContextGuard) WithCtxQuitNoTimeout() (context.Context, func())

WithCtxQuitNoTimeout is used to create a cancellable context that will be cancelled if the main quit signal is triggered.

type Copyable

type Copyable[T any] interface {
	Copy() T
}

Copyable is a generic interface for a type that's able to return a deep copy of itself.

type CopyableErr

type CopyableErr[T any] interface {
	Copy() (T, error)
}

CopyableErr is a generic interface for a type that's able to return a deep copy of itself. This is identical to Copyable, but shuold be used in cases where the copy method can return an error.

type ErrFunc

type ErrFunc[V any] func(context.Context, V) error

ErrFunc is a type def for a function that takes a context (to allow early cancellation) and a series of value returning an error. This is typically used a closure to perform concurrent work over a homogeneous slice of values.

type Event

type Event interface {
	Timestamp() time.Time
}

Event is a generic event that can be sent to a subscriber.

type EventDistributor

type EventDistributor[T any] struct {
	// contains filtered or unexported fields
}

EventDistributor is a struct type that helps to distribute events to multiple subscribers.

func NewEventDistributor

func NewEventDistributor[T any]() *EventDistributor[T]

NewEventDistributor creates a new event distributor of the declared type.

func (*EventDistributor[T]) NotifySubscribers

func (d *EventDistributor[T]) NotifySubscribers(events ...T)

NotifySubscribers sends the given events to all subscribers.

func (*EventDistributor[T]) RegisterSubscriber

func (d *EventDistributor[T]) RegisterSubscriber(subscriber *EventReceiver[T])

RegisterSubscriber adds a new subscriber for receiving events.

func (*EventDistributor[T]) RemoveSubscriber

func (d *EventDistributor[T]) RemoveSubscriber(
	subscriber *EventReceiver[T]) error

RemoveSubscriber removes the given subscriber and also stops it from processing events.

type EventPublisher

type EventPublisher[T any, Q any] interface {
	// RegisterSubscriber adds a new subscriber for receiving events. The
	// deliverExisting boolean indicates whether already existing items
	// should be sent to the NewItemCreated channel when the subscription is
	// started. An optional deliverFrom can be specified to indicate from
	// which timestamp/index/marker onward existing items should be
	// delivered on startup. If deliverFrom is nil/zero/empty then all
	// existing items will be delivered.
	RegisterSubscriber(receiver *EventReceiver[T], deliverExisting bool,
		deliverFrom Q) error

	// RemoveSubscriber removes the given subscriber and also stops it from
	// processing events.
	RemoveSubscriber(subscriber *EventReceiver[T]) error
}

EventPublisher is an interface type for a component that offers event based subscriptions for publishing events.

type EventReceiver

type EventReceiver[T any] struct {

	// NewItemCreated is sent to when a new item was created successfully.
	NewItemCreated *ConcurrentQueue[T]

	// ItemRemoved is sent to when an existing item was removed.
	ItemRemoved *ConcurrentQueue[T]
	// contains filtered or unexported fields
}

EventReceiver is a struct type that holds two queues for new and removed items respectively.

func NewEventReceiver

func NewEventReceiver[T any](queueSize int) *EventReceiver[T]

NewEventReceiver creates a new event receiver with concurrent queues of the given size.

func (*EventReceiver[T]) ID

func (e *EventReceiver[T]) ID() uint64

ID returns the internal process-unique ID of the subscription.

func (*EventReceiver[T]) Stop

func (e *EventReceiver[T]) Stop()

Stop stops the receiver from processing events.

type Reducer

type Reducer[T, V any] func(accum T, value V) T

Reducer represents a function that takes an accumulator and the value, then returns a new accumulator.

type Set

type Set[T comparable] map[T]struct{}

Set is a generic set using type params that supports the following operations: diff, union, intersection, and subset.

func NewSet

func NewSet[T comparable](elems ...T) Set[T]

NewSet returns a new set with the given elements.

func (Set[T]) Add

func (s Set[T]) Add(e T)

Add adds an element to the set.

func (Set[T]) Contains

func (s Set[T]) Contains(e T) bool

Contains returns true if the set contains the element.

func (Set[T]) Diff

func (s Set[T]) Diff(other Set[T]) Set[T]

Diff returns the difference between two sets.

func (Set[T]) Equal

func (s Set[T]) Equal(other Set[T]) bool

Equal returns true if the set is equal to the other set.

func (Set[T]) Intersect

func (s Set[T]) Intersect(other Set[T]) Set[T]

Intersect returns the intersection of two sets.

func (Set[T]) Remove

func (s Set[T]) Remove(e T)

Remove removes an element from the set.

func (Set[T]) Subset

func (s Set[T]) Subset(other Set[T]) bool

Subset returns true if the set is a subset of the other set.

func (Set[T]) ToSlice

func (s Set[T]) ToSlice() []T

ToSlice returns the set as a slice.

func (Set[T]) Union

func (s Set[T]) Union(other Set[T]) Set[T]

Union returns the union of two sets.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL