rx

package
v0.6.1
Published: Jun 10, 2024 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

var Infinite int64 = -1

Infinite represents an infinite wait time

Functions

func Assert

func Assert[T any](ctx context.Context, iterable Iterable[T], asserters ...Asserter[T])

func Just

func Just[T any](values ...T) func(opts ...Option[T]) Observable[T]

Just creates an Observable with the provided items.

func JustError

func JustError[T any](err error) func(opts ...Option[T]) Single[T]

JustError returns a function that creates a Single carrying the provided error.

func JustSingle

func JustSingle[T any](value T, opts ...Option[T]) func(opts ...Option[T]) Single[T]

JustSingle is like JustItem in that it is defined for a single item iterable but behaves like Just in that it returns a func. This is probably not required, just defined for experimental purposes for now.

func MaxInitLimitInt

func MaxInitLimitInt() int

func MinInitLimitInt

func MinInitLimitInt() int

func MustBool

func MustBool[T any](item Item[T]) bool

MustBool is a helper function that assists the client in interpreting a boolean item received via a channel. If the item is not a boolean, a panic is raised, otherwise the returned value is the underlying boolean value.

func MustOpaque

func MustOpaque[T, O any](item Item[T]) O

MustOpaque is a helper function that assists the client in interpreting an opaque item received via a channel. If the item is not of the custom type O, a panic is raised, otherwise the returned value is the underlying custom value.

func MustTV

func MustTV[T any](item Item[T]) int

MustTV is a helper function that assists the client in interpreting an integer based tick value item received via a channel. If the item is not a tick value, a panic is raised, otherwise the returned value is the underlying integer value.

func MustWCh

func MustWCh[T any](item Item[T]) chan<- Item[T]

MustWCh is a helper function that assists the client in interpreting a write chan item received via a channel. If the item is not a write chan, a panic is raised, otherwise the returned value is the underlying write chan value.

func NativeItemLimitComparator

func NativeItemLimitComparator[T constraints.Ordered](a, b Item[T]) int

func SendItems

func SendItems[T any](ctx context.Context,
	ch chan<- Item[T], strategy enums.CloseChannelStrategy, items ...any,
)

SendItems is a utility function that sends a list of items and indicates a strategy on whether to close the channel once the function completes.

func TryBool

func TryBool[T any](item Item[T]) (bool, error)

TryBool is a helper function that assists the client in interpreting a boolean item received via a channel. If the item is not a boolean, an error is returned, otherwise the returned value is the underlying boolean value.

func TryOpaque

func TryOpaque[T, O any](item Item[T]) (O, error)

TryOpaque is a helper function that assists the client in interpreting an opaque item received via a channel. If the item is not of the custom type O, an error is returned, otherwise the returned value is the underlying custom value.

func TryTV

func TryTV[T any](item Item[T]) (int, error)

TryTV is a helper function that assists the client in interpreting an integer based tick value item received via a channel. If the item is not a tick value, an error is returned, otherwise the returned value is the underlying integer value.

func TryWCh

func TryWCh[T any](item Item[T]) (chan<- Item[T], error)

TryWCh is a helper function that assists the client in interpreting a write chan item received via a channel. If the item is not a write chan, an error is returned, otherwise the returned value is the underlying write chan value.
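The Try and Must helpers above come in pairs that differ only in how a mismatch is reported. The following is a minimal sketch of that convention, not the package's implementation: the `item` type, `tryBool`, `mustBool` and `errNotBool` are hypothetical stand-ins defined locally.

```go
package main

import (
	"errors"
	"fmt"
)

// item is a simplified, hypothetical stand-in for Item[T]: it records
// whether it carries a boolean and, if so, the boolean's value.
type item struct {
	isBool bool
	b      bool
}

// errNotBool is a hypothetical error used only by this sketch.
var errNotBool = errors.New("item is not a boolean")

// tryBool follows the Try convention: a mismatch is reported as an error.
func tryBool(it item) (bool, error) {
	if !it.isBool {
		return false, errNotBool
	}
	return it.b, nil
}

// mustBool follows the Must convention: a mismatch raises a panic.
func mustBool(it item) bool {
	b, err := tryBool(it)
	if err != nil {
		panic(err)
	}
	return b
}

func main() {
	if b, err := tryBool(item{isBool: true, b: true}); err == nil {
		fmt.Println("tryBool value:", b)
	}
	if _, err := tryBool(item{}); err != nil {
		fmt.Println("tryBool error:", err)
	}
	fmt.Println("mustBool value:", mustBool(item{isBool: true}))
}
```

A Try variant suits code that can recover from an unexpected item kind; a Must variant suits tests and call sites where a mismatch indicates a programming error.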

Types

type AssertFunc

type AssertFunc[T any] func(actual AssertResources[T])

func (AssertFunc[T]) Check

func (f AssertFunc[T]) Check(actual AssertResources[T])

type AssertPredicate

type AssertPredicate[T any] func(actual AssertResources[T]) error

AssertPredicate is a custom predicate based on the items.

type AssertResources

type AssertResources[T any] interface {
	Values() []T
	Numbers() []int
	Errors() []error
	Booleans() []bool
	Ticks() []NumVal
	TickValues() []NumVal
	Opaques() []any
}

type Asserter

type Asserter[T any] interface {
	Check(actual AssertResources[T])
}

Asserter

type BadRangeIteratorError

type BadRangeIteratorError struct {
	// contains filtered or unexported fields
}
var RangeMissingWhilstError BadRangeIteratorError

func (BadRangeIteratorError) Error

func (e BadRangeIteratorError) Error() string

type Calculator

type Calculator[T any] interface {
	Add(T, T) T
	Div(T, T) T
	Inc(T) T
	IsZero(T) bool
	Zero() T
}

Calculator defines numeric operations for T
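To illustrate why an operator might need a Calculator, here is a minimal sketch that averages values of an arbitrary type using only the interface methods shown above. The `number` constraint, `simpleCalc` and `average` are hypothetical names introduced for this example; they are not part of the package.

```go
package main

import "fmt"

// Calculator is copied from the interface shown above.
type Calculator[T any] interface {
	Add(T, T) T
	Div(T, T) T
	Inc(T) T
	IsZero(T) bool
	Zero() T
}

// number is a hypothetical constraint standing in for the package's Numeric.
type number interface {
	~int | ~int64 | ~float64
}

// simpleCalc is a hypothetical implementation built on native operators.
type simpleCalc[T number] struct{}

func (simpleCalc[T]) Add(a, b T) T    { return a + b }
func (simpleCalc[T]) Div(a, b T) T    { return a / b }
func (simpleCalc[T]) Inc(v T) T       { return v + 1 }
func (simpleCalc[T]) IsZero(v T) bool { return v == 0 }
func (simpleCalc[T]) Zero() T         { return 0 }

// average sums and counts the values through the Calculator, so it never
// applies an arithmetic operator to T directly.
func average[T any](calc Calculator[T], values ...T) T {
	sum, count := calc.Zero(), calc.Zero()
	for _, v := range values {
		sum = calc.Add(sum, v)
		count = calc.Inc(count)
	}
	if calc.IsZero(count) {
		return calc.Zero() // avoid dividing by zero for an empty input
	}
	return calc.Div(sum, count)
}

func main() {
	fmt.Println(average[float64](simpleCalc[float64]{}, 1, 2, 3, 6)) // prints 3
}
```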

func Calc

func Calc[T Numeric]() Calculator[T]

func ItemCalc

func ItemCalc[T Numeric]() Calculator[T]

type Comparator

type Comparator[T any] func(Item[T], Item[T]) int

Comparator defines a func that returns an int:
- 0 if the two elements are equal
- a negative value if the first argument is less than the second
- a positive value if the first argument is greater than the second
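The contract above can be sketched with a comparator that delegates to `cmp.Compare` from the standard library. The local `Item` type models only the `V` field, and `compareOrdered` is a hypothetical helper; neither is the package's own code.

```go
package main

import (
	"cmp"
	"fmt"
)

// Item is a local stand-in for the package's Item[T]; only the value
// field V is modelled here.
type Item[T any] struct {
	V T
}

// Comparator mirrors the documented shape of the type.
type Comparator[T any] func(Item[T], Item[T]) int

// compareOrdered is a hypothetical comparator for any ordered type. It
// returns a negative value, zero or a positive value as described above.
func compareOrdered[T cmp.Ordered](a, b Item[T]) int {
	return cmp.Compare(a.V, b.V)
}

func main() {
	var c Comparator[int] = compareOrdered[int]

	fmt.Println(c(Item[int]{V: 1}, Item[int]{V: 2}) < 0)  // true: first is less
	fmt.Println(c(Item[int]{V: 2}, Item[int]{V: 2}) == 0) // true: equal
	fmt.Println(c(Item[int]{V: 3}, Item[int]{V: 2}) > 0)  // true: first is greater
}
```

Delegating to `cmp.Compare` avoids the overflow that a subtraction based comparator (`a - b`) can produce for integers near the limits of their range.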

type CompletedFunc

type CompletedFunc func()

CompletedFunc handles the end of a stream.

type ContainItems

type ContainItems[T any] struct {
	Expected []T
}

ContainItems

func (ContainItems[T]) Check

func (a ContainItems[T]) Check(actual AssertResources[T])

Check ensures that an observable contains the expected items.

type CustomPredicate

type CustomPredicate[T any] struct {
	Expected AssertPredicate[T]
}

CustomPredicate

func (CustomPredicate[T]) Check

func (a CustomPredicate[T]) Check(actual AssertResources[T])

Check evaluates the custom predicate.

type Disposable

type Disposable context.CancelFunc

Disposable is a function to be called in order to dispose a subscription.

type Disposed

type Disposed <-chan struct{}

Disposed is a notification channel indicating when an Observable is closed.

type DistributionFunc

type DistributionFunc[T any] func(Item[T]) int

DistributionFunc is used by GroupBy.

type Duration

type Duration interface {
	// contains filtered or unexported methods
}

Duration represents a duration

func WithDuration

func WithDuration(d time.Duration) Duration

WithDuration is a duration option

type DynamicDistributionFunc

type DynamicDistributionFunc[T any] func(Item[T]) string

DynamicDistributionFunc is used by GroupByDynamic.

type Envelope added in v0.5.2

type Envelope[T any, O Numeric] struct {
	T *T
	P O
}

Envelope wraps a struct type T so that it can be defined with pointer receivers. It also means that the client does not need to manually define methods for constraint ProxyField as they are implemented by the Envelope.

func Seal added in v0.5.2

func Seal[T any, O Numeric](t *T) *Envelope[T, O]

Seal wraps a struct T inside an Envelope

func (Envelope[T, O]) Field added in v0.5.2

func (e Envelope[T, O]) Field() O

Field nominates which member of T of type O is the proxy field required to satisfy constraint ProxyField.

func (Envelope[T, O]) Inc added in v0.5.2

func (e Envelope[T, O]) Inc(index *Envelope[T, O], by Envelope[T, O]) *Envelope[T, O]

Inc increments the P member of the Envelope required to satisfy constraint ProxyField.

func (Envelope[T, O]) Index added in v0.5.2

func (e Envelope[T, O]) Index(i int) *Envelope[T, O]

Index creates a new Envelope from the numeric value of i, required to satisfy constraint ProxyField.

type ErrFunc

type ErrFunc func(error)

ErrFunc handles an error in a stream.

type ErrorFunc

type ErrorFunc[T any] func(error) T

ErrorFunc defines a function that computes a value from an error.

type ErrorToObservable

type ErrorToObservable[T any] func(error) Observable[T]

ErrorToObservable defines a function that produces an observable from an error.

type Func

type Func[T any] func(context.Context, T) (T, error)

Func defines a function that computes a value from an input value.

type Func2

type Func2[T any] func(context.Context, Item[T], Item[T]) (T, error)

Func2 defines a function that computes a value from two input values.

type FuncN

type FuncN[T any] func(...T) T

FuncIntM defines a function that's specialised for Map. To solve the problem of being able to map values across different types, the FuncIntM type will be modified to take an extra type parameter 'O' which represents the 'Other' type, i.e. we map from a value of type 'T' to a value of type 'O' (Func[T, O any]). With this in place, we should be able to define a pipeline that starts off with values of type T and ends up with values of type O via a Map operator. We'll have to make sure that any intermediate channels can be appropriately defined. If we want to map between values within the same type, we need to use a separate definition, and in fact the original definition, Func, should suffice.

The problem with Map is that it introduces a new type. Perhaps we need a separate observer interface that can bridge from T to O. We can't introduce the type O, because that would have a horrendous cascading impact on every type, when most operations would not need this type O. With generics, Map is a very awkward operator that needs special attention. In the short term, what we can say is that the base functionality only allows mapping to different values within the same type.

FuncN defines a function that computes a value from N input values.

type GroupedObservable

type GroupedObservable[T any] struct {
	Observable[T]
	// Key is the distribution key
	Key string
}

GroupedObservable is the observable type emitted by the GroupByDynamic operator.

type HasAnError

type HasAnError[T any] struct {
	Expected error
}

HasAnError

func (HasAnError[T]) Check

func (a HasAnError[T]) Check(actual AssertResources[T])

Check HasAnError ensures that the observable has produced a specific error.

type HasError

type HasError[T any] struct {
	Expected []error
}

HasError

func (HasError[T]) Check

func (a HasError[T]) Check(actual AssertResources[T])

type HasFalse

type HasFalse[T any] struct {
}

HasFalse

func (HasFalse[T]) Check

func (a HasFalse[T]) Check(actual AssertResources[T])

Check HasFalse ensures that the boolean values contain at least one false value.

type HasItem

type HasItem[T any] struct {
	Expected T
}

HasItem

func (HasItem[T]) Check

func (a HasItem[T]) Check(actual AssertResources[T])

HasItem checks if a single or optional single has a specific item.

type HasItems

type HasItems[T any] struct {
	Expected []T
}

HasItems

func (HasItems[T]) Check

func (a HasItems[T]) Check(actual AssertResources[T])

HasItems checks if an observable has an exact set of items.

type HasItemsNoOrder

type HasItemsNoOrder[T any] struct {
	Expected []T
}

HasItemsNoOrder

func (HasItemsNoOrder[T]) Check

func (a HasItemsNoOrder[T]) Check(actual AssertResources[T])

Check ensures that an observable produces the corresponding items regardless of the order.

type HasNoError

type HasNoError[T any] struct {
}

HasNoError

func (HasNoError[T]) Check

func (a HasNoError[T]) Check(actual AssertResources[T])

Check HasNoError ensures that the observable has not produced an error.

type HasNumber

type HasNumber[T any] struct {
	Expected int
}

HasNumber

func (HasNumber[T]) Check

func (a HasNumber[T]) Check(actual AssertResources[T])

HasNumber checks if a single or optional single has a specific numeric item.

type HasNumbers

type HasNumbers[T any] struct {
	Expected []T
}

HasNumbers

func (HasNumbers[T]) Check

func (a HasNumbers[T]) Check(actual AssertResources[T])

HasNumbers checks if an observable has an exact set of numeric items.

type HasNumbersNoOrder

type HasNumbersNoOrder[T any] struct {
	Expected []T
}

HasNumbersNoOrder

func (HasNumbersNoOrder[T]) Check

func (a HasNumbersNoOrder[T]) Check(actual AssertResources[T])

Check ensures that an observable produces the corresponding numbers regardless of the order.

type HasTickCount

type HasTickCount[T any] struct {
	Expected int
}

HasTickCount

func (HasTickCount[T]) Check

func (a HasTickCount[T]) Check(actual AssertResources[T])

Check ensures that an observable has the expected count of tick items.

type HasTickValue

type HasTickValue[T any] struct {
	Expected int
}

HasTickValue

func (HasTickValue[T]) Check

func (a HasTickValue[T]) Check(actual AssertResources[T])

HasTickValue checks if a single or optional single has a specific numeric item.

type HasTickValueCount

type HasTickValueCount[T any] struct {
	Expected int
}

HasTickValueCount

func (HasTickValueCount[T]) Check

func (a HasTickValueCount[T]) Check(actual AssertResources[T])

Check ensures that an observable has the expected count of tick value items.

type HasTicks

type HasTicks[T any] struct {
	Expected []T
}

HasTicks

func (HasTicks[T]) Check

func (a HasTicks[T]) Check(actual AssertResources[T])

HasTickValues checks if an observable has an exact set of numeric items.

type HasTrue

type HasTrue[T any] struct {
}

HasTrue

func (HasTrue[T]) Check

func (a HasTrue[T]) Check(actual AssertResources[T])

Check HasTrue ensures that the boolean values contain at least one true value.

type IllegalInputError

type IllegalInputError struct {
	// contains filtered or unexported fields
}

IllegalInputError is triggered when the observable receives an illegal input.

func (IllegalInputError) Error

func (e IllegalInputError) Error() string

type IndexOutOfBoundError

type IndexOutOfBoundError struct {
	// contains filtered or unexported fields
}

IndexOutOfBoundError is triggered when the observable cannot access the specified index.

func (IndexOutOfBoundError) Error

func (e IndexOutOfBoundError) Error() string

type InitLimit

type InitLimit[T any] func() Item[T]

InitLimit defines a function to be used with Min and Max operators that defines a limit initialiser, that is to say, for Max we need to initialise the internal maximum reference point to be minimum value for type T and the reverse for the Min operator.

type IsEmpty

type IsEmpty[T any] struct {
}

IsEmpty

func (IsEmpty[T]) Check

func (a IsEmpty[T]) Check(actual AssertResources[T])

type IsFalse

type IsFalse[T any] struct {
}

IsFalse

func (IsFalse[T]) Check

func (a IsFalse[T]) Check(actual AssertResources[T])

Check IsFalse ensures that the boolean value is false.

type IsNotEmpty

type IsNotEmpty[T any] struct {
}

IsNotEmpty

func (IsNotEmpty[T]) Check

func (a IsNotEmpty[T]) Check(actual AssertResources[T])

type IsTrue

type IsTrue[T any] struct {
}

IsTrue

func (IsTrue[T]) Check

func (a IsTrue[T]) Check(actual AssertResources[T])

Check IsTrue ensures that the boolean value is true.

type IsZero

type IsZero[T any] func(T) bool

IsZero determines whether the value T is zero

type Item

type Item[T any] struct {
	V T
	E error
	// contains filtered or unexported fields
}

Item is a wrapper having either a value, error or an opaque value

func Bool

func Bool[T any](b bool) Item[T]

Bool creates a type safe boolean instance

func Error

func Error[T any](err error) Item[T]

Error creates an item from an error.

func False

func False[T any]() Item[T]

False creates a type safe boolean instance set to false

func MaxItemInitLimitInt

func MaxItemInitLimitInt() Item[int]

func MinItemInitLimitInt

func MinItemInitLimitInt() Item[int]

func Num

func Num[T any](n NumVal) Item[T]

Num creates a type safe numeric instance

func Of

func Of[T any](v T) Item[T]

Of creates an item from a value.

func Opaque

func Opaque[T any](o any) Item[T]

Opaque creates an item from any type of value

func TV

func TV[T any](tv int) Item[T]

TV creates a type safe tick value instance

func Tick

func Tick[T any]() Item[T]

Tick creates a type safe tick instance (no value)

func True

func True[T any]() Item[T]

True creates a type safe boolean instance set to true

func WCh

func WCh[T any](ch any) Item[T]

WCh creates an item from a channel

func Zero

func Zero[T any]() Item[T]

Zero creates a zero value item.

func (Item[T]) Bool

func (it Item[T]) Bool() bool

Bool returns the boolean value of the item

func (Item[T]) Ch

func (it Item[T]) Ch() chan<- Item[T]

Ch returns the channel value of the item

func (Item[T]) Desc

func (it Item[T]) Desc() string

func (Item[T]) Disc

func (it Item[T]) Disc() enums.ItemDiscriminator

Disc returns the discriminator of the item

func (Item[T]) IsBoolean

func (it Item[T]) IsBoolean() bool

IsBoolean checks if an item is a boolean instance.

func (Item[T]) IsCh

func (it Item[T]) IsCh() bool

IsCh checks if an item is a channel instance.

func (Item[T]) IsError

func (it Item[T]) IsError() bool

IsError checks if an item is an error.

func (Item[T]) IsNumber

func (it Item[T]) IsNumber() bool

IsNumber checks if an item is a numeric instance.

func (Item[T]) IsOpaque

func (it Item[T]) IsOpaque() bool

IsOpaque checks if an item is an opaque value.

func (Item[T]) IsTick

func (it Item[T]) IsTick() bool

IsTick checks if an item is a tick instance.

func (Item[T]) IsTickValue

func (it Item[T]) IsTickValue() bool

IsTickValue checks if an item is a tick value instance.

func (Item[T]) IsValue

func (it Item[T]) IsValue() bool

IsValue checks if an item is a value.

func (Item[T]) Num

func (it Item[T]) Num() NumVal

Num returns the numeric value of the item

func (Item[T]) Opaque

func (it Item[T]) Opaque() any

Opaque returns the opaque value of item without typecast

func (Item[T]) SendBlocking

func (it Item[T]) SendBlocking(ch chan<- Item[T])

SendBlocking sends an item and blocks until it is sent.

func (Item[T]) SendContext

func (it Item[T]) SendContext(ctx context.Context, ch chan<- Item[T]) bool

SendContext sends an item and blocks until it is sent or the context is canceled. It returns a boolean to indicate whether the item was sent.

func (Item[T]) SendNonBlocking

func (it Item[T]) SendNonBlocking(ch chan<- Item[T]) bool

SendNonBlocking sends an item without blocking. It returns a boolean to indicate whether the item was sent.

func (Item[T]) TV

func (it Item[T]) TV() int

TV returns the tick value of the item

type ItemToObservable

type ItemToObservable[T any] func(Item[T]) Observable[T]

ItemToObservable defines a function that computes an observable from an item.

type Iterable

type Iterable[T any] interface {
	Observe(opts ...Option[T]) <-chan Item[T]
}

Iterable is the basic type that can be observed.

type Marshaller

type Marshaller[T any] func(T) ([]byte, error)

Marshaller defines a marshaller type (ItemValue[T] to []byte).

type MissingCalcError

type MissingCalcError struct {
	// contains filtered or unexported fields
}

MissingCalcError is triggered when the client forgets to provide a calculator option

func (MissingCalcError) Error

func (e MissingCalcError) Error() string

type NextFunc

type NextFunc[T any] func(Item[T])

NextFunc handles a next item in a stream.

type NumVal

type NumVal = int

NumVal is an integer value used by Item.N and Range

func MustNum

func MustNum[T any](item Item[T]) NumVal

MustNum is a helper function that assists the client in interpreting an integer item received via a channel. If the item it not an integer, a panic is raised, otherwise the returned value is the underlying integer value.

func TryNum

func TryNum[T any](item Item[T]) (NumVal, error)

TryNum is a helper function that assists the client in interpreting an integer item received via a channel. If the item it not an integer, an error is returned, otherwise the returned value is the underlying integer value.

type Numeric

Numeric defines a constraint that targets scalar types for whom numeric operators are natively defined.

type NumericCalc

type NumericCalc[T Numeric] struct {
	// contains filtered or unexported fields
}

NumericCalc is a predefined calculator for any numeric type

func (*NumericCalc[T]) Add

func (c *NumericCalc[T]) Add(a, b T) T

func (*NumericCalc[T]) Div

func (c *NumericCalc[T]) Div(a, b T) T

func (*NumericCalc[T]) Inc

func (c *NumericCalc[T]) Inc(v T) T

func (*NumericCalc[T]) IsZero

func (c *NumericCalc[T]) IsZero(v T) bool

func (*NumericCalc[T]) Zero

func (c *NumericCalc[T]) Zero() T

type NumericRangeIterator

type NumericRangeIterator[T Numeric] struct {
	StartAt T
	By      T
	Whilst  WhilstFunc[T]
	// contains filtered or unexported fields
}

func (*NumericRangeIterator[T]) Increment

func (i *NumericRangeIterator[T]) Increment(index *T) T

Increment increments the index value

func (*NumericRangeIterator[T]) Init

func (i *NumericRangeIterator[T]) Init() error

func (*NumericRangeIterator[T]) Start

func (i *NumericRangeIterator[T]) Start() (*T, error)

Start should return the initial index value. If the By value has not been set, it will default to 1.

func (*NumericRangeIterator[T]) Step

func (i *NumericRangeIterator[T]) Step() T

func (*NumericRangeIterator[T]) While

func (i *NumericRangeIterator[T]) While(current T) bool

While defines a condition that must be true for the loop to continue iterating.
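The loop that a range iterator describes can be sketched as below. The `rangeIterator` type is a hypothetical, int-only stand-in whose field names follow NumericRangeIterator; the signature of its `Whilst` field is an assumption, and `collect` is not part of the package.

```go
package main

import "fmt"

// rangeIterator is a hypothetical, int-only stand-in for
// NumericRangeIterator. The Whilst signature is assumed for this sketch.
type rangeIterator struct {
	StartAt int
	By      int
	Whilst  func(current int) bool
}

// collect runs the loop the iterator describes: start at StartAt, advance
// by By (defaulting to 1 when unset) and stop once Whilst reports false.
// It assumes Whilst is not nil.
func collect(it rangeIterator) []int {
	by := it.By
	if by == 0 {
		by = 1
	}
	var out []int
	for i := it.StartAt; it.Whilst(i); i += by {
		out = append(out, i)
	}
	return out
}

func main() {
	it := rangeIterator{
		StartAt: 5,
		By:      2,
		Whilst:  func(current int) bool { return current < 12 },
	}
	fmt.Println(collect(it)) // prints [5 7 9 11]
}
```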

type Observable

type Observable[T any] interface {
	Iterable[T]
	All(predicate Predicate[T], opts ...Option[T]) Single[T]
	Average(opts ...Option[T]) Single[T]
	BackOffRetry(backOffCfg backoff.BackOff, opts ...Option[T]) Observable[T]
	Connect(ctx context.Context) (context.Context, Disposable)
	Contains(equal Predicate[T], opts ...Option[T]) Single[T]
	Count(opts ...Option[T]) Single[T]
	DefaultIfEmpty(defaultValue T, opts ...Option[T]) Observable[T]
	Distinct(apply Func[T], opts ...Option[T]) Observable[T]
	DistinctUntilChanged(apply Func[T], comparator Comparator[T], opts ...Option[T]) Observable[T]
	DoOnCompleted(completedFunc CompletedFunc, opts ...Option[T]) Disposed
	DoOnError(errFunc ErrFunc, opts ...Option[T]) Disposed
	DoOnNext(nextFunc NextFunc[T], opts ...Option[T]) Disposed
	ElementAt(index uint, opts ...Option[T]) Single[T]
	Error(opts ...Option[T]) error
	Errors(opts ...Option[T]) []error
	Filter(apply Predicate[T], opts ...Option[T]) Observable[T]
	Find(find Predicate[T], opts ...Option[T]) OptionalSingle[T]
	First(opts ...Option[T]) OptionalSingle[T]
	FirstOrDefault(defaultValue T, opts ...Option[T]) Single[T]
	FlatMap(apply ItemToObservable[T], opts ...Option[T]) Observable[T]
	ForEach(nextFunc NextFunc[T], errFunc ErrFunc, completedFunc CompletedFunc, opts ...Option[T]) Disposed
	GroupBy(length int, distribution DistributionFunc[T], opts ...Option[T]) Observable[T]
	GroupByDynamic(distribution DynamicDistributionFunc[T], opts ...Option[T]) Observable[T]
	IgnoreElements(opts ...Option[T]) Observable[T]
	Last(opts ...Option[T]) OptionalSingle[T]
	LastOrDefault(defaultValue T, opts ...Option[T]) Single[T]
	Max(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T]
	Map(apply Func[T], opts ...Option[T]) Observable[T]
	Min(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T]
	OnErrorResumeNext(resumeSequence ErrorToObservable[T], opts ...Option[T]) Observable[T]
	OnErrorReturn(resumeFunc ErrorFunc[T], opts ...Option[T]) Observable[T]
	OnErrorReturnItem(resume T, opts ...Option[T]) Observable[T]
	Reduce(apply Func2[T], opts ...Option[T]) OptionalSingle[T]
	Repeat(count int64, frequency Duration, opts ...Option[T]) Observable[T]
	Retry(count int, shouldRetry ShouldRetryFunc, opts ...Option[T]) Observable[T]
	Run(opts ...Option[T]) Disposed
	Sample(iterable Iterable[T], opts ...Option[T]) Observable[T]
	Scan(apply Func2[T], opts ...Option[T]) Observable[T]
	Send(output chan<- Item[T], opts ...Option[T])
	SequenceEqual(iterable Iterable[T], comparator Comparator[T], opts ...Option[T]) Single[T]
	Serialize(from int, identifier func(any) int, opts ...Option[T]) Observable[T]
	Skip(nth uint, opts ...Option[T]) Observable[T]
	SkipLast(nth uint, opts ...Option[T]) Observable[T]
	SkipWhile(apply Predicate[T], opts ...Option[T]) Observable[T]
	StartWith(iterable Iterable[T], opts ...Option[T]) Observable[T]
	Sum(opts ...Option[T]) OptionalSingle[T]
	Take(nth uint, opts ...Option[T]) Observable[T]
	TakeLast(nth uint, opts ...Option[T]) Observable[T]
	TakeUntil(apply Predicate[T], opts ...Option[T]) Observable[T]
	TakeWhile(apply Predicate[T], opts ...Option[T]) Observable[T]
	TimeInterval(opts ...Option[T]) Observable[T]
	Timestamp(opts ...Option[T]) Observable[T]
	ToSlice(initialCapacity int, opts ...Option[T]) ([]Item[T], error)
	WindowWithCount(count int, opts ...Option[T]) Observable[T]
	WindowWithTime(timespan Duration, opts ...Option[T]) Observable[T]
	WindowWithTimeOrCount(timespan Duration, count int, opts ...Option[T]) Observable[T]
	ZipFromIterable(iterable Iterable[T], zipper Func2[T], opts ...Option[T]) Observable[T]
}

func Amb

func Amb[T any](observables []Observable[T], opts ...Option[T]) Observable[T]

Amb takes several Observables and emits all of the items from only the first of these Observables to emit an item or notification.

func CombineLatest

func CombineLatest[T any](f FuncN[T], observables []Observable[T],
	opts ...Option[T],
) Observable[T]

CombineLatest combines the latest item emitted by each Observable via a specified function and emits items based on the results of this function. Requires a calculator, so specify this with the WithCalc option.

func Concat

func Concat[T any](observables []Observable[T], opts ...Option[T]) Observable[T]

Concat emits the emissions from two or more Observables without interleaving them.

func Create

func Create[T any](f []Producer[T], opts ...Option[T]) Observable[T]

Create creates an Observable from scratch by calling observer methods programmatically.

func Defer

func Defer[T any](f []Producer[T], opts ...Option[T]) Observable[T]

Defer does not create the Observable until the observer subscribes, and creates a fresh Observable for each observer. This creates a cold observable.

func Empty

func Empty[T any]() Observable[T]

Empty creates an Observable that emits no items and terminates immediately.

func FromChannel

func FromChannel[T any](next <-chan Item[T], opts ...Option[T]) Observable[T]

FromChannel creates a cold observable from a channel.

func FromEventSource

func FromEventSource[T any](next <-chan Item[T], opts ...Option[T]) Observable[T]

FromEventSource creates a hot observable from a channel.

func Interval

func Interval[T any](interval Duration, opts ...Option[T]) Observable[T]

Interval creates an Observable that emits incrementing integers indefinitely, one at each given time interval.

func Merge

func Merge[T any](observables []Observable[T], opts ...Option[T]) Observable[T]

Merge combines multiple Observables into one by merging their emissions

func Never

func Never[T any]() Observable[T]

Never creates an Observable that emits no items and does not terminate.

func Range

func Range[T Numeric](iterator RangeIterator[T], opts ...Option[T]) Observable[T]

Range creates an Observable that emits count sequential integers beginning at start.

func RangePF added in v0.5.2

func RangePF[T ProxyField[T, O], O Numeric](iterator RangeIteratorPF[T, O],
	opts ...Option[T],
) Observable[T]

RangePF creates an Observable that emits count sequential integers beginning at start, for non numeric types, which do contain a nominated proxy Numeric member

func Start

func Start[T any](fs []Supplier[T], opts ...Option[T]) Observable[T]

Start creates an Observable from one or more directive-like Suppliers and emits the result of each operation asynchronously on a new Observable.

func Thrown

func Thrown[T any](err error) Observable[T]

Thrown creates an Observable that emits no items and terminates with an error.

func Timer

func Timer[T any](d Duration, opts ...Option[T]) Observable[T]

Timer returns an Observable that completes after a specified delay.

type ObservableImpl

type ObservableImpl[T any] struct {
	// contains filtered or unexported fields
}

ObservableImpl implements Observable.

func (*ObservableImpl[T]) All

func (o *ObservableImpl[T]) All(predicate Predicate[T], opts ...Option[T]) Single[T]

All determines whether all items emitted by an Observable meet some criteria.

func (*ObservableImpl[T]) Average

func (o *ObservableImpl[T]) Average(opts ...Option[T],
) Single[T]

Average calculates the average of numbers emitted by an Observable and emits the result. Requires a calculator, so specify this with the WithCalc option.

func (*ObservableImpl[T]) BackOffRetry

func (o *ObservableImpl[T]) BackOffRetry(backOffCfg backoff.BackOff,
	opts ...Option[T],
) Observable[T]

BackOffRetry implements a backoff retry: if a source Observable sends an error, it resubscribes to it in the hope that it will complete without error. Cannot be run in parallel.

func (*ObservableImpl[T]) Connect

func (o *ObservableImpl[T]) Connect(ctx context.Context) (context.Context, Disposable)

Connect instructs a connectable Observable to begin emitting items to its subscribers.

func (*ObservableImpl[T]) Contains

func (o *ObservableImpl[T]) Contains(equal Predicate[T], opts ...Option[T]) Single[T]

func (*ObservableImpl[T]) Count

func (o *ObservableImpl[T]) Count(opts ...Option[T]) Single[T]

Count counts the number of items emitted by the source Observable and emits only this value.

func (*ObservableImpl[T]) DefaultIfEmpty

func (o *ObservableImpl[T]) DefaultIfEmpty(defaultValue T, opts ...Option[T]) Observable[T]

DefaultIfEmpty returns an Observable that emits the items emitted by the source Observable or a specified default item if the source Observable is empty.

func (*ObservableImpl[T]) Distinct

func (o *ObservableImpl[T]) Distinct(apply Func[T], opts ...Option[T]) Observable[T]

Distinct suppresses duplicate items in the original Observable and returns a new Observable.

func (*ObservableImpl[T]) DistinctUntilChanged

func (o *ObservableImpl[T]) DistinctUntilChanged(apply Func[T], comparator Comparator[T],
	opts ...Option[T],
) Observable[T]

DistinctUntilChanged suppresses consecutive duplicate items in the original Observable. Cannot be run in parallel.

func (*ObservableImpl[T]) DoOnCompleted

func (o *ObservableImpl[T]) DoOnCompleted(completedFunc CompletedFunc,
	opts ...Option[T],
) Disposed

DoOnCompleted registers a callback action that will be called once the Observable terminates.

func (*ObservableImpl[T]) DoOnError

func (o *ObservableImpl[T]) DoOnError(errFunc ErrFunc, opts ...Option[T]) Disposed

DoOnError registers a callback action that will be called if the Observable terminates abnormally.

func (*ObservableImpl[T]) DoOnNext

func (o *ObservableImpl[T]) DoOnNext(nextFunc NextFunc[T], opts ...Option[T]) Disposed

DoOnNext registers a callback action that will be called on each item emitted by the Observable.

func (*ObservableImpl[T]) ElementAt

func (o *ObservableImpl[T]) ElementAt(index uint, opts ...Option[T]) Single[T]

ElementAt emits only item n emitted by an Observable. Cannot be run in parallel.

func (*ObservableImpl[T]) Error

func (o *ObservableImpl[T]) Error(opts ...Option[T]) error

Error returns the eventual Observable error. This method is blocking.

func (*ObservableImpl[T]) Errors

func (o *ObservableImpl[T]) Errors(opts ...Option[T]) []error

Errors returns an eventual list of Observable errors. This method is blocking.

func (*ObservableImpl[T]) Filter

func (o *ObservableImpl[T]) Filter(apply Predicate[T], opts ...Option[T]) Observable[T]

Filter emits only those items from an Observable that pass a predicate test.

func (*ObservableImpl[T]) Find

func (o *ObservableImpl[T]) Find(find Predicate[T], opts ...Option[T]) OptionalSingle[T]

Find emits the first item passing a predicate then completes.

func (*ObservableImpl[T]) First

func (o *ObservableImpl[T]) First(opts ...Option[T]) OptionalSingle[T]

First returns a new Observable which emits only the first item. Cannot be run in parallel.

func (*ObservableImpl[T]) FirstOrDefault

func (o *ObservableImpl[T]) FirstOrDefault(defaultValue T, opts ...Option[T]) Single[T]

FirstOrDefault returns a new Observable which emits only the first item. If the observable fails to emit any items, it emits a default value. Cannot be run in parallel.

func (*ObservableImpl[T]) FlatMap

func (o *ObservableImpl[T]) FlatMap(apply ItemToObservable[T],
	opts ...Option[T],
) Observable[T]

FlatMap transforms the items emitted by an Observable into Observables, then flattens the emissions from those into a single Observable.

func (*ObservableImpl[T]) ForEach

func (o *ObservableImpl[T]) ForEach(nextFunc NextFunc[T],
	errFunc ErrFunc, completedFunc CompletedFunc, opts ...Option[T],
) Disposed

ForEach subscribes to the Observable and receives notifications for each element.

func (*ObservableImpl[T]) GroupBy

func (o *ObservableImpl[T]) GroupBy(length int,
	distribution DistributionFunc[T], opts ...Option[T],
) Observable[T]

GroupBy divides an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key.

func (*ObservableImpl[T]) GroupByDynamic

func (o *ObservableImpl[T]) GroupByDynamic(distribution DynamicDistributionFunc[T],
	opts ...Option[T],
) Observable[T]

GroupByDynamic divides an Observable into a dynamic set of Observables that each emit GroupedObservable from the original Observable, organized by key.

func (*ObservableImpl[T]) IgnoreElements

func (o *ObservableImpl[T]) IgnoreElements(opts ...Option[T]) Observable[T]

IgnoreElements ignores all items emitted by the source Observable except for errors. Cannot be run in parallel.

func (*ObservableImpl[T]) Last

func (o *ObservableImpl[T]) Last(opts ...Option[T]) OptionalSingle[T]

Last returns a new Observable that emits only the last item. Cannot be run in parallel.

func (*ObservableImpl[T]) LastOrDefault

func (o *ObservableImpl[T]) LastOrDefault(defaultValue T, opts ...Option[T]) Single[T]

func (*ObservableImpl[T]) Map

func (o *ObservableImpl[T]) Map(apply Func[T], opts ...Option[T]) Observable[T]

Map transforms the items emitted by an Observable by applying a function to each item.

func (*ObservableImpl[T]) Max

func (o *ObservableImpl[T]) Max(comparator Comparator[T], initLimit InitLimit[T],
	opts ...Option[T],
) OptionalSingle[T]

Max determines and emits the maximum-valued item emitted by an Observable according to a comparator.

func (*ObservableImpl[T]) Min

func (o *ObservableImpl[T]) Min(comparator Comparator[T], initLimit InitLimit[T],
	opts ...Option[T],
) OptionalSingle[T]

Min determines and emits the minimum-valued item emitted by an Observable according to a comparator.

func (*ObservableImpl[T]) Observe

func (o *ObservableImpl[T]) Observe(opts ...Option[T]) <-chan Item[T]

func (*ObservableImpl[T]) OnErrorResumeNext

func (o *ObservableImpl[T]) OnErrorResumeNext(resumeSequence ErrorToObservable[T],
	opts ...Option[T],
) Observable[T]

OnErrorResumeNext instructs an Observable to pass control to another Observable rather than invoking onError if it encounters an error.

func (*ObservableImpl[T]) OnErrorReturn

func (o *ObservableImpl[T]) OnErrorReturn(resumeFunc ErrorFunc[T], opts ...Option[T]) Observable[T]

func (*ObservableImpl[T]) OnErrorReturnItem

func (o *ObservableImpl[T]) OnErrorReturnItem(resume T, opts ...Option[T]) Observable[T]

func (*ObservableImpl[T]) Reduce

func (o *ObservableImpl[T]) Reduce(apply Func2[T], opts ...Option[T]) OptionalSingle[T]

Reduce applies a function to each item emitted by an Observable, sequentially, and emits the final value.

func (*ObservableImpl[T]) Repeat

func (o *ObservableImpl[T]) Repeat(count int64, frequency Duration, opts ...Option[T]) Observable[T]

Repeat returns an Observable that repeats the sequence of items emitted by the source Observable at most count times, at a particular frequency. Cannot be run in parallel.

func (*ObservableImpl[T]) Retry

func (o *ObservableImpl[T]) Retry(count int, shouldRetry ShouldRetryFunc,
	opts ...Option[T],
) Observable[T]

If the source Observable sends an error, Retry resubscribes to it in the hope that it will complete without error. Cannot be run in parallel.

func (*ObservableImpl[T]) Run

func (o *ObservableImpl[T]) Run(opts ...Option[T]) Disposed

Run creates an Observer without consuming the emitted items.

func (*ObservableImpl[T]) Sample

func (o *ObservableImpl[T]) Sample(iterable Iterable[T], opts ...Option[T]) Observable[T]

Sample returns an Observable that emits the most recent items emitted by the source Iterable whenever the input Iterable emits an item.

func (*ObservableImpl[T]) Scan

func (o *ObservableImpl[T]) Scan(apply Func2[T], opts ...Option[T]) Observable[T]

Scan applies a Func2 to each item emitted by an Observable, sequentially, and emits each successive value. Cannot be run in parallel.
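The difference between Reduce and Scan is easiest to see side by side. The following sketch works on plain slices rather than Observables. The `func2` shape and the choice to seed the accumulator with the first item are assumptions made for illustration, since the package's Func2[T] signature is not shown here.

```go
package main

import "fmt"

// func2 is a hypothetical two-argument accumulator; the real Func2[T]
// signature may differ.
type func2[T any] func(acc, v T) T

// scan returns every intermediate accumulation, seeded with the first item.
func scan[T any](values []T, apply func2[T]) []T {
	out := make([]T, 0, len(values))
	for i, v := range values {
		if i == 0 {
			out = append(out, v)
			continue
		}
		out = append(out, apply(out[i-1], v))
	}
	return out
}

// reduce returns only the final accumulation. ok is false for empty input,
// loosely mirroring an OptionalSingle that emits nothing.
func reduce[T any](values []T, apply func2[T]) (result T, ok bool) {
	all := scan(values, apply)
	if len(all) == 0 {
		return result, false
	}
	return all[len(all)-1], true
}

func main() {
	add := func2[int](func(acc, v int) int { return acc + v })
	fmt.Println(scan([]int{1, 2, 3, 4}, add))   // [1 3 6 10]
	fmt.Println(reduce([]int{1, 2, 3, 4}, add)) // 10 true
}
```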

func (*ObservableImpl[T]) Send

func (o *ObservableImpl[T]) Send(output chan<- Item[T], opts ...Option[T])

Send sends the items to a given channel.

func (*ObservableImpl[T]) SequenceEqual

func (o *ObservableImpl[T]) SequenceEqual(iterable Iterable[T],
	comparator Comparator[T],
	opts ...Option[T],
) Single[T]

SequenceEqual emits true if an Observable and the input Observable emit the same items, in the same order, with the same termination state. Otherwise, it emits false.

func (*ObservableImpl[T]) Serialize

func (o *ObservableImpl[T]) Serialize(from int, identifier func(any) int,
	opts ...Option[T],
) Observable[T]

Serialize forces an Observable to make serialized calls and to be well-behaved.

func (*ObservableImpl[T]) Skip

func (o *ObservableImpl[T]) Skip(nth uint, opts ...Option[T]) Observable[T]

Skip suppresses the first n items in the original Observable and returns a new Observable with the remaining items. Cannot be run in parallel.

func (*ObservableImpl[T]) SkipLast

func (o *ObservableImpl[T]) SkipLast(nth uint, opts ...Option[T]) Observable[T]

SkipLast suppresses the last n items in the original Observable and returns a new Observable with the remaining items. Cannot be run in parallel.

func (*ObservableImpl[T]) SkipWhile

func (o *ObservableImpl[T]) SkipWhile(apply Predicate[T], opts ...Option[T]) Observable[T]

SkipWhile discards items emitted by an Observable until a specified condition becomes false. Cannot be run in parallel.

func (*ObservableImpl[T]) StartWith

func (o *ObservableImpl[T]) StartWith(iterable Iterable[T], opts ...Option[T]) Observable[T]

StartWith emits a specified Iterable before beginning to emit the items from the source Observable.

func (*ObservableImpl[T]) Sum

func (o *ObservableImpl[T]) Sum(opts ...Option[T]) OptionalSingle[T]

Sum calculates the sum of the items emitted by an Observable and emits the result. Requires a calculator, so specify this with the WithCalc option.

func (*ObservableImpl[T]) Take

func (o *ObservableImpl[T]) Take(nth uint, opts ...Option[T]) Observable[T]

Take emits only the first n items emitted by an Observable. Cannot be run in parallel.

func (*ObservableImpl[T]) TakeLast

func (o *ObservableImpl[T]) TakeLast(nth uint, opts ...Option[T]) Observable[T]

TakeLast emits only the last n items emitted by an Observable. Cannot be run in parallel.
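Skip, SkipLast, Take and TakeLast all select a contiguous part of the sequence by position. As a rough mental model, they behave like the slice expressions below. This is a sketch on in-memory slices with hypothetical helper names, and clamping n to the sequence length is an assumption rather than documented behaviour.

```go
package main

import "fmt"

// clamp limits n to the length of the input so slicing never panics.
func clamp(n uint, length int) int {
	if n > uint(length) {
		return length
	}
	return int(n)
}

func skip[T any](values []T, n uint) []T     { return values[clamp(n, len(values)):] }
func skipLast[T any](values []T, n uint) []T { return values[:len(values)-clamp(n, len(values))] }
func take[T any](values []T, n uint) []T     { return values[:clamp(n, len(values))] }
func takeLast[T any](values []T, n uint) []T { return values[len(values)-clamp(n, len(values)):] }

func main() {
	src := []int{1, 2, 3, 4, 5}
	fmt.Println(skip(src, 2))     // [3 4 5]
	fmt.Println(skipLast(src, 2)) // [1 2 3]
	fmt.Println(take(src, 2))     // [1 2]
	fmt.Println(takeLast(src, 2)) // [4 5]
	fmt.Println(take(src, 9))     // [1 2 3 4 5]
}
```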

func (*ObservableImpl[T]) TakeUntil

func (o *ObservableImpl[T]) TakeUntil(apply Predicate[T],
	opts ...Option[T],
) Observable[T]

TakeUntil returns an Observable that emits items emitted by the source Observable, checks the specified predicate for each item, and then completes when the condition is satisfied. Cannot be run in parallel.

func (*ObservableImpl[T]) TakeWhile

func (o *ObservableImpl[T]) TakeWhile(apply Predicate[T],
	opts ...Option[T],
) Observable[T]

TakeWhile returns an Observable that emits items emitted by the source Observable so long as each item satisfies a specified condition, and then completes as soon as this condition is not satisfied. Cannot be run in parallel.
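TakeUntil and TakeWhile differ in when the predicate is checked relative to emission. The sketch below models both on slices with hypothetical helpers. It assumes, based on the wording above ("emits items ..., checks the specified predicate for each item, and then completes"), that TakeUntil still emits the item that satisfies the predicate, whereas TakeWhile drops the first item that fails it.

```go
package main

import "fmt"

// takeUntil emits each item and then stops once stop reports true, so the
// item that triggers completion is included (an assumption).
func takeUntil[T any](values []T, stop func(T) bool) []T {
	var out []T
	for _, v := range values {
		out = append(out, v)
		if stop(v) {
			break
		}
	}
	return out
}

// takeWhile checks the condition first, so the first failing item is dropped.
func takeWhile[T any](values []T, keep func(T) bool) []T {
	var out []T
	for _, v := range values {
		if !keep(v) {
			break
		}
		out = append(out, v)
	}
	return out
}

func main() {
	src := []int{1, 2, 3, 4, 5}
	fmt.Println(takeUntil(src, func(v int) bool { return v >= 3 })) // [1 2 3]
	fmt.Println(takeWhile(src, func(v int) bool { return v < 3 }))  // [1 2]
}
```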

func (*ObservableImpl[T]) TimeInterval

func (o *ObservableImpl[T]) TimeInterval(opts ...Option[T]) Observable[T]

func (*ObservableImpl[T]) Timestamp

func (o *ObservableImpl[T]) Timestamp(opts ...Option[T]) Observable[T]

func (*ObservableImpl[T]) ToSlice

func (o *ObservableImpl[T]) ToSlice(initialCapacity int, opts ...Option[T]) ([]Item[T], error)

ToSlice collects all items from an Observable and returns them in a slice, along with an optional error. Cannot be run in parallel.

func (*ObservableImpl[T]) WindowWithCount

func (o *ObservableImpl[T]) WindowWithCount(count int, opts ...Option[T]) Observable[T]

WindowWithCount periodically subdivides items from an Observable into Observable windows of a given size and emits these windows rather than emitting the items one at a time.
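Count-based windowing amounts to chunking the sequence. The following sketch uses a hypothetical `windowWithCount` helper on a slice. Emitting a final, shorter window when the input length is not a multiple of count is an assumption, not something this documentation states.

```go
package main

import "fmt"

// windowWithCount splits values into consecutive windows of at most count
// items. A non-positive count yields no windows.
func windowWithCount[T any](values []T, count int) [][]T {
	if count <= 0 {
		return nil
	}
	var windows [][]T
	for start := 0; start < len(values); start += count {
		end := start + count
		if end > len(values) {
			end = len(values)
		}
		windows = append(windows, values[start:end])
	}
	return windows
}

func main() {
	fmt.Println(windowWithCount([]int{1, 2, 3, 4, 5, 6, 7}, 3)) // [[1 2 3] [4 5 6] [7]]
}
```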

func (*ObservableImpl[T]) WindowWithTime

func (o *ObservableImpl[T]) WindowWithTime(timespan Duration, opts ...Option[T]) Observable[T]

WindowWithTime periodically subdivides items from an Observable into Observables based on timed windows and emits them rather than emitting the items one at a time.

func (*ObservableImpl[T]) WindowWithTimeOrCount

func (o *ObservableImpl[T]) WindowWithTimeOrCount(timespan Duration,
	count int, opts ...Option[T],
) Observable[T]

WindowWithTimeOrCount periodically subdivides items from an Observable into Observables based on timed windows or a specific size and emits them rather than emitting the items one at a time.

func (*ObservableImpl[T]) ZipFromIterable

func (o *ObservableImpl[T]) ZipFromIterable(iterable Iterable[T], zipper Func2[T],
	opts ...Option[T],
) Observable[T]

ZipFromIterable merges the emissions of an Iterable via a specified function and emits single items for each combination based on the results of this function.
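Zipping pairs items by position and combines each pair with the zipper function. Here is a slice-based sketch with a hypothetical `zip` helper. Stopping at the shorter input and the two-argument zipper shape are assumptions, since Func2[T] is not shown here.

```go
package main

import "fmt"

// zip combines left[i] and right[i] with zipper, stopping at the shorter input.
func zip[T any](left, right []T, zipper func(a, b T) T) []T {
	n := len(left)
	if len(right) < n {
		n = len(right)
	}
	out := make([]T, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, zipper(left[i], right[i]))
	}
	return out
}

func main() {
	sum := func(a, b int) int { return a + b }
	fmt.Println(zip([]int{1, 2, 3}, []int{10, 20, 30, 40}, sum)) // [11 22 33]
}
```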

type Option

type Option[T any] interface {
	// contains filtered or unexported methods
}

func Serialize

func Serialize[T any](identifier func(T) int) Option[T]

Serialize forces an Observable to make serialized calls and to be well-behaved.

func WithBackPressureStrategy

func WithBackPressureStrategy[T any](strategy enums.BackPressureStrategy) Option[T]

WithBackPressureStrategy sets the back pressure strategy: drop or block.

func WithBufferedChannel

func WithBufferedChannel[T any](capacity int) Option[T]

WithBufferedChannel configures the capacity of a buffered channel.

func WithCPUPool

func WithCPUPool[T any]() Option[T]

WithCPUPool specifies an execution pool sized to the number of logical CPUs.

func WithCalc

func WithCalc[T any](calculator Calculator[T]) Option[T]

func WithContext

func WithContext[T any](ctx context.Context) Option[T]

WithContext passes a context to the operator.

func WithErrorStrategy

func WithErrorStrategy[T any](strategy enums.OnErrorStrategy) Option[T]

WithErrorStrategy defines how an observable should deal with errors. This strategy is propagated to the parent observable.

func WithObservationStrategy

func WithObservationStrategy[T any](strategy enums.ObservationStrategy) Option[T]

WithObservationStrategy sets the observation strategy, for example the eager observation mode, which consumes items even without a subscription.

func WithPool

func WithPool[T any](pool int) Option[T]

WithPool specifies an execution pool of the given size.

func WithPublishStrategy

func WithPublishStrategy[T any]() Option[T]

WithPublishStrategy converts an ordinary Observable into a connectable Observable.

type OptionalSingle

type OptionalSingle[T any] interface {
	Iterable[T]
	Get(opts ...Option[T]) (Item[T], error)
	Map(apply Func[T], opts ...Option[T]) OptionalSingle[T]
	Run(opts ...Option[T]) Disposed
}

OptionalSingle is an optional single.

type OptionalSingleImpl

type OptionalSingleImpl[T any] struct {
	// contains filtered or unexported fields
}

OptionalSingleImpl implements OptionalSingle.

func NewOptionalSingleImpl

func NewOptionalSingleImpl[T any](iterable Iterable[T]) OptionalSingleImpl[T]

NewOptionalSingleImpl creates an OptionalSingleImpl.

func (*OptionalSingleImpl[T]) Get

func (o *OptionalSingleImpl[T]) Get(opts ...Option[T]) (Item[T], error)

Get returns the item or rxgo.OptionalEmpty. An error is returned if the context has been cancelled. This method is blocking.

func (*OptionalSingleImpl[T]) Map

func (o *OptionalSingleImpl[T]) Map(apply Func[T], opts ...Option[T]) OptionalSingle[T]

Map transforms the items emitted by an OptionalSingle by applying a function to each item.

func (*OptionalSingleImpl[T]) Observe

func (o *OptionalSingleImpl[T]) Observe(opts ...Option[T]) <-chan Item[T]

Observe observes an OptionalSingle by returning its channel.

func (*OptionalSingleImpl[T]) Run

func (o *OptionalSingleImpl[T]) Run(opts ...Option[T]) Disposed

Run creates an observer without consuming the emitted items.

type Predicate

type Predicate[T any] func(Item[T]) bool

Predicate defines a func that returns a bool from an input item.

type Producer

type Producer[T any] func(ctx context.Context, next chan<- Item[T])

Producer defines a producer implementation.
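A producer receives a context and a send-only channel, so a well-behaved implementation should stop sending once the context is done. The sketch below shows one way to write such a function. `item`, `producer`, `countTo`, `run` and `firstN` are hypothetical stand-ins, and closing the channel after the producer returns is an assumption about the caller's responsibilities.

```go
package main

import (
	"context"
	"fmt"
)

// item is a hypothetical stand-in for Item[T].
type item[T any] struct {
	V T
}

// producer mirrors the documented shape of Producer[T].
type producer[T any] func(ctx context.Context, next chan<- item[T])

// countTo returns a producer that sends 1..n and stops early if ctx is done.
func countTo(n int) producer[int] {
	return func(ctx context.Context, next chan<- item[int]) {
		for i := 1; i <= n; i++ {
			select {
			case <-ctx.Done():
				return
			case next <- item[int]{V: i}:
			}
		}
	}
}

// run starts the producer in a goroutine and closes the channel when the
// producer returns, so consumers can range over it.
func run[T any](ctx context.Context, p producer[T]) <-chan item[T] {
	ch := make(chan item[T])
	go func() {
		defer close(ch)
		p(ctx, ch)
	}()
	return ch
}

// firstN drains a countTo(n) producer into a slice.
func firstN(n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	var out []int
	for it := range run(ctx, countTo(n)) {
		out = append(out, it.V)
	}
	return out
}

func main() {
	fmt.Println(firstN(3)) // [1 2 3]
}
```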

type ProxyField

type ProxyField[T any, O Numeric] interface {
	// Field defines the nominated proxy field
	Field() O
	// Inc is the incrementor function invoked by the iterator. Implementations
	// should add the value of by to index.
	Inc(index *T, by T) *T
	// Index should return an instance of T that represents the numeric value
	// of i. Typically, this requires returning an instance of T whose nominated
	// proxied field is set to i.
	Index(i int) *T
}

ProxyField assists the client when dealing with struct type of T. Used by RangeIteratorPF. Any struct type that is used as the type T must nominate a field (the proxy), that will be used as the iterator field. T represents the parent type and O represents the proxied field type.

type RangeIterator

type RangeIterator[T any] interface {
	// Init performs pre iteration check and returns an error on failure.
	Init() error
	// Start should return the initial index value
	Start() (*T, error)
	// Step is used by Increment and defines the size of increment for each iteration
	Step() T
	// Increment increments the index value
	Increment(index *T) T
	// While defines a condition that must be true for the loop to
	// continue iterating.
	While(current T) bool
}

RangeIterator allows the client to define how the Range operator emits derived items.
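To make the method set concrete, the sketch below copies the interface into a standalone program and implements it for int. The `intRange` type and the `emit` driver loop are hypothetical: they show one plausible way a Range-like operator could call Init, Start, While and Increment, not how this package actually does so.

```go
package main

import (
	"errors"
	"fmt"
)

// rangeIterator copies the method set documented for RangeIterator[T].
type rangeIterator[T any] interface {
	Init() error
	Start() (*T, error)
	Step() T
	Increment(index *T) T
	While(current T) bool
}

// intRange is a hypothetical ascending iterator over ints.
type intRange struct {
	startAt, by, until int
}

// Init rejects a non-positive step, which would never terminate here.
func (r *intRange) Init() error {
	if r.by <= 0 {
		return errors.New("step must be positive")
	}
	return nil
}

// Start returns a pointer to a fresh copy of the initial index.
func (r *intRange) Start() (*int, error) {
	index := r.startAt
	return &index, nil
}

func (r *intRange) Step() int { return r.by }

// Increment advances the index in place and returns the new value.
func (r *intRange) Increment(index *int) int {
	*index += r.Step()
	return *index
}

func (r *intRange) While(current int) bool { return current < r.until }

// emit drives the iterator and collects each index value (assumed protocol).
func emit[T any](it rangeIterator[T]) ([]T, error) {
	if err := it.Init(); err != nil {
		return nil, err
	}
	index, err := it.Start()
	if err != nil {
		return nil, err
	}
	var out []T
	for ; it.While(*index); it.Increment(index) {
		out = append(out, *index)
	}
	return out, nil
}

func main() {
	values, err := emit[int](&intRange{startAt: 0, by: 5, until: 20})
	fmt.Println(values, err) // [0 5 10 15] <nil>
}
```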

type RangeIteratorByProxy added in v0.5.2

type RangeIteratorByProxy[T ProxyField[T, O], O Numeric] struct {
	StartAt T
	By      T
	Whilst  WhilstFunc[T]
	// contains filtered or unexported fields
}

RangeIteratorByProxy is the iterator required for struct types of T, where the client has nominated a member of T to be the proxy field with which numeric operations are performed to generate indexes for iteration.

func (*RangeIteratorByProxy[T, O]) Increment added in v0.5.2

func (i *RangeIteratorByProxy[T, O]) Increment(index *T) *T

Increment increments the index value.

func (*RangeIteratorByProxy[T, O]) Init added in v0.5.2

func (i *RangeIteratorByProxy[T, O]) Init() error

Init is invoked prior to iteration and returns an error if not defined correctly.

func (*RangeIteratorByProxy[T, O]) Start added in v0.5.2

func (i *RangeIteratorByProxy[T, O]) Start() (*T, error)

Start should return the initial index value. If By has not been set, a panic occurs.

func (*RangeIteratorByProxy[T, O]) Step added in v0.5.2

func (i *RangeIteratorByProxy[T, O]) Step() O

func (*RangeIteratorByProxy[T, O]) While added in v0.5.2

func (i *RangeIteratorByProxy[T, O]) While(current T) bool

While defines a condition that must be true for the loop to continue iterating.

type RangeIteratorPF

type RangeIteratorPF[T ProxyField[T, O], O Numeric] interface {
	// Init performs pre iteration check and returns an error on failure.
	Init() error
	// Start should return the initial index value
	Start() (*T, error)
	// Step is used by Increment and defines the size of increment for each iteration
	Step() O
	// Increment returns a pointer to a new instance of T with an incremented index value
	Increment(index *T) *T
	// While defines a condition that must be true for the loop to
	// continue iterating.
	While(current T) bool
}

type ShouldRetryFunc

type ShouldRetryFunc func(error) bool

ShouldRetryFunc is the predicate used by the Retry operator to decide whether an error should be retried.
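A retry predicate lets the caller distinguish transient failures from fatal ones. The sketch below applies a predicate of the documented shape to a plain function call instead of an Observable. The `retry` helper, the two sentinel errors, and the reading of count as "number of additional attempts" are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// shouldRetryFunc mirrors the documented shape of ShouldRetryFunc.
type shouldRetryFunc func(error) bool

// Hypothetical sentinel errors used only by this example.
var (
	errTransient = errors.New("transient failure")
	errFatal     = errors.New("fatal failure")
)

// isTransient retries only errors that wrap errTransient.
func isTransient(err error) bool { return errors.Is(err, errTransient) }

// retry runs op, then re-runs it up to count more times while it keeps
// failing with an error that shouldRetry accepts.
func retry(count int, shouldRetry shouldRetryFunc, op func() error) (attempts int, err error) {
	for {
		attempts++
		err = op()
		if err == nil || attempts > count || !shouldRetry(err) {
			return attempts, err
		}
	}
}

func main() {
	calls := 0
	flaky := func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	}
	fmt.Println(retry(3, isTransient, flaky))                            // 3 <nil>
	fmt.Println(retry(3, isTransient, func() error { return errFatal })) // 1 fatal failure
}
```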

type Single

type Single[T any] interface {
	Iterable[T]
	Filter(apply Predicate[T], opts ...Option[T]) OptionalSingle[T]
	Get(opts ...Option[T]) (Item[T], error)
	Map(apply Func[T], opts ...Option[T]) Single[T]
	Run(opts ...Option[T]) Disposed
}

Single is an observable with a single element.

func JustItem

func JustItem[T any](value T, opts ...Option[T]) Single[T]

JustItem creates a single from one item.

type SingleImpl

type SingleImpl[T any] struct {
	// contains filtered or unexported fields
}

SingleImpl implements Single.

func (*SingleImpl[T]) Filter

func (s *SingleImpl[T]) Filter(apply Predicate[T], opts ...Option[T]) OptionalSingle[T]

Filter emits only those items from an Observable that pass a predicate test.

func (*SingleImpl[T]) Get

func (s *SingleImpl[T]) Get(opts ...Option[T]) (Item[T], error)

Get returns the item. An error is returned if the context has been cancelled. This method is blocking.

func (*SingleImpl[T]) Map

func (s *SingleImpl[T]) Map(apply Func[T], opts ...Option[T]) Single[T]

Map transforms the items emitted by a Single by applying a function to each item.

func (*SingleImpl[T]) Observe

func (s *SingleImpl[T]) Observe(opts ...Option[T]) <-chan Item[T]

Observe observes a Single by returning its channel.

func (*SingleImpl[T]) Run

func (s *SingleImpl[T]) Run(opts ...Option[T]) Disposed

Run creates an observer without consuming the emitted items.

type Supplier

type Supplier[T any] func(ctx context.Context) Item[T]

Supplier defines a function that supplies a result from nothing.

type TimestampItem

type TimestampItem[T any] struct {
	Timestamp time.Time
	V         T
}

TimestampItem attaches a timestamp to an item.

type TryError

type TryError[T any] struct {
	// contains filtered or unexported fields
}

func (TryError[T]) Error

func (e TryError[T]) Error() string

type Unmarshaller

type Unmarshaller[T any] func([]byte, T) error

Unmarshaller defines an unmarshaller type ([]byte to interface).

type WhilstFunc

type WhilstFunc[T any] func(current T) bool

WhilstFunc is the condition function used by Range.

func Count

func Count[T Numeric](count T) WhilstFunc[T]

func LessThan

func LessThan[T Numeric](until T) WhilstFunc[T]

func LessThanPF

func LessThanPF[T ProxyField[T, O], O Numeric](until T) WhilstFunc[T]

func MoreThan

func MoreThan[T Numeric](until T) WhilstFunc[T]

func MoreThanPF

func MoreThanPF[T ProxyField[T, O], O Numeric](until T) WhilstFunc[T]
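The WhilstFunc constructors listed above carry no descriptions here. One plausible reading, sketched below with hypothetical lower-case names, is that LessThan and MoreThan return closures that compare the current index against a bound. The `numeric` constraint and the comparison direction are assumptions, and Count is left out because its semantics cannot be inferred from the signature alone.

```go
package main

import "fmt"

// numeric is a hypothetical stand-in for the package's Numeric constraint.
type numeric interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
		~float32 | ~float64
}

// whilstFunc mirrors the documented shape of WhilstFunc[T].
type whilstFunc[T any] func(current T) bool

// lessThan keeps iterating while current is below until (assumed semantics).
func lessThan[T numeric](until T) whilstFunc[T] {
	return func(current T) bool { return current < until }
}

// moreThan keeps iterating while current is above until (assumed semantics).
func moreThan[T numeric](until T) whilstFunc[T] {
	return func(current T) bool { return current > until }
}

func main() {
	ascending := lessThan(3)
	for i := 0; ascending(i); i++ {
		fmt.Print(i, " ")
	}
	fmt.Println() // 0 1 2

	descending := moreThan(0)
	for i := 3; descending(i); i-- {
		fmt.Print(i, " ")
	}
	fmt.Println() // 3 2 1
}
```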
