Documentation
Index
- func Collect[V any](ctx context.Context, srcObservable observable.Observable[V]) (dstCollection []V)
- func ForEach[V any](ctx context.Context, srcObservable observable.Observable[V], ...)
- func Map[S, D any](ctx context.Context, srcObservable observable.Observable[S], ...) observable.Observable[D]
- func MapExpand[S, D any](ctx context.Context, srcObservable observable.Observable[S], ...) observable.Observable[D]
- func MapReplay[S, D any](ctx context.Context, replayBufferCap int, ...) observable.ReplayObservable[D]
- func NewObservable[V any](opts ...option[V]) (observable.Observable[V], chan<- V)
- func NewObserver[V any](ctx context.Context, onUnsubscribe UnsubscribeFunc[V]) *channelObserver[V]
- func NewReplayObservable[V any](ctx context.Context, replayBufferCap int, opts ...option[V]) (observable.ReplayObservable[V], chan<- V)
- func ToReplayObservable[V any](ctx context.Context, replayBufferCap int, srcObsvbl observable.Observable[V]) observable.ReplayObservable[V]
- func WithPublisher[V any](publishCh chan V) option[V]
- type ForEachFn
- type MapFn
- type UnsubscribeFunc
Constants
This section is empty.
Variables
This section is empty.
Functions
func Collect
func Collect[V any](ctx context.Context, srcObservable observable.Observable[V]) (dstCollection []V)
Collect gathers all notifications received from srcObservable and returns them as a slice. It blocks until either srcObservable is closed or ctx is canceled, so ctx MUST be canceled after some finite duration. Collect is a terminal observable operator.
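The blocking behaviour described above can be illustrated with a stand-in built on a plain Go channel. This is a minimal sketch, not the package's implementation: `collectSketch` and the two demo functions are hypothetical names, and a receive-only channel stands in for the source observable.

```go
package main

import (
	"context"
	"fmt"
)

// collectSketch is a hypothetical stand-in for Collect. It gathers values
// from src until src is closed or ctx is canceled, whichever happens first.
func collectSketch[V any](ctx context.Context, src <-chan V) []V {
	var out []V
	for {
		select {
		case v, ok := <-src:
			if !ok {
				return out // source closed
			}
			out = append(out, v)
		case <-ctx.Done():
			return out // context canceled
		}
	}
}

// collectClosedDemo closes the source, so collection ends on its own.
func collectClosedDemo() []int {
	src := make(chan int, 3)
	src <- 1
	src <- 2
	src <- 3
	close(src)
	return collectSketch(context.Background(), src)
}

// collectCanceledDemo never closes the source; only the canceled context
// lets collectSketch return.
func collectCanceledDemo() []int {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	src := make(chan int) // never written to, never closed
	return collectSketch(ctx, src)
}

func main() {
	fmt.Println(collectClosedDemo())        // prints [1 2 3]
	fmt.Println(len(collectCanceledDemo())) // prints 0
}
```

Without the cancellation in the second demo, the call would block forever, which is why the context must eventually be canceled.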
func ForEach
func ForEach[V any](ctx context.Context, srcObservable observable.Observable[V], forEachFn ForEachFn[V])
ForEach applies the given forEachFn to each notification received from the observable, similar to Map; however, ForEach does not publish to a destination observable. ForEach is useful for side effects and is a terminal observable operator.
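As a rough illustration of a terminal, side-effect-only operator, the sketch below runs a callback over a plain channel. The names `forEachSketch` and `sumDemo` are hypothetical, the callback shape `func(ctx, v)` is an assumption (the ForEachFn type is not shown in this section), and the sketch runs synchronously, which may differ from the package's behaviour.

```go
package main

import (
	"context"
	"fmt"
)

// forEachSketch is a hypothetical stand-in for ForEach. It calls fn for every
// value received from src and publishes nothing. It returns when src is
// closed or ctx is canceled.
func forEachSketch[V any](
	ctx context.Context,
	src <-chan V,
	fn func(ctx context.Context, v V), // assumed callback shape
) {
	for {
		select {
		case v, ok := <-src:
			if !ok {
				return
			}
			fn(ctx, v)
		case <-ctx.Done():
			return
		}
	}
}

// sumDemo uses the callback purely for its side effect: accumulating a sum.
func sumDemo() int {
	src := make(chan int, 3)
	for _, v := range []int{1, 2, 3} {
		src <- v
	}
	close(src)

	sum := 0
	forEachSketch(context.Background(), src, func(_ context.Context, v int) {
		sum += v
	})
	return sum
}

func main() {
	fmt.Println(sumDemo()) // prints 6
}
```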
func Map
func Map[S, D any](ctx context.Context, srcObservable observable.Observable[S], transformFn MapFn[S, D]) observable.Observable[D]
Map transforms the given observable by applying the given transformFn to each notification received from the observable. If the transformFn returns a skip bool of true, the notification is skipped and not emitted to the resulting observable.
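The transform-and-skip behaviour described above might look like the following when modelled with plain channels. This is a sketch under assumptions: `mapSketch` and `evenSquaresDemo` are hypothetical names, and the transform shape `func(ctx, S) (D, skip bool)` is inferred from the "skip bool" wording, since the MapFn type is not shown in this section.

```go
package main

import (
	"context"
	"fmt"
)

// mapSketch is a hypothetical stand-in for Map. It applies fn to every value
// from src and forwards the result unless fn reports skip == true.
func mapSketch[S, D any](
	ctx context.Context,
	src <-chan S,
	fn func(ctx context.Context, src S) (dst D, skip bool), // assumed shape
) <-chan D {
	dst := make(chan D)
	go func() {
		defer close(dst)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-src:
				if !ok {
					return
				}
				out, skip := fn(ctx, v)
				if skip {
					continue // skipped values are never emitted
				}
				select {
				case dst <- out:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return dst
}

// evenSquaresDemo squares even inputs and skips odd ones.
func evenSquaresDemo() []int {
	src := make(chan int, 4)
	for _, v := range []int{1, 2, 3, 4} {
		src <- v
	}
	close(src)

	squares := mapSketch(context.Background(), src,
		func(_ context.Context, n int) (int, bool) {
			return n * n, n%2 != 0
		})

	var got []int
	for v := range squares {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(evenSquaresDemo()) // prints [4 16]
}
```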
func MapExpand
func MapExpand[S, D any](ctx context.Context, srcObservable observable.Observable[S], transformFn MapFn[S, []D]) observable.Observable[D]
MapExpand transforms the given observable by applying the given transformFn to each notification received from the observable, similar to Map; however, the transformFn returns a slice of output notifications for each input notification.
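The one-to-many expansion described above can be sketched with plain channels as follows. The names `mapExpandSketch` and `wordsDemo` are hypothetical, and the transform shape `func(ctx, S) ([]D, skip bool)` is an assumption based on the MapFn[S, []D] parameter in the signature.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// mapExpandSketch is a hypothetical stand-in for MapExpand. fn returns a
// slice per input, and every element of that slice is emitted individually.
func mapExpandSketch[S, D any](
	ctx context.Context,
	src <-chan S,
	fn func(ctx context.Context, src S) (dsts []D, skip bool), // assumed shape
) <-chan D {
	dst := make(chan D)
	go func() {
		defer close(dst)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-src:
				if !ok {
					return
				}
				outs, skip := fn(ctx, v)
				if skip {
					continue
				}
				for _, out := range outs {
					select {
					case dst <- out:
					case <-ctx.Done():
						return
					}
				}
			}
		}
	}()
	return dst
}

// wordsDemo expands each line into its words and skips empty lines.
func wordsDemo() []string {
	src := make(chan string, 3)
	for _, line := range []string{"a b", "", "c"} {
		src <- line
	}
	close(src)

	words := mapExpandSketch(context.Background(), src,
		func(_ context.Context, line string) ([]string, bool) {
			fields := strings.Fields(line)
			return fields, len(fields) == 0
		})

	var got []string
	for w := range words {
		got = append(got, w)
	}
	return got
}

func main() {
	fmt.Println(wordsDemo()) // prints [a b c]
}
```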
func MapReplay
func MapReplay[S, D any](ctx context.Context, replayBufferCap int, srcObservable observable.Observable[S], transformFn MapFn[S, D]) observable.ReplayObservable[D]
MapReplay transforms the given observable by applying the given transformFn to each notification received from the observable. If the transformFn returns a skip bool of true, the notification is skipped and not emitted to the resulting observable. Observers of the resulting observable receive the last replayBufferCap values published to the source observable before receiving new values.
func NewObservable
func NewObservable[V any](opts ...option[V]) (observable.Observable[V], chan<- V)
NewObservable creates a new observable, which is notified whenever the publishCh channel receives a value, and returns it together with the send-only publish channel.
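To make the relationship between the publish channel and the observable concrete, here is a small fan-out sketch. It is not the package's implementation: `observableSketch`, `newObservableSketch`, and `fanOutDemo` are hypothetical names, subscribers are modelled as buffered channels, and closing the publish channel is assumed to close every subscriber.

```go
package main

import (
	"fmt"
	"sync"
)

// observableSketch is a hypothetical stand-in for an observable that
// forwards each published value to every current subscriber.
type observableSketch[V any] struct {
	mu   sync.Mutex
	subs []chan V
}

// Subscribe registers and returns a new subscriber channel. The channel is
// buffered so the demo below needs no reader goroutines.
func (o *observableSketch[V]) Subscribe() <-chan V {
	ch := make(chan V, 8)
	o.mu.Lock()
	o.subs = append(o.subs, ch)
	o.mu.Unlock()
	return ch
}

// newObservableSketch returns the observable and the send-only channel used
// to publish to it.
func newObservableSketch[V any]() (*observableSketch[V], chan<- V) {
	publishCh := make(chan V)
	o := &observableSketch[V]{}
	go func() {
		for v := range publishCh {
			o.mu.Lock()
			for _, ch := range o.subs {
				ch <- v
			}
			o.mu.Unlock()
		}
		// Assumption of this sketch: closing publishCh closes all subscribers.
		o.mu.Lock()
		for _, ch := range o.subs {
			close(ch)
		}
		o.mu.Unlock()
	}()
	return o, publishCh
}

// fanOutDemo shows two subscribers each receiving every published value.
func fanOutDemo() (first, second []string) {
	obs, publishCh := newObservableSketch[string]()
	a, b := obs.Subscribe(), obs.Subscribe()

	publishCh <- "x"
	publishCh <- "y"
	close(publishCh)

	for v := range a {
		first = append(first, v)
	}
	for v := range b {
		second = append(second, v)
	}
	return first, second
}

func main() {
	first, second := fanOutDemo()
	fmt.Println(first, second) // prints [x y] [x y]
}
```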
func NewObserver
func NewObserver[V any](ctx context.Context, onUnsubscribe UnsubscribeFunc[V]) *channelObserver[V]
func NewReplayObservable
func NewReplayObservable[V any](ctx context.Context, replayBufferCap int, opts ...option[V]) (observable.ReplayObservable[V], chan<- V)
NewReplayObservable returns a new ReplayObservable with a replay buffer of capacity replayBufferCap, along with the publish channel used to notify it of new values.
func ToReplayObservable
func ToReplayObservable[V any](ctx context.Context, replayBufferCap int, srcObsvbl observable.Observable[V]) observable.ReplayObservable[V]
ToReplayObservable returns an observable which replays the last replayBufferCap values published to the source observable to new observers, before publishing new values. It should only be used with a srcObsvbl which contains channelObservers (i.e. channelObservable or similar).
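The replay semantics described above (new observers first see the most recent values, then live ones) can be sketched with a bounded buffer. This is an illustrative model only: `replaySketch`, its `publish` method, and `lateSubscriberDemo` are hypothetical names, and values are published by direct method call rather than through a wrapped source observable.

```go
package main

import (
	"fmt"
	"sync"
)

// replaySketch is a hypothetical stand-in for a replay observable. It keeps
// the last bufferCap published values and hands them to each new subscriber
// before any live values.
type replaySketch[V any] struct {
	mu        sync.Mutex
	bufferCap int
	buffer    []V
	subs      []chan V
}

// publish records v in the bounded buffer and forwards it to subscribers.
func (r *replaySketch[V]) publish(v V) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.buffer = append(r.buffer, v)
	if len(r.buffer) > r.bufferCap {
		r.buffer = r.buffer[1:] // drop the oldest value
	}
	for _, ch := range r.subs {
		ch <- v
	}
}

// Subscribe returns a channel pre-filled with the buffered values. The extra
// capacity keeps the demo free of reader goroutines.
func (r *replaySketch[V]) Subscribe() <-chan V {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch := make(chan V, r.bufferCap+8)
	for _, v := range r.buffer {
		ch <- v // replayed before any new value
	}
	r.subs = append(r.subs, ch)
	return ch
}

// lateSubscriberDemo subscribes after three values were published to a
// replay buffer of capacity two.
func lateSubscriberDemo() []int {
	r := &replaySketch[int]{bufferCap: 2}
	r.publish(1)
	r.publish(2)
	r.publish(3) // buffer now holds 2 and 3

	late := r.Subscribe() // receives the replayed 2 and 3
	r.publish(4)          // delivered live, after the replay

	return []int{<-late, <-late, <-late}
}

func main() {
	fmt.Println(lateSubscriberDemo()) // prints [2 3 4]
}
```

The late subscriber never sees 1, because it had already been pushed out of the two-slot buffer by the time it subscribed.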
func WithPublisher
func WithPublisher[V any](publishCh chan V) option[V]
WithPublisher returns an option function which, when passed to NewObservable(), sets publishCh as the publish channel of the resulting observable.
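The option described above follows the functional-options pattern, which can be sketched as below. All names here (`obsSketch`, `optionSketch`, `withPublisherSketch`, `newObsSketch`, `sharedChannelDemo`) are hypothetical, and creating a default channel when no option is supplied is an assumption of the sketch rather than documented behaviour.

```go
package main

import "fmt"

// obsSketch is a hypothetical observable that only records its publish channel.
type obsSketch[V any] struct {
	publishCh chan V
}

// optionSketch mirrors the idea of an option function that configures the
// observable during construction.
type optionSketch[V any] func(*obsSketch[V])

// withPublisherSketch returns an option that installs a caller-owned channel.
func withPublisherSketch[V any](publishCh chan V) optionSketch[V] {
	return func(o *obsSketch[V]) {
		o.publishCh = publishCh
	}
}

// newObsSketch applies the options, then falls back to a fresh channel if
// none was provided (an assumption of this sketch).
func newObsSketch[V any](opts ...optionSketch[V]) (*obsSketch[V], chan<- V) {
	o := &obsSketch[V]{}
	for _, opt := range opts {
		opt(o)
	}
	if o.publishCh == nil {
		o.publishCh = make(chan V)
	}
	return o, o.publishCh
}

// sharedChannelDemo checks that the returned publish channel is the one the
// caller supplied.
func sharedChannelDemo() bool {
	mine := make(chan int, 1)
	o, publishCh := newObsSketch(withPublisherSketch(mine))
	publishCh <- 7
	return o.publishCh == mine && <-mine == 7
}

func main() {
	fmt.Println(sharedChannelDemo()) // prints true
}
```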
Types
type UnsubscribeFunc
type UnsubscribeFunc[V any] func(toRemove observable.Observer[V])