Documentation ¶
Index ¶
- Variables
- func Assert[T any](ctx context.Context, iterable Iterable[T], asserters ...Asserter[T])
- func Just[T any](values ...T) func(opts ...Option[T]) Observable[T]
- func JustError[T any](err error) func(opts ...Option[T]) Single[T]
- func JustSingle[T any](value T, opts ...Option[T]) func(opts ...Option[T]) Single[T]
- func MaxInitLimitInt() int
- func MinInitLimitInt() int
- func MustBool[T any](item Item[T]) bool
- func MustOpaque[T, O any](item Item[T]) O
- func MustTV[T any](item Item[T]) int
- func MustWCh[T any](item Item[T]) chan<- Item[T]
- func NativeItemLimitComparator[T constraints.Ordered](a, b Item[T]) int
- func SendItems[T any](ctx context.Context, ch chan<- Item[T], strategy enums.CloseChannelStrategy, ...)
- func TryBool[T any](item Item[T]) (bool, error)
- func TryOpaque[T, O any](item Item[T]) (O, error)
- func TryTV[T any](item Item[T]) (int, error)
- func TryWCh[T any](item Item[T]) (chan<- Item[T], error)
- type AssertFunc
- type AssertPredicate
- type AssertResources
- type Asserter
- type BadRangeIteratorError
- type Calculator
- type Comparator
- type CompletedFunc
- type ContainItems
- type CustomPredicate
- type Disposable
- type Disposed
- type DistributionFunc
- type Duration
- type DynamicDistributionFunc
- type Envelope
- type ErrFunc
- type ErrorFunc
- type ErrorToObservable
- type Func
- type Func2
- type FuncN
- type GroupedObservable
- type HasAnError
- type HasError
- type HasFalse
- type HasItem
- type HasItems
- type HasItemsNoOrder
- type HasNoError
- type HasNumber
- type HasNumbers
- type HasNumbersNoOrder
- type HasTickCount
- type HasTickValue
- type HasTickValueCount
- type HasTicks
- type HasTrue
- type IllegalInputError
- type IndexOutOfBoundError
- type InitLimit
- type IsEmpty
- type IsFalse
- type IsNotEmpty
- type IsTrue
- type IsZero
- type Item
- func Bool[T any](b bool) Item[T]
- func Error[T any](err error) Item[T]
- func False[T any]() Item[T]
- func MaxItemInitLimitInt() Item[int]
- func MinItemInitLimitInt() Item[int]
- func Num[T any](n NumVal) Item[T]
- func Of[T any](v T) Item[T]
- func Opaque[T any](o any) Item[T]
- func TV[T any](tv int) Item[T]
- func Tick[T any]() Item[T]
- func True[T any]() Item[T]
- func WCh[T any](ch any) Item[T]
- func Zero[T any]() Item[T]
- func (it Item[T]) Bool() bool
- func (it Item[T]) Ch() chan<- Item[T]
- func (it Item[T]) Desc() string
- func (it Item[T]) Disc() enums.ItemDiscriminator
- func (it Item[T]) IsBoolean() bool
- func (it Item[T]) IsCh() bool
- func (it Item[T]) IsError() bool
- func (it Item[T]) IsNumber() bool
- func (it Item[T]) IsOpaque() bool
- func (it Item[T]) IsTick() bool
- func (it Item[T]) IsTickValue() bool
- func (it Item[T]) IsValue() bool
- func (it Item[T]) Num() NumVal
- func (it Item[T]) Opaque() any
- func (it Item[T]) SendBlocking(ch chan<- Item[T])
- func (it Item[T]) SendContext(ctx context.Context, ch chan<- Item[T]) bool
- func (it Item[T]) SendNonBlocking(ch chan<- Item[T]) bool
- func (it Item[T]) TV() int
- type ItemToObservable
- type Iterable
- type Marshaller
- type MissingCalcError
- type NextFunc
- type NumVal
- type Numeric
- type NumericCalc
- type NumericRangeIterator
- type Observable
- func Amb[T any](observables []Observable[T], opts ...Option[T]) Observable[T]
- func CombineLatest[T any](f FuncN[T], observables []Observable[T], opts ...Option[T]) Observable[T]
- func Concat[T any](observables []Observable[T], opts ...Option[T]) Observable[T]
- func Create[T any](f []Producer[T], opts ...Option[T]) Observable[T]
- func Defer[T any](f []Producer[T], opts ...Option[T]) Observable[T]
- func Empty[T any]() Observable[T]
- func FromChannel[T any](next <-chan Item[T], opts ...Option[T]) Observable[T]
- func FromEventSource[T any](next <-chan Item[T], opts ...Option[T]) Observable[T]
- func Interval[T any](interval Duration, opts ...Option[T]) Observable[T]
- func Merge[T any](observables []Observable[T], opts ...Option[T]) Observable[T]
- func Never[T any]() Observable[T]
- func Range[T Numeric](iterator RangeIterator[T], opts ...Option[T]) Observable[T]
- func RangePF[T ProxyField[T, O], O Numeric](iterator RangeIteratorPF[T, O], opts ...Option[T]) Observable[T]
- func Start[T any](fs []Supplier[T], opts ...Option[T]) Observable[T]
- func Thrown[T any](err error) Observable[T]
- func Timer[T any](d Duration, opts ...Option[T]) Observable[T]
- type ObservableImpl
- func (o *ObservableImpl[T]) All(predicate Predicate[T], opts ...Option[T]) Single[T]
- func (o *ObservableImpl[T]) Average(opts ...Option[T]) Single[T]
- func (o *ObservableImpl[T]) BackOffRetry(backOffCfg backoff.BackOff, opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) Connect(ctx context.Context) (context.Context, Disposable)
- func (o *ObservableImpl[T]) Contains(equal Predicate[T], opts ...Option[T]) Single[T]
- func (o *ObservableImpl[T]) Count(opts ...Option[T]) Single[T]
- func (o *ObservableImpl[T]) DefaultIfEmpty(defaultValue T, opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) Distinct(apply Func[T], opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) DistinctUntilChanged(apply Func[T], comparator Comparator[T], opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) DoOnCompleted(completedFunc CompletedFunc, opts ...Option[T]) Disposed
- func (o *ObservableImpl[T]) DoOnError(errFunc ErrFunc, opts ...Option[T]) Disposed
- func (o *ObservableImpl[T]) DoOnNext(nextFunc NextFunc[T], opts ...Option[T]) Disposed
- func (o *ObservableImpl[T]) ElementAt(index uint, opts ...Option[T]) Single[T]
- func (o *ObservableImpl[T]) Error(opts ...Option[T]) error
- func (o *ObservableImpl[T]) Errors(opts ...Option[T]) []error
- func (o *ObservableImpl[T]) Filter(apply Predicate[T], opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) Find(find Predicate[T], opts ...Option[T]) OptionalSingle[T]
- func (o *ObservableImpl[T]) First(opts ...Option[T]) OptionalSingle[T]
- func (o *ObservableImpl[T]) FirstOrDefault(defaultValue T, opts ...Option[T]) Single[T]
- func (o *ObservableImpl[T]) FlatMap(apply ItemToObservable[T], opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) ForEach(nextFunc NextFunc[T], errFunc ErrFunc, completedFunc CompletedFunc, ...) Disposed
- func (o *ObservableImpl[T]) GroupBy(length int, distribution DistributionFunc[T], opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) GroupByDynamic(distribution DynamicDistributionFunc[T], opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) IgnoreElements(opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) Last(opts ...Option[T]) OptionalSingle[T]
- func (o *ObservableImpl[T]) LastOrDefault(defaultValue T, opts ...Option[T]) Single[T]
- func (o *ObservableImpl[T]) Map(apply Func[T], opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) Max(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T]
- func (o *ObservableImpl[T]) Min(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T]
- func (o *ObservableImpl[T]) Observe(opts ...Option[T]) <-chan Item[T]
- func (o *ObservableImpl[T]) OnErrorResumeNext(resumeSequence ErrorToObservable[T], opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) OnErrorReturn(resumeFunc ErrorFunc[T], opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) OnErrorReturnItem(resume T, opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) Reduce(apply Func2[T], opts ...Option[T]) OptionalSingle[T]
- func (o *ObservableImpl[T]) Repeat(count int64, frequency Duration, opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) Retry(count int, shouldRetry ShouldRetryFunc, opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) Run(opts ...Option[T]) Disposed
- func (o *ObservableImpl[T]) Sample(iterable Iterable[T], opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) Scan(apply Func2[T], opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) Send(output chan<- Item[T], opts ...Option[T])
- func (o *ObservableImpl[T]) SequenceEqual(iterable Iterable[T], comparator Comparator[T], opts ...Option[T]) Single[T]
- func (o *ObservableImpl[T]) Serialize(from int, identifier func(any) int, opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) Skip(nth uint, opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) SkipLast(nth uint, opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) SkipWhile(apply Predicate[T], opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) StartWith(iterable Iterable[T], opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) Sum(opts ...Option[T]) OptionalSingle[T]
- func (o *ObservableImpl[T]) Take(nth uint, opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) TakeLast(nth uint, opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) TakeUntil(apply Predicate[T], opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) TakeWhile(apply Predicate[T], opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) TimeInterval(opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) Timestamp(opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) ToSlice(initialCapacity int, opts ...Option[T]) ([]Item[T], error)
- func (o *ObservableImpl[T]) WindowWithCount(count int, opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) WindowWithTime(timespan Duration, opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) WindowWithTimeOrCount(timespan Duration, count int, opts ...Option[T]) Observable[T]
- func (o *ObservableImpl[T]) ZipFromIterable(iterable Iterable[T], zipper Func2[T], opts ...Option[T]) Observable[T]
- type Option
- func Serialize[T any](identifier func(T) int) Option[T]
- func WithBackPressureStrategy[T any](strategy enums.BackPressureStrategy) Option[T]
- func WithBufferedChannel[T any](capacity int) Option[T]
- func WithCPUPool[T any]() Option[T]
- func WithCalc[T any](calculator Calculator[T]) Option[T]
- func WithContext[T any](ctx context.Context) Option[T]
- func WithErrorStrategy[T any](strategy enums.OnErrorStrategy) Option[T]
- func WithObservationStrategy[T any](strategy enums.ObservationStrategy) Option[T]
- func WithPool[T any](pool int) Option[T]
- func WithPublishStrategy[T any]() Option[T]
- type OptionalSingle
- type OptionalSingleImpl
- type Predicate
- type Producer
- type ProxyField
- type RangeIterator
- type RangeIteratorByProxy
- type RangeIteratorPF
- type ShouldRetryFunc
- type Single
- type SingleImpl
- func (s *SingleImpl[T]) Filter(apply Predicate[T], opts ...Option[T]) OptionalSingle[T]
- func (s *SingleImpl[T]) Get(opts ...Option[T]) (Item[T], error)
- func (s *SingleImpl[T]) Map(apply Func[T], opts ...Option[T]) Single[T]
- func (s *SingleImpl[T]) Observe(opts ...Option[T]) <-chan Item[T]
- func (s *SingleImpl[T]) Run(opts ...Option[T]) Disposed
- type Supplier
- type TimestampItem
- type TryError
- type Unmarshaller
- type WhilstFunc
Constants ¶
This section is empty.
Variables ¶
var Infinite int64 = -1
Infinite represents an infinite wait time
Functions ¶
func Just ¶
func Just[T any](values ...T) func(opts ...Option[T]) Observable[T]
Just creates an Observable with the provided items.
func JustSingle ¶
JustSingle is like JustItem in that it is defined for a single item iterable but behaves like Just in that it returns a func. This is probably not required, just defined for experimental purposes for now.
func MaxInitLimitInt ¶
func MaxInitLimitInt() int
func MinInitLimitInt ¶
func MinInitLimitInt() int
func MustBool ¶
MustBool is a helper function that assists the client in interpreting a boolean item received via a channel. If the item is not a boolean, a panic is raised, otherwise the returned value is the underlying boolean value.
func MustOpaque ¶
MustOpaque is a helper function that assists the client in interpreting an opaque item received via a channel. If the item is not of the custom type O, a panic is raised, otherwise the returned value is the underlying custom value.
func MustTV ¶
MustTV is a helper function that assists the client in interpreting an integer based tick value item received via a channel. If the item is not a tick value, a panic is raised, otherwise the returned value is the underlying integer value.
func MustWCh ¶
MustWCh is a helper function that assists the client in interpreting a write chan item received via a channel. If the item is not a write chan, a panic is raised, otherwise the returned value is the underlying write chan value.
func NativeItemLimitComparator ¶
func NativeItemLimitComparator[T constraints.Ordered](a, b Item[T]) int
func SendItems ¶
func SendItems[T any](ctx context.Context, ch chan<- Item[T], strategy enums.CloseChannelStrategy, items ...any, )
SendItems is a utility function that sends a list of items and indicates a strategy on whether to close the channel once the function completes.
func TryBool ¶
TryBool is a helper function that assists the client in interpreting a boolean item received via a channel. If the item is not a boolean, an error is returned, otherwise the returned value is the underlying boolean value.
func TryOpaque ¶
TryOpaque is a helper function that assists the client in interpreting an opaque item received via a channel. If the item is not of the custom type O, an error is returned, otherwise the returned value is the underlying custom value.
func TryTV ¶
TryTV is a helper function that assists the client in interpreting an integer based tick value item received via a channel. If the item is not a tick value, an error is returned, otherwise the returned value is the underlying integer value.
Types ¶
type AssertFunc ¶
type AssertFunc[T any] func(actual AssertResources[T])
func (AssertFunc[T]) Check ¶
func (f AssertFunc[T]) Check(actual AssertResources[T])
type AssertPredicate ¶
type AssertPredicate[T any] func(actual AssertResources[T]) error
AssertPredicate is a custom predicate based on the items.
type AssertResources ¶
type BadRangeIteratorError ¶
type BadRangeIteratorError struct {
// contains filtered or unexported fields
}
var RangeMissingWhilstError BadRangeIteratorError
func (BadRangeIteratorError) Error ¶
func (e BadRangeIteratorError) Error() string
type Calculator ¶
Calculator defines numeric operations for T
type Comparator ¶
Comparator defines a func that returns an int: - 0 if two elements are equal - A negative value if the first argument is less than the second - A positive value if the first argument is greater than the second
type ContainItems ¶
type ContainItems[T any] struct { Expected []T }
ContainItems
func (ContainItems[T]) Check ¶
func (a ContainItems[T]) Check(actual AssertResources[T])
Check verifies that an observable contains the expected items.
type CustomPredicate ¶
type CustomPredicate[T any] struct { Expected AssertPredicate[T] }
CustomPredicate
func (CustomPredicate[T]) Check ¶
func (a CustomPredicate[T]) Check(actual AssertResources[T])
Check CustomPredicateAssert checks a custom predicate.
type Disposable ¶
type Disposable context.CancelFunc
Disposable is a function to be called in order to dispose a subscription.
type Disposed ¶
type Disposed <-chan struct{}
Disposed is a notification channel indicating when an Observable is closed.
type DistributionFunc ¶
DistributionFunc used by GroupBy
type Duration ¶
type Duration interface {
// contains filtered or unexported methods
}
Duration represents a duration
type DynamicDistributionFunc ¶
DynamicDistributionFunc used by GroupByDynamic
type Envelope ¶ added in v0.5.2
type Envelope[T any, O Numeric] struct { T *T P O }
Envelope wraps a struct type T so that it can be defined with pointer receivers. It also means that the client does not need to manually define methods for constraint ProxyField as they are implemented by the Envelope.
func (Envelope[T, O]) Field ¶ added in v0.5.2
func (e Envelope[T, O]) Field() O
Field nominates which member of T of type O is the proxy field required to satisfy constraint ProxyField.
type ErrorToObservable ¶
type ErrorToObservable[T any] func(error) Observable[T]
ErrorToObservable defines a function that computes an observable from an error.
type FuncN ¶
type FuncN[T any] func(...T) T
FuncIntM defines a function that's specialised for Map To solve the problem of being able to map values across different types, the FuncIntM type will be modified to take an extra type parameter 'O' which represents the 'Other' type, ie we map from a value of type 'T' to a value of type 'O' (Func[T, O any]). With this in place, we should be able to define a pipeline that starts off with values of type T, and end up with values of type O via a Map operator. We'll have to make sure that any intermediate channels can be appropriately defined. If we want to map between values within the same type, we need to use a separate definition, and in fact the original definition Func, should suffice.
The problem with Map is that it introduces a new Type. Perhaps we need a separate observer interface that can bridge from T to O. We can't introduce the type O, because that would have a horrendous cascading impact on every type, when most operations would not need this type O. With generics, Map is a very awkward operator that needs special attention. In the short term, what we can say is that the base functionality only allows mapping to different values within the same type. FuncN defines a function that computes a value from N input values.
type GroupedObservable ¶
type GroupedObservable[T any] struct { Observable[T] // Key is the distribution key Key string }
GroupedObservable is the observable type emitted by the GroupByDynamic operator.
type HasAnError ¶
HasAnError
func (HasAnError[T]) Check ¶
func (a HasAnError[T]) Check(actual AssertResources[T])
Check HasAnError ensures that the observable has produced a specific error.
type HasError ¶
HasError
func (HasError[T]) Check ¶
func (a HasError[T]) Check(actual AssertResources[T])
type HasFalse ¶
type HasFalse[T any] struct { }
HasFalse
func (HasFalse[T]) Check ¶
func (a HasFalse[T]) Check(actual AssertResources[T])
Check HasFalse checks that the boolean values contain at least 1 false value
type HasItem ¶
type HasItem[T any] struct { Expected T }
HasItem
func (HasItem[T]) Check ¶
func (a HasItem[T]) Check(actual AssertResources[T])
HasItem checks if a single or optional single has a specific item.
type HasItems ¶
type HasItems[T any] struct { Expected []T }
HasItems
func (HasItems[T]) Check ¶
func (a HasItems[T]) Check(actual AssertResources[T])
HasItems checks if an observable has an exact set of items.
type HasItemsNoOrder ¶
type HasItemsNoOrder[T any] struct { Expected []T }
HasItemsNoOrder
func (HasItemsNoOrder[T]) Check ¶
func (a HasItemsNoOrder[T]) Check(actual AssertResources[T])
Check ensures that an observable produces the corresponding items regardless of the order.
type HasNoError ¶
type HasNoError[T any] struct { }
HasNoError
func (HasNoError[T]) Check ¶
func (a HasNoError[T]) Check(actual AssertResources[T])
Check HasNoError ensures that the observable has not produced an error.
type HasNumber ¶
HasNumber
func (HasNumber[T]) Check ¶
func (a HasNumber[T]) Check(actual AssertResources[T])
HasNumber checks if a single or optional single has a specific numeric item.
type HasNumbers ¶
type HasNumbers[T any] struct { Expected []T }
HasNumbers
func (HasNumbers[T]) Check ¶
func (a HasNumbers[T]) Check(actual AssertResources[T])
HasNumbers checks if an observable has an exact set of numeric items.
type HasNumbersNoOrder ¶
type HasNumbersNoOrder[T any] struct { Expected []T }
HasNumbersNoOrder
func (HasNumbersNoOrder[T]) Check ¶
func (a HasNumbersNoOrder[T]) Check(actual AssertResources[T])
Check ensures that an observable produces the corresponding numbers regardless of the order.
type HasTickCount ¶
HasTickCount
func (HasTickCount[T]) Check ¶
func (a HasTickCount[T]) Check(actual AssertResources[T])
Check HasTickCount checks if an observable has an expected count of tick items.
type HasTickValue ¶
HasTickValue
func (HasTickValue[T]) Check ¶
func (a HasTickValue[T]) Check(actual AssertResources[T])
HasTickValue checks if a single or optional single has a specific numeric item.
type HasTickValueCount ¶
HasTickValueCount
func (HasTickValueCount[T]) Check ¶
func (a HasTickValueCount[T]) Check(actual AssertResources[T])
Check HasTickValueCount checks if an observable has an expected count of tick value items.
type HasTicks ¶
type HasTicks[T any] struct { Expected []T }
HasTicks
func (HasTicks[T]) Check ¶
func (a HasTicks[T]) Check(actual AssertResources[T])
HasTickValues checks if an observable has an exact set of numeric items.
type HasTrue ¶
type HasTrue[T any] struct { }
HasTrue
func (HasTrue[T]) Check ¶
func (a HasTrue[T]) Check(actual AssertResources[T])
Check HasTrue checks that the boolean values contain at least 1 true value
type IllegalInputError ¶
type IllegalInputError struct {
// contains filtered or unexported fields
}
IllegalInputError is triggered when the observable receives an illegal input.
func (IllegalInputError) Error ¶
func (e IllegalInputError) Error() string
type IndexOutOfBoundError ¶
type IndexOutOfBoundError struct {
// contains filtered or unexported fields
}
IndexOutOfBoundError is triggered when the observable cannot access the specified index.
func (IndexOutOfBoundError) Error ¶
func (e IndexOutOfBoundError) Error() string
type InitLimit ¶
InitLimit defines a function to be used with Min and Max operators that defines a limit initialiser, that is to say, for Max we need to initialise the internal maximum reference point to be minimum value for type T and the reverse for the Min operator.
type IsEmpty ¶
type IsEmpty[T any] struct { }
IsEmpty
func (IsEmpty[T]) Check ¶
func (a IsEmpty[T]) Check(actual AssertResources[T])
type IsFalse ¶
type IsFalse[T any] struct { }
IsFalse
func (IsFalse[T]) Check ¶
func (a IsFalse[T]) Check(actual AssertResources[T])
Check IsFalse checks boolean value is false
type IsNotEmpty ¶
type IsNotEmpty[T any] struct { }
IsNotEmpty
func (IsNotEmpty[T]) Check ¶
func (a IsNotEmpty[T]) Check(actual AssertResources[T])
type IsTrue ¶
type IsTrue[T any] struct { }
IsTrue
func (IsTrue[T]) Check ¶
func (a IsTrue[T]) Check(actual AssertResources[T])
Check IsTrue checks boolean value is true
type Item ¶
Item is a wrapper having either a value, error or an opaque value
func MaxItemInitLimitInt ¶
func MinItemInitLimitInt ¶
func (Item[T]) Disc ¶
func (it Item[T]) Disc() enums.ItemDiscriminator
Disc returns the discriminator of the item
func (Item[T]) IsTickValue ¶
IsTickValue checks if an item is a tick value instance.
func (Item[T]) SendBlocking ¶
SendBlocking sends an item and blocks until it is sent.
func (Item[T]) SendContext ¶
SendContext sends an item and blocks until it is sent or a context canceled. It returns a boolean to indicate whether the item was sent.
func (Item[T]) SendNonBlocking ¶
SendNonBlocking sends an item without blocking. It returns a boolean to indicate whether the item was sent.
type ItemToObservable ¶
type ItemToObservable[T any] func(Item[T]) Observable[T]
ItemToObservable defines a function that computes an observable from an item.
type Marshaller ¶
Marshaller defines a marshaller type (ItemValue[T] to []byte).
type MissingCalcError ¶
type MissingCalcError struct {
// contains filtered or unexported fields
}
MissingCalcError is triggered when the client forgets to provide a calculator option.
func (MissingCalcError) Error ¶
func (e MissingCalcError) Error() string
type NumVal ¶
type NumVal = int
NumVal is an integer value used by Item.N and Range
type Numeric ¶
type Numeric interface { constraints.Integer | constraints.Signed | constraints.Unsigned | constraints.Float }
Numeric defines a constraint that targets scalar types for whom numeric operators are natively defined.
type NumericCalc ¶
type NumericCalc[T Numeric] struct {
// contains filtered or unexported fields
}
NumericCalc is a predefined calculator for any numeric type
func (*NumericCalc[T]) Add ¶
func (c *NumericCalc[T]) Add(a, b T) T
func (*NumericCalc[T]) Div ¶
func (c *NumericCalc[T]) Div(a, b T) T
func (*NumericCalc[T]) Inc ¶
func (c *NumericCalc[T]) Inc(v T) T
func (*NumericCalc[T]) IsZero ¶
func (c *NumericCalc[T]) IsZero(v T) bool
func (*NumericCalc[T]) Zero ¶
func (c *NumericCalc[T]) Zero() T
type NumericRangeIterator ¶
type NumericRangeIterator[T Numeric] struct { StartAt T By T Whilst WhilstFunc[T] // contains filtered or unexported fields }
func (*NumericRangeIterator[T]) Increment ¶
func (i *NumericRangeIterator[T]) Increment(index *T) T
Increment increments the index value
func (*NumericRangeIterator[T]) Init ¶
func (i *NumericRangeIterator[T]) Init() error
func (*NumericRangeIterator[T]) Start ¶
func (i *NumericRangeIterator[T]) Start() (*T, error)
Start should return the initial index value. If the By value has not been set, it will default to 1.
func (*NumericRangeIterator[T]) Step ¶
func (i *NumericRangeIterator[T]) Step() T
func (*NumericRangeIterator[T]) While ¶
func (i *NumericRangeIterator[T]) While(current T) bool
While defines a condition that must be true for the loop to continue iterating.
type Observable ¶
type Observable[T any] interface { Iterable[T] All(predicate Predicate[T], opts ...Option[T]) Single[T] Average(opts ...Option[T]) Single[T] BackOffRetry(backOffCfg backoff.BackOff, opts ...Option[T]) Observable[T] Connect(ctx context.Context) (context.Context, Disposable) Contains(equal Predicate[T], opts ...Option[T]) Single[T] Count(opts ...Option[T]) Single[T] DefaultIfEmpty(defaultValue T, opts ...Option[T]) Observable[T] Distinct(apply Func[T], opts ...Option[T]) Observable[T] DistinctUntilChanged(apply Func[T], comparator Comparator[T], opts ...Option[T]) Observable[T] DoOnCompleted(completedFunc CompletedFunc, opts ...Option[T]) Disposed DoOnError(errFunc ErrFunc, opts ...Option[T]) Disposed DoOnNext(nextFunc NextFunc[T], opts ...Option[T]) Disposed ElementAt(index uint, opts ...Option[T]) Single[T] Error(opts ...Option[T]) error Errors(opts ...Option[T]) []error Filter(apply Predicate[T], opts ...Option[T]) Observable[T] Find(find Predicate[T], opts ...Option[T]) OptionalSingle[T] First(opts ...Option[T]) OptionalSingle[T] FirstOrDefault(defaultValue T, opts ...Option[T]) Single[T] FlatMap(apply ItemToObservable[T], opts ...Option[T]) Observable[T] ForEach(nextFunc NextFunc[T], errFunc ErrFunc, completedFunc CompletedFunc, opts ...Option[T]) Disposed GroupBy(length int, distribution DistributionFunc[T], opts ...Option[T]) Observable[T] GroupByDynamic(distribution DynamicDistributionFunc[T], opts ...Option[T]) Observable[T] IgnoreElements(opts ...Option[T]) Observable[T] Last(opts ...Option[T]) OptionalSingle[T] LastOrDefault(defaultValue T, opts ...Option[T]) Single[T] Max(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T] Map(apply Func[T], opts ...Option[T]) Observable[T] Min(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T] OnErrorResumeNext(resumeSequence ErrorToObservable[T], opts ...Option[T]) Observable[T] OnErrorReturn(resumeFunc ErrorFunc[T], opts ...Option[T]) Observable[T] 
OnErrorReturnItem(resume T, opts ...Option[T]) Observable[T] Reduce(apply Func2[T], opts ...Option[T]) OptionalSingle[T] Repeat(count int64, frequency Duration, opts ...Option[T]) Observable[T] Retry(count int, shouldRetry ShouldRetryFunc, opts ...Option[T]) Observable[T] Run(opts ...Option[T]) Disposed Sample(iterable Iterable[T], opts ...Option[T]) Observable[T] Scan(apply Func2[T], opts ...Option[T]) Observable[T] Send(output chan<- Item[T], opts ...Option[T]) SequenceEqual(iterable Iterable[T], comparator Comparator[T], opts ...Option[T]) Single[T] Serialize(from int, identifier func(any) int, opts ...Option[T]) Observable[T] Skip(nth uint, opts ...Option[T]) Observable[T] SkipLast(nth uint, opts ...Option[T]) Observable[T] SkipWhile(apply Predicate[T], opts ...Option[T]) Observable[T] StartWith(iterable Iterable[T], opts ...Option[T]) Observable[T] Sum(opts ...Option[T]) OptionalSingle[T] Take(nth uint, opts ...Option[T]) Observable[T] TakeLast(nth uint, opts ...Option[T]) Observable[T] TakeUntil(apply Predicate[T], opts ...Option[T]) Observable[T] TakeWhile(apply Predicate[T], opts ...Option[T]) Observable[T] TimeInterval(opts ...Option[T]) Observable[T] Timestamp(opts ...Option[T]) Observable[T] ToSlice(initialCapacity int, opts ...Option[T]) ([]Item[T], error) WindowWithCount(count int, opts ...Option[T]) Observable[T] WindowWithTime(timespan Duration, opts ...Option[T]) Observable[T] WindowWithTimeOrCount(timespan Duration, count int, opts ...Option[T]) Observable[T] ZipFromIterable(iterable Iterable[T], zipper Func2[T], opts ...Option[T]) Observable[T] }
func Amb ¶
func Amb[T any](observables []Observable[T], opts ...Option[T]) Observable[T]
Amb takes several Observables and emits all of the items from only the first of these Observables to emit an item or notification. (The name is short for "ambiguous"; some Rx implementations call this operator "race".)
func CombineLatest ¶
func CombineLatest[T any](f FuncN[T], observables []Observable[T], opts ...Option[T], ) Observable[T]
CombineLatest combines the latest item emitted by each Observable via a specified function and emit items based on the results of this function. Requires a calculator, so specify this with the WithCalc option.
func Concat ¶
func Concat[T any](observables []Observable[T], opts ...Option[T]) Observable[T]
Concat emits the emissions from two or more Observables without interleaving them.
func Create ¶
func Create[T any](f []Producer[T], opts ...Option[T]) Observable[T]
Create creates an Observable from scratch by calling observer methods programmatically.
func Defer ¶
func Defer[T any](f []Producer[T], opts ...Option[T]) Observable[T]
Defer does not create the Observable until the observer subscribes, and creates a fresh Observable for each observer. This creates a cold observable.
func Empty ¶
func Empty[T any]() Observable[T]
Empty creates an Observable that emits no items and terminates immediately.
func FromChannel ¶
func FromChannel[T any](next <-chan Item[T], opts ...Option[T]) Observable[T]
FromChannel creates a cold observable from a channel.
func FromEventSource ¶
func FromEventSource[T any](next <-chan Item[T], opts ...Option[T]) Observable[T]
FromEventSource creates a hot observable from a channel.
func Interval ¶
func Interval[T any](interval Duration, opts ...Option[T]) Observable[T]
Interval creates an Observable emitting incremental integers infinitely between each given time interval.
func Merge ¶
func Merge[T any](observables []Observable[T], opts ...Option[T]) Observable[T]
Merge combines multiple Observables into one by merging their emissions
func Never ¶
func Never[T any]() Observable[T]
Never creates an Observable that emits no items and does not terminate.
func Range ¶
func Range[T Numeric](iterator RangeIterator[T], opts ...Option[T]) Observable[T]
Range creates an Observable that emits count sequential integers beginning at start.
func RangePF ¶ added in v0.5.2
func RangePF[T ProxyField[T, O], O Numeric](iterator RangeIteratorPF[T, O], opts ...Option[T], ) Observable[T]
RangePF creates an Observable that emits count sequential integers beginning at start, for non numeric types, which do contain a nominated proxy Numeric member
func Start ¶
func Start[T any](fs []Supplier[T], opts ...Option[T]) Observable[T]
Start creates an Observable from one or more directive-like Supplier and emits the result of each operation asynchronously on a new Observable.
func Thrown ¶
func Thrown[T any](err error) Observable[T]
Thrown creates an Observable that emits no items and terminates with an error.
type ObservableImpl ¶
type ObservableImpl[T any] struct { // contains filtered or unexported fields }
ObservableImpl implements Observable.
func (*ObservableImpl[T]) All ¶
func (o *ObservableImpl[T]) All(predicate Predicate[T], opts ...Option[T]) Single[T]
All determines whether all items emitted by an Observable meet some criteria.
func (*ObservableImpl[T]) Average ¶
func (o *ObservableImpl[T]) Average(opts ...Option[T], ) Single[T]
Average calculates the average of numbers emitted by an Observable and emits the result. Requires a calculator, so specify this with the WithCalc option.
func (*ObservableImpl[T]) BackOffRetry ¶
func (o *ObservableImpl[T]) BackOffRetry(backOffCfg backoff.BackOff, opts ...Option[T], ) Observable[T]
BackOffRetry implements a backoff retry: if a source Observable sends an error, it resubscribes to it in the hope that it will complete without error. Cannot be run in parallel.
func (*ObservableImpl[T]) Connect ¶
func (o *ObservableImpl[T]) Connect(ctx context.Context) (context.Context, Disposable)
Connect instructs a connectable Observable to begin emitting items to its subscribers.
func (*ObservableImpl[T]) Contains ¶
func (o *ObservableImpl[T]) Contains(equal Predicate[T], opts ...Option[T]) Single[T]
func (*ObservableImpl[T]) Count ¶
func (o *ObservableImpl[T]) Count(opts ...Option[T]) Single[T]
Count counts the number of items emitted by the source Observable and emits only this value.
func (*ObservableImpl[T]) DefaultIfEmpty ¶
func (o *ObservableImpl[T]) DefaultIfEmpty(defaultValue T, opts ...Option[T]) Observable[T]
DefaultIfEmpty returns an Observable that emits the items emitted by the source Observable or a specified default item if the source Observable is empty.
func (*ObservableImpl[T]) Distinct ¶
func (o *ObservableImpl[T]) Distinct(apply Func[T], opts ...Option[T]) Observable[T]
Distinct suppresses duplicate items in the original Observable and returns a new Observable.
func (*ObservableImpl[T]) DistinctUntilChanged ¶
func (o *ObservableImpl[T]) DistinctUntilChanged(apply Func[T], comparator Comparator[T], opts ...Option[T], ) Observable[T]
DistinctUntilChanged suppresses consecutive duplicate items in the original Observable. Cannot be run in parallel.
func (*ObservableImpl[T]) DoOnCompleted ¶
func (o *ObservableImpl[T]) DoOnCompleted(completedFunc CompletedFunc, opts ...Option[T], ) Disposed
DoOnCompleted registers a callback action that will be called once the Observable terminates.
func (*ObservableImpl[T]) DoOnError ¶
func (o *ObservableImpl[T]) DoOnError(errFunc ErrFunc, opts ...Option[T]) Disposed
DoOnError registers a callback action that will be called if the Observable terminates abnormally.
func (*ObservableImpl[T]) DoOnNext ¶
func (o *ObservableImpl[T]) DoOnNext(nextFunc NextFunc[T], opts ...Option[T]) Disposed
DoOnNext registers a callback action that will be called on each item emitted by the Observable.
func (*ObservableImpl[T]) ElementAt ¶
func (o *ObservableImpl[T]) ElementAt(index uint, opts ...Option[T]) Single[T]
ElementAt emits only item n emitted by an Observable. Cannot be run in parallel.
func (*ObservableImpl[T]) Error ¶
func (o *ObservableImpl[T]) Error(opts ...Option[T]) error
Error returns the eventual Observable error. This method is blocking.
func (*ObservableImpl[T]) Errors ¶
func (o *ObservableImpl[T]) Errors(opts ...Option[T]) []error
Errors returns an eventual list of Observable errors. This method is blocking.
func (*ObservableImpl[T]) Filter ¶
func (o *ObservableImpl[T]) Filter(apply Predicate[T], opts ...Option[T]) Observable[T]
Filter emits only those items from an Observable that pass a predicate test.
func (*ObservableImpl[T]) Find ¶
func (o *ObservableImpl[T]) Find(find Predicate[T], opts ...Option[T]) OptionalSingle[T]
Find emits the first item passing a predicate, then completes.
func (*ObservableImpl[T]) First ¶
func (o *ObservableImpl[T]) First(opts ...Option[T]) OptionalSingle[T]
First returns a new Observable which emits only the first item. Cannot be run in parallel.
func (*ObservableImpl[T]) FirstOrDefault ¶
func (o *ObservableImpl[T]) FirstOrDefault(defaultValue T, opts ...Option[T]) Single[T]
FirstOrDefault returns a new Observable which emits only the first item. If the observable fails to emit any items, it emits a default value. Cannot be run in parallel.
func (*ObservableImpl[T]) FlatMap ¶
func (o *ObservableImpl[T]) FlatMap(apply ItemToObservable[T], opts ...Option[T], ) Observable[T]
FlatMap transforms the items emitted by an Observable into Observables, then flattens the emissions from those into a single Observable.
func (*ObservableImpl[T]) ForEach ¶
func (o *ObservableImpl[T]) ForEach(nextFunc NextFunc[T], errFunc ErrFunc, completedFunc CompletedFunc, opts ...Option[T], ) Disposed
ForEach subscribes to the Observable and receives notifications for each element.
func (*ObservableImpl[T]) GroupBy ¶
func (o *ObservableImpl[T]) GroupBy(length int, distribution DistributionFunc[T], opts ...Option[T], ) Observable[T]
GroupBy divides an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key.
func (*ObservableImpl[T]) GroupByDynamic ¶
func (o *ObservableImpl[T]) GroupByDynamic(distribution DynamicDistributionFunc[T], opts ...Option[T], ) Observable[T]
GroupByDynamic divides an Observable into a dynamic set of Observables that each emit GroupedObservable from the original Observable, organized by key.
func (*ObservableImpl[T]) IgnoreElements ¶
func (o *ObservableImpl[T]) IgnoreElements(opts ...Option[T]) Observable[T]
IgnoreElements ignores all items emitted by the source ObservableSource except for the errors. Cannot be run in parallel.
func (*ObservableImpl[T]) Last ¶
func (o *ObservableImpl[T]) Last(opts ...Option[T]) OptionalSingle[T]
Last returns a new Observable which emits only the last item. Cannot be run in parallel.
func (*ObservableImpl[T]) LastOrDefault ¶
func (o *ObservableImpl[T]) LastOrDefault(defaultValue T, opts ...Option[T]) Single[T]
func (*ObservableImpl[T]) Map ¶
func (o *ObservableImpl[T]) Map(apply Func[T], opts ...Option[T]) Observable[T]
Map transforms the items emitted by an Observable by applying a function to each item.
func (*ObservableImpl[T]) Max ¶
func (o *ObservableImpl[T]) Max(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T], ) OptionalSingle[T]
Max determines and emits the maximum-valued item emitted by an Observable according to a comparator.
func (*ObservableImpl[T]) Min ¶
func (o *ObservableImpl[T]) Min(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T], ) OptionalSingle[T]
Min determines and emits the minimum-valued item emitted by an Observable according to a comparator.
func (*ObservableImpl[T]) Observe ¶
func (o *ObservableImpl[T]) Observe(opts ...Option[T]) <-chan Item[T]
func (*ObservableImpl[T]) OnErrorResumeNext ¶
func (o *ObservableImpl[T]) OnErrorResumeNext(resumeSequence ErrorToObservable[T], opts ...Option[T], ) Observable[T]
OnErrorResumeNext instructs an Observable to pass control to another Observable rather than invoking onError if it encounters an error.
func (*ObservableImpl[T]) OnErrorReturn ¶
func (o *ObservableImpl[T]) OnErrorReturn(resumeFunc ErrorFunc[T], opts ...Option[T]) Observable[T]
func (*ObservableImpl[T]) OnErrorReturnItem ¶
func (o *ObservableImpl[T]) OnErrorReturnItem(resume T, opts ...Option[T]) Observable[T]
func (*ObservableImpl[T]) Reduce ¶
func (o *ObservableImpl[T]) Reduce(apply Func2[T], opts ...Option[T]) OptionalSingle[T]
Reduce applies a function to each item emitted by an Observable, sequentially, and emits the final value.
func (*ObservableImpl[T]) Repeat ¶
func (o *ObservableImpl[T]) Repeat(count int64, frequency Duration, opts ...Option[T]) Observable[T]
Repeat returns an Observable that repeats the sequence of items emitted by the source Observable at most count times, at a particular frequency. Cannot be run in parallel.
func (*ObservableImpl[T]) Retry ¶
func (o *ObservableImpl[T]) Retry(count int, shouldRetry ShouldRetryFunc, opts ...Option[T], ) Observable[T]
Retry resubscribes to a source Observable that sends an error, in the hope that it will complete without error. Cannot be run in parallel.
func (*ObservableImpl[T]) Run ¶
func (o *ObservableImpl[T]) Run(opts ...Option[T]) Disposed
Run creates an Observer without consuming the emitted items.
func (*ObservableImpl[T]) Sample ¶
func (o *ObservableImpl[T]) Sample(iterable Iterable[T], opts ...Option[T]) Observable[T]
Sample returns an Observable that emits the most recent items emitted by the source Iterable whenever the input Iterable emits an item.
func (*ObservableImpl[T]) Scan ¶
func (o *ObservableImpl[T]) Scan(apply Func2[T], opts ...Option[T]) Observable[T]
Scan applies a Func2 to each item emitted by an Observable, sequentially, and emits each successive value. Cannot be run in parallel.
func (*ObservableImpl[T]) Send ¶
func (o *ObservableImpl[T]) Send(output chan<- Item[T], opts ...Option[T])
Send sends the items to a given channel.
func (*ObservableImpl[T]) SequenceEqual ¶
func (o *ObservableImpl[T]) SequenceEqual(iterable Iterable[T], comparator Comparator[T], opts ...Option[T], ) Single[T]
SequenceEqual emits true if an Observable and the input Observable emit the same items, in the same order, with the same termination state. Otherwise, it emits false.
func (*ObservableImpl[T]) Serialize ¶
func (o *ObservableImpl[T]) Serialize(from int, identifier func(any) int, opts ...Option[T], ) Observable[T]
Serialize forces an Observable to make serialized calls and to be well-behaved.
func (*ObservableImpl[T]) Skip ¶
func (o *ObservableImpl[T]) Skip(nth uint, opts ...Option[T]) Observable[T]
Skip suppresses the first n items in the original Observable and returns a new Observable with the remaining items. Cannot be run in parallel.
func (*ObservableImpl[T]) SkipLast ¶
func (o *ObservableImpl[T]) SkipLast(nth uint, opts ...Option[T]) Observable[T]
SkipLast suppresses the last n items in the original Observable and returns a new Observable with the remaining items. Cannot be run in parallel.
func (*ObservableImpl[T]) SkipWhile ¶
func (o *ObservableImpl[T]) SkipWhile(apply Predicate[T], opts ...Option[T]) Observable[T]
SkipWhile discards items emitted by an Observable until a specified condition becomes false. Cannot be run in parallel.
func (*ObservableImpl[T]) StartWith ¶
func (o *ObservableImpl[T]) StartWith(iterable Iterable[T], opts ...Option[T]) Observable[T]
StartWith emits a specified Iterable before beginning to emit the items from the source Observable.
func (*ObservableImpl[T]) Sum ¶
func (o *ObservableImpl[T]) Sum(opts ...Option[T]) OptionalSingle[T]
Sum calculates the sum of the items emitted by an Observable and emits the result. Requires a calculator, so specify this with the WithCalc option.
func (*ObservableImpl[T]) Take ¶
func (o *ObservableImpl[T]) Take(nth uint, opts ...Option[T]) Observable[T]
Take emits only the first n items emitted by an Observable. Cannot be run in parallel.
func (*ObservableImpl[T]) TakeLast ¶
func (o *ObservableImpl[T]) TakeLast(nth uint, opts ...Option[T]) Observable[T]
TakeLast emits only the last n items emitted by an Observable. Cannot be run in parallel.
func (*ObservableImpl[T]) TakeUntil ¶
func (o *ObservableImpl[T]) TakeUntil(apply Predicate[T], opts ...Option[T], ) Observable[T]
TakeUntil returns an Observable that emits items emitted by the source Observable, checks the specified predicate for each item, and then completes when the condition is satisfied. Cannot be run in parallel.
func (*ObservableImpl[T]) TakeWhile ¶
func (o *ObservableImpl[T]) TakeWhile(apply Predicate[T], opts ...Option[T], ) Observable[T]
TakeWhile returns an Observable that emits items emitted by the source ObservableSource so long as each item satisfies a specified condition, and then completes as soon as this condition is not satisfied. Cannot be run in parallel.
func (*ObservableImpl[T]) TimeInterval ¶
func (o *ObservableImpl[T]) TimeInterval(opts ...Option[T]) Observable[T]
func (*ObservableImpl[T]) Timestamp ¶
func (o *ObservableImpl[T]) Timestamp(opts ...Option[T]) Observable[T]
func (*ObservableImpl[T]) ToSlice ¶
func (o *ObservableImpl[T]) ToSlice(initialCapacity int, opts ...Option[T]) ([]Item[T], error)
ToSlice collects all items from an Observable and returns them in a slice, together with an optional error. Cannot be run in parallel.
func (*ObservableImpl[T]) WindowWithCount ¶
func (o *ObservableImpl[T]) WindowWithCount(count int, opts ...Option[T]) Observable[T]
WindowWithCount periodically subdivides items from an Observable into Observable windows of a given size and emits these windows rather than emitting the items one at a time.
func (*ObservableImpl[T]) WindowWithTime ¶
func (o *ObservableImpl[T]) WindowWithTime(timespan Duration, opts ...Option[T]) Observable[T]
WindowWithTime periodically subdivides items from an Observable into Observables based on timed windows and emits them rather than emitting the items one at a time.
func (*ObservableImpl[T]) WindowWithTimeOrCount ¶
func (o *ObservableImpl[T]) WindowWithTimeOrCount(timespan Duration, count int, opts ...Option[T], ) Observable[T]
WindowWithTimeOrCount periodically subdivides items from an Observable into Observables based on timed windows or a specific size and emits them rather than emitting the items one at a time.
func (*ObservableImpl[T]) ZipFromIterable ¶
func (o *ObservableImpl[T]) ZipFromIterable(iterable Iterable[T], zipper Func2[T], opts ...Option[T], ) Observable[T]
ZipFromIterable merges the emissions of an Iterable via a specified function and emits single items for each combination based on the results of this function.
type Option ¶
type Option[T any] interface { // contains filtered or unexported methods }
func WithBackPressureStrategy ¶
func WithBackPressureStrategy[T any](strategy enums.BackPressureStrategy) Option[T]
WithBackPressureStrategy sets the back pressure strategy: drop or block.
func WithBufferedChannel ¶
WithBufferedChannel allows configuring the capacity of a buffered channel.
func WithCPUPool ¶
WithCPUPool allows specifying an execution pool based on the number of logical CPUs.
func WithContext ¶
WithContext allows passing a context.
func WithErrorStrategy ¶
func WithErrorStrategy[T any](strategy enums.OnErrorStrategy) Option[T]
WithErrorStrategy defines how an observable should deal with error. This strategy is propagated to the parent observable.
func WithObservationStrategy ¶
func WithObservationStrategy[T any](strategy enums.ObservationStrategy) Option[T]
WithObservationStrategy sets the observation strategy; in eager mode, items are consumed even without a subscription.
func WithPublishStrategy ¶
WithPublishStrategy converts an ordinary Observable into a connectable Observable.
type OptionalSingle ¶
type OptionalSingle[T any] interface { Iterable[T] Get(opts ...Option[T]) (Item[T], error) Map(apply Func[T], opts ...Option[T]) OptionalSingle[T] Run(opts ...Option[T]) Disposed }
OptionalSingle is an optional single.
type OptionalSingleImpl ¶
type OptionalSingleImpl[T any] struct { // contains filtered or unexported fields }
OptionalSingleImpl implements OptionalSingle.
func NewOptionalSingleImpl ¶
func NewOptionalSingleImpl[T any](iterable Iterable[T]) OptionalSingleImpl[T]
NewOptionalSingleImpl creates an OptionalSingleImpl.
func (*OptionalSingleImpl[T]) Get ¶
func (o *OptionalSingleImpl[T]) Get(opts ...Option[T]) (Item[T], error)
Get returns the item or rxgo.OptionalEmpty. An error is returned if the context has been cancelled. This method is blocking.
func (*OptionalSingleImpl[T]) Map ¶
func (o *OptionalSingleImpl[T]) Map(apply Func[T], opts ...Option[T]) OptionalSingle[T]
Map transforms the items emitted by an OptionalSingle by applying a function to each item.
func (*OptionalSingleImpl[T]) Observe ¶
func (o *OptionalSingleImpl[T]) Observe(opts ...Option[T]) <-chan Item[T]
Observe observes an OptionalSingle by returning its channel.
func (*OptionalSingleImpl[T]) Run ¶
func (o *OptionalSingleImpl[T]) Run(opts ...Option[T]) Disposed
Run creates an observer without consuming the emitted items.
type ProxyField ¶
type ProxyField[T any, O Numeric] interface { // Field defines the nominated proxy field Field() O // Inc is the incrementor function invoked by the iterator. Implementations // should add the value of by to index. Inc(index *T, by T) *T // Index should return an instance of T that represents the numeric value // of i. Typically, this requires returning an instance of T whose nominated // proxied field is set to i. Index(i int) *T }
ProxyField assists the client when dealing with struct type of T. Used by RangeIteratorPF. Any struct type that is used as the type T must nominate a field (the proxy), that will be used as the iterator field. T represents the parent type and O represents the proxied field type.
type RangeIterator ¶
type RangeIterator[T any] interface { // Init performs pre iteration check and returns an error on failure. Init() error // Start should return the initial index value Start() (*T, error) // Step is used by Increment and defines the size of increment for each iteration Step() T // Increment increments the index value Increment(index *T) T // While defines a condition that must be true for the loop to // continue iterating. While(current T) bool }
RangeIterator allows the client to define how the Range operator emits derived items.
type RangeIteratorByProxy ¶ added in v0.5.2
type RangeIteratorByProxy[T ProxyField[T, O], O Numeric] struct { StartAt T By T Whilst WhilstFunc[T] // contains filtered or unexported fields }
RangeIteratorByProxy iterator required for struct types of T, where the client has nominated a member of T to be the proxy field with which numeric operations are performed to generate indexes for iteration.
func (*RangeIteratorByProxy[T, O]) Increment ¶ added in v0.5.2
func (i *RangeIteratorByProxy[T, O]) Increment(index *T) *T
Increment increments the index value.
func (*RangeIteratorByProxy[T, O]) Init ¶ added in v0.5.2
func (i *RangeIteratorByProxy[T, O]) Init() error
Init is invoked prior to iteration and returns an error if not defined correctly.
func (*RangeIteratorByProxy[T, O]) Start ¶ added in v0.5.2
func (i *RangeIteratorByProxy[T, O]) Start() (*T, error)
Start should return the initial index value. If By has not been set, a panic occurs.
func (*RangeIteratorByProxy[T, O]) Step ¶ added in v0.5.2
func (i *RangeIteratorByProxy[T, O]) Step() O
func (*RangeIteratorByProxy[T, O]) While ¶ added in v0.5.2
func (i *RangeIteratorByProxy[T, O]) While(current T) bool
While defines a condition that must be true for the loop to continue iterating.
type RangeIteratorPF ¶
type RangeIteratorPF[T ProxyField[T, O], O Numeric] interface { // Init performs pre iteration check and returns an error on failure. Init() error // Start should return the initial index value Start() (*T, error) // Step is used by Increment and defines the size of increment for each iteration Step() O // Increment returns a pointer to a new instance of T with an incremented index value Increment(index *T) *T // While defines a condition that must be true for the loop to // continue iterating. While(current T) bool }
type ShouldRetryFunc ¶
ShouldRetryFunc as used by Retry operator
type Single ¶
type Single[T any] interface { Iterable[T] Filter(apply Predicate[T], opts ...Option[T]) OptionalSingle[T] Get(opts ...Option[T]) (Item[T], error) Map(apply Func[T], opts ...Option[T]) Single[T] Run(opts ...Option[T]) Disposed }
Single is an observable with a single element.
type SingleImpl ¶
type SingleImpl[T any] struct { // contains filtered or unexported fields }
SingleImpl implements Single.
func (*SingleImpl[T]) Filter ¶
func (s *SingleImpl[T]) Filter(apply Predicate[T], opts ...Option[T]) OptionalSingle[T]
Filter emits only those items from an Observable that pass a predicate test.
func (*SingleImpl[T]) Get ¶
func (s *SingleImpl[T]) Get(opts ...Option[T]) (Item[T], error)
Get returns the item. An error is returned if the context has been cancelled. This method is blocking.
func (*SingleImpl[T]) Map ¶
func (s *SingleImpl[T]) Map(apply Func[T], opts ...Option[T]) Single[T]
Map transforms the items emitted by a Single by applying a function to each item.
func (*SingleImpl[T]) Observe ¶
func (s *SingleImpl[T]) Observe(opts ...Option[T]) <-chan Item[T]
Observe observes a Single by returning its channel.
func (*SingleImpl[T]) Run ¶
func (s *SingleImpl[T]) Run(opts ...Option[T]) Disposed
Run creates an observer without consuming the emitted items.
type TimestampItem ¶
TimestampItem attaches a timestamp to an item.
type Unmarshaller ¶
Unmarshaller defines an unmarshaller type ([]byte to interface).
type WhilstFunc ¶
WhilstFunc condition function as used by Range
func LessThanPF ¶
func LessThanPF[T ProxyField[T, O], O Numeric](until T) WhilstFunc[T]
func MoreThanPF ¶
func MoreThanPF[T ProxyField[T, O], O Numeric](until T) WhilstFunc[T]
Source Files ¶
- assert.go
- calculator.go
- duration.go
- envelope.go
- errors.go
- factory.go
- item.go
- iterable-channel.go
- iterable-create.go
- iterable-defer.go
- iterable-event-source.go
- iterable-factory.go
- iterable-just.go
- iterable-range.go
- iterable-slice.go
- iterable.go
- limiters.go
- must-try.go
- observable-operator.go
- observable.go
- optional-single.go
- options.go
- single.go
- types.go