channel

package
Version: v0.0.9-3
Published: Oct 10, 2024 License: MIT Imports: 6 Imported by: 0

Documentation

Functions

func Collect

func Collect[V any](
	ctx context.Context,
	srcObservable observable.Observable[V],
) (dstCollection []V)

Collect collects all notifications received from the observable and returns them as a slice. Collect blocks until either srcObservable is closed or ctx is canceled, so ctx MUST be canceled after some finite duration. Collect is a terminal observable operator.
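The blocking behavior described above can be sketched with a plain Go channel standing in for observable.Observable[V]. This is a minimal illustration, not the package's implementation: collect and demoCollect are hypothetical names, and the 50ms timeout is an arbitrary choice.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// collect is a hypothetical stand-in for Collect. It drains src into a slice
// and returns when src is closed or ctx is done, whichever happens first.
func collect[V any](ctx context.Context, src <-chan V) []V {
	var dst []V
	for {
		select {
		case v, ok := <-src:
			if !ok {
				return dst // source closed
			}
			dst = append(dst, v)
		case <-ctx.Done():
			return dst // context canceled or timed out
		}
	}
}

// demoCollect publishes three values and never closes the source, so only
// the context timeout unblocks collect.
func demoCollect() []int {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	src := make(chan int, 3)
	src <- 1
	src <- 2
	src <- 3

	return collect[int](ctx, src)
}

func main() {
	fmt.Println(demoCollect())
}
```

Without the timeout, the call in demoCollect would never return, which is why the context needs a bounded lifetime.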

func ForEach

func ForEach[V any](
	ctx context.Context,
	srcObservable observable.Observable[V],
	forEachFn ForEachFn[V],
)

ForEach applies the given forEachFn to each notification received from the observable, similar to Map; however, ForEach does not publish to a destination observable. ForEach is useful for side effects and is a terminal observable operator.
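As a rough illustration of a terminal, side-effect-only operator, the sketch below applies a callback to each value read from a plain channel. It assumes a channel in place of observable.Observable[V] and runs synchronously for simplicity; whether the package's ForEach blocks is not stated here. The names forEach, forEachFn and demoForEach are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// forEachFn mirrors the shape of the ForEachFn[V] type listed under Types.
type forEachFn[V any] func(ctx context.Context, src V)

// forEach is a hypothetical stand-in for ForEach. It calls fn for every value
// read from src and produces no output channel.
func forEach[V any](ctx context.Context, src <-chan V, fn forEachFn[V]) {
	for {
		select {
		case v, ok := <-src:
			if !ok {
				return // source closed
			}
			fn(ctx, v)
		case <-ctx.Done():
			return
		}
	}
}

// demoForEach sums the published values as a side effect of the callback.
func demoForEach() int {
	src := make(chan int, 3)
	src <- 1
	src <- 2
	src <- 3
	close(src)

	sum := 0
	forEach[int](context.Background(), src, func(_ context.Context, v int) {
		sum += v
	})
	return sum
}

func main() {
	fmt.Println(demoForEach())
}
```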

func Map

func Map[S, D any](
	ctx context.Context,
	srcObservable observable.Observable[S],
	transformFn MapFn[S, D],
) observable.Observable[D]

Map transforms the given observable by applying the given transformFn to each notification received from the observable. If the transformFn returns a skip bool of true, the notification is skipped and not emitted to the resulting observable.
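The transform-and-skip semantics can be sketched over plain channels. This is an illustrative analogue under the assumption that a channel stands in for the observable; mapChan, mapFn and demoMap are hypothetical names and not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

// mapFn mirrors the shape of the MapFn[S, D] type listed under Types.
type mapFn[S, D any] func(ctx context.Context, src S) (dst D, skip bool)

// mapChan is a hypothetical stand-in for Map. It forwards fn's result for
// each source value unless fn reports skip == true.
func mapChan[S, D any](ctx context.Context, src <-chan S, fn mapFn[S, D]) <-chan D {
	dst := make(chan D)
	go func() {
		defer close(dst)
		for {
			select {
			case <-ctx.Done():
				return
			case s, ok := <-src:
				if !ok {
					return // source closed
				}
				d, skip := fn(ctx, s)
				if skip {
					continue // skipped values are never emitted
				}
				select {
				case dst <- d:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return dst
}

// demoMap keeps the even inputs and renders them as strings.
func demoMap() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	src := make(chan int, 4)
	for i := 1; i <= 4; i++ {
		src <- i
	}
	close(src)

	evens := mapChan[int, string](ctx, src, func(_ context.Context, n int) (string, bool) {
		if n%2 != 0 {
			return "", true // skip odd numbers
		}
		return fmt.Sprintf("even:%d", n), false
	})

	var out []string
	for s := range evens {
		out = append(out, s)
	}
	return out
}

func main() {
	fmt.Println(demoMap())
}
```

Returning skip alongside the value lets one function act as both a transform and a filter, so no separate filter operator is needed.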

func MapExpand

func MapExpand[S, D any](
	ctx context.Context,
	srcObservable observable.Observable[S],
	transformFn MapFn[S, []D],
) observable.Observable[D]

MapExpand transforms the given observable by applying the given transformFn to each notification received from the observable, similar to Map; however, the transformFn returns a slice of output notifications for each input notification.
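The one-to-many behavior can be sketched the same way: each input yields a slice, and every element of that slice is emitted individually. The code below is an assumption-laden analogue over plain channels, with hypothetical names mapExpand and demoMapExpand. For brevity it only checks ctx while sending, not while waiting on the source.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// mapExpand is a hypothetical stand-in for MapExpand. fn returns a slice per
// input, and each element is sent on the destination channel one at a time.
func mapExpand[S, D any](
	ctx context.Context,
	src <-chan S,
	fn func(ctx context.Context, src S) (dst []D, skip bool),
) <-chan D {
	dst := make(chan D)
	go func() {
		defer close(dst)
		for s := range src {
			ds, skip := fn(ctx, s)
			if skip {
				continue
			}
			for _, d := range ds {
				select {
				case dst <- d:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return dst
}

// demoMapExpand splits each line into words and skips blank lines.
func demoMapExpand() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	src := make(chan string, 3)
	src <- "a b"
	src <- ""
	src <- "c"
	close(src)

	words := mapExpand[string, string](ctx, src, func(_ context.Context, line string) ([]string, bool) {
		fields := strings.Fields(line)
		return fields, len(fields) == 0
	})

	var out []string
	for w := range words {
		out = append(out, w)
	}
	return out
}

func main() {
	fmt.Println(demoMapExpand())
}
```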

func MapReplay

func MapReplay[S, D any](
	ctx context.Context,
	replayBufferCap int,
	srcObservable observable.Observable[S],
	transformFn MapFn[S, D],
) observable.ReplayObservable[D]

MapReplay transforms the given observable by applying the given transformFn to each notification received from the observable. If the transformFn returns a skip bool of true, the notification is skipped and not emitted to the resulting observable. The resulting observable will receive the last replayBufferCap number of values published to the source observable before receiving new values.

func NewObservable

func NewObservable[V any](opts ...option[V]) (observable.Observable[V], chan<- V)

NewObservable creates a new observable and returns it together with its publish channel. The observable is notified whenever a value is sent on that channel.
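To illustrate the idea of an observable paired with a send-only publish channel, here is a small fan-out sketch. It is not the package's channelObservable: the broadcaster type, its subscribe method, and the buffered subscriber channels are all assumptions made for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster is a hypothetical, simplified observable: every value sent on
// its publish channel is forwarded to all current subscribers.
type broadcaster[V any] struct {
	mu   sync.Mutex
	subs []chan V
}

// newBroadcaster returns the broadcaster and the send-only channel used to
// publish to it, mirroring the (observable, chan<- V) return shape.
func newBroadcaster[V any]() (*broadcaster[V], chan<- V) {
	b := &broadcaster[V]{}
	publishCh := make(chan V)
	go func() {
		for v := range publishCh {
			b.mu.Lock()
			for _, sub := range b.subs {
				sub <- v // blocks if a subscriber's buffer is full
			}
			b.mu.Unlock()
		}
		// The publish channel was closed: close every subscriber channel.
		b.mu.Lock()
		defer b.mu.Unlock()
		for _, sub := range b.subs {
			close(sub)
		}
	}()
	return b, publishCh
}

// subscribe registers a new subscriber channel with the given buffer size.
func (b *broadcaster[V]) subscribe(bufferSize int) <-chan V {
	sub := make(chan V, bufferSize)
	b.mu.Lock()
	b.subs = append(b.subs, sub)
	b.mu.Unlock()
	return sub
}

// demoBroadcast publishes two values to two subscribers.
func demoBroadcast() (first, second []int) {
	b, publishCh := newBroadcaster[int]()
	subA := b.subscribe(2)
	subB := b.subscribe(2)

	publishCh <- 10
	publishCh <- 20
	close(publishCh)

	for v := range subA {
		first = append(first, v)
	}
	for v := range subB {
		second = append(second, v)
	}
	return first, second
}

func main() {
	fmt.Println(demoBroadcast())
}
```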

func NewObserver

func NewObserver[V any](
	ctx context.Context,
	onUnsubscribe UnsubscribeFunc[V],
) *channelObserver[V]

func NewReplayObservable

func NewReplayObservable[V any](
	ctx context.Context,
	replayBufferCap int,
	opts ...option[V],
) (observable.ReplayObservable[V], chan<- V)

NewReplayObservable returns a new ReplayObservable with a replay buffer of capacity replayBufferCap, together with the corresponding publish channel used to notify it of new values.

func ToReplayObservable

func ToReplayObservable[V any](
	ctx context.Context,
	replayBufferCap int,
	srcObsvbl observable.Observable[V],
) observable.ReplayObservable[V]

ToReplayObservable returns an observable which replays the last replayBufferCap values published to the source observable to new observers, before publishing new values. It should only be used with a srcObsvbl which contains channelObservers (i.e. channelObservable or similar).
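The "last replayBufferCap values" rule can be shown with a small bounded buffer. This sketch is single-goroutine and leaves out subscription and concurrency entirely. The replayBuffer type and its methods are hypothetical and only model which values a late observer would see first.

```go
package main

import "fmt"

// replayBuffer is a hypothetical model of a replay buffer: it retains at most
// capacity of the most recently published values.
type replayBuffer[V any] struct {
	capacity int
	values   []V
}

func newReplayBuffer[V any](capacity int) *replayBuffer[V] {
	return &replayBuffer[V]{capacity: capacity}
}

// publish records v and drops the oldest values beyond capacity.
func (r *replayBuffer[V]) publish(v V) {
	r.values = append(r.values, v)
	if len(r.values) > r.capacity {
		r.values = r.values[len(r.values)-r.capacity:]
	}
}

// snapshot returns a copy of the values a newly arriving observer would be
// replayed before it starts receiving new ones.
func (r *replayBuffer[V]) snapshot() []V {
	return append([]V(nil), r.values...)
}

// demoReplay publishes 1, 2, 3 into a buffer of capacity 2, takes a snapshot,
// then publishes 4 and takes another.
func demoReplay() (late, later []int) {
	buf := newReplayBuffer[int](2)
	for _, v := range []int{1, 2, 3} {
		buf.publish(v)
	}
	late = buf.snapshot()

	buf.publish(4)
	later = buf.snapshot()
	return late, later
}

func main() {
	fmt.Println(demoReplay())
}
```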

func WithPublisher

func WithPublisher[V any](publishCh chan V) option[V]

WithPublisher returns an option function which, when passed to NewObservable(), sets publishCh as the publish channel of the resulting observable.
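WithPublisher follows the functional options pattern. The sketch below shows that pattern in isolation. observableConfig, newConfig and the lower-case withPublisher are hypothetical, and the package's own option[V] type may target a different struct.

```go
package main

import "fmt"

// observableConfig is a hypothetical target for options; it holds only the
// publish channel.
type observableConfig[V any] struct {
	publishCh chan V
}

// option is a function that mutates the config, as in the functional options
// pattern.
type option[V any] func(*observableConfig[V])

// withPublisher returns an option that installs a caller-supplied channel.
func withPublisher[V any](publishCh chan V) option[V] {
	return func(cfg *observableConfig[V]) {
		cfg.publishCh = publishCh
	}
}

// newConfig applies the options, then falls back to a fresh unbuffered
// channel if none was supplied.
func newConfig[V any](opts ...option[V]) *observableConfig[V] {
	cfg := &observableConfig[V]{}
	for _, opt := range opts {
		opt(cfg)
	}
	if cfg.publishCh == nil {
		cfg.publishCh = make(chan V)
	}
	return cfg
}

// demoWithPublisher reports whether the supplied channel was installed and
// whether a default channel is created when no option is passed.
func demoWithPublisher() (usesSupplied, hasDefault bool) {
	ch := make(chan int, 1)
	cfg := newConfig(withPublisher(ch))
	usesSupplied = cfg.publishCh == ch

	def := newConfig[int]()
	hasDefault = def.publishCh != nil
	return usesSupplied, hasDefault
}

func main() {
	fmt.Println(demoWithPublisher())
}
```

Supplying the channel from outside lets the caller choose its buffer size or share an existing channel instead of accepting a default.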

Types

type ForEachFn

type ForEachFn[V any] func(ctx context.Context, src V)

type MapFn

type MapFn[S, D any] func(ctx context.Context, src S) (dst D, skip bool)

type UnsubscribeFunc

type UnsubscribeFunc[V any] func(toRemove observable.Observer[V])
