rx

package module
v0.9.0
Warning

This package is not in the latest version of its module.

Published: Apr 22, 2024 License: MIT Imports: 8 Imported by: 1

README

rx


A reactive programming library for Go, inspired by https://reactivex.io/ (mostly RxJS).

Documentation

Overview

Package rx is a reactive programming library for Go, inspired by https://reactivex.io/ (mostly RxJS).

Observers

type Observer[T any] func(n Notification[T])

An Observer is a consumer of Notifications delivered by an Observable.

An Observer is usually created and passed to the Observable.Subscribe method when subscribing to an Observable.

Observables

type Observable[T any] func(c Context, o Observer[T])

An Observable is a collection of future values, waiting to become a flow of data. Subscribing an Observer to an Observable makes it happen. When an Observable is subscribed, its values, when available, are emitted to a given Observer.

An Observable can only emit N+1 Notifications (N >= 0): either N values and an error, or N values and a completion. The last Notification emitted by an Observable must be an error or a completion.
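To make the N+1 rule concrete, here is a minimal, self-contained sketch that mirrors the two function types with local stand-ins. It is not the library's code: the lowercase names (notification, observer, observable, fromValues, collect) are hypothetical, and a plain context.Context stands in for the library's Context type.

```go
package main

import (
	"context"
	"fmt"
)

// kind tags a notification as a value, an error, or a completion.
type kind int

const (
	kindNext kind = iota
	kindError
	kindComplete
)

// notification is a local stand-in for rx.Notification[T].
type notification[T any] struct {
	kind  kind
	value T
	err   error
}

// observer and observable mirror the shapes of rx.Observer and rx.Observable.
type observer[T any] func(n notification[T])
type observable[T any] func(ctx context.Context, o observer[T])

// fromValues returns an observable that emits each value and then completes:
// N values followed by exactly one terminal notification.
func fromValues[T any](vs ...T) observable[T] {
	return func(ctx context.Context, o observer[T]) {
		for _, v := range vs {
			o(notification[T]{kind: kindNext, value: v})
		}
		o(notification[T]{kind: kindComplete})
	}
}

// collect subscribes to ob and records every notification it receives.
func collect[T any](ob observable[T]) []notification[T] {
	var got []notification[T]
	ob(context.Background(), func(n notification[T]) { got = append(got, n) })
	return got
}

func main() {
	for _, n := range collect(fromValues(1, 2, 3)) {
		fmt.Println(n.kind, n.value)
	}
}
```

Subscribing is just a function call here: nothing is emitted until collect invokes the observable with an observer.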

Operators

type Operator[T, R any] interface {
	Apply(source Observable[T]) Observable[R]
}

An Operator is an operation on an Observable. When applied, an Operator does not change the existing Observable value. Instead, it returns a new one, whose subscription logic is based on the first Observable.

There are many kinds of Operators in this library. Here is a list of what Operators can do:

Previously, Operator was also a function type like Observable and Observer. It was changed to be an interface type for one reason: implementations can carry additional methods for setting extra options. For example, MergeMap has two extra options: MergeMapOperator.WithBuffering and MergeMapOperator.WithConcurrency, and this is how they are specified when using a MergeMap: MergeMap(f).WithBuffering().WithConcurrency(3).
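The option-carrying pattern described above can be sketched in isolation. Everything below is hypothetical and simplified for illustration: observable is reduced to a function that pushes values, and repeatOperator with its WithTimes option is an invented operator, not part of this library.

```go
package main

import "fmt"

// observable is a simplified stand-in: subscribing calls next for each value.
type observable[T any] func(next func(T))

// operator mirrors the shape of the Operator interface.
type operator[T, R any] interface {
	Apply(source observable[T]) observable[R]
}

// repeatOperator is a hypothetical operator that emits every source value
// several times. Being a struct, it can carry options.
type repeatOperator[T any] struct {
	times int
}

// repeatEach returns the operator with its default option (times = 2).
func repeatEach[T any]() repeatOperator[T] {
	return repeatOperator[T]{times: 2}
}

// WithTimes returns a copy of op with the times option set to n.
func (op repeatOperator[T]) WithTimes(n int) repeatOperator[T] {
	op.times = n
	return op
}

// Apply implements operator. The source is left untouched; a new observable
// is returned whose subscription logic wraps the source.
func (op repeatOperator[T]) Apply(source observable[T]) observable[T] {
	return func(next func(T)) {
		source(func(v T) {
			for i := 0; i < op.times; i++ {
				next(v)
			}
		})
	}
}

// Compile-time check that repeatOperator satisfies the interface.
var _ operator[int, int] = repeatOperator[int]{}

// run applies op to a source that emits 1 and 2, and collects the output.
func run(op operator[int, int]) []int {
	source := observable[int](func(next func(int)) {
		next(1)
		next(2)
	})
	var got []int
	op.Apply(source)(func(v int) { got = append(got, v) })
	return got
}

func main() {
	fmt.Println(run(repeatEach[int]()))
	fmt.Println(run(repeatEach[int]().WithTimes(3)))
}
```

Because WithTimes has a value receiver and returns a copy, option calls can be chained without mutating an operator that is already in use elsewhere.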

Chaining Multiple Operators

To chain multiple Operators, do either this:

ob1 := op1.Apply(source)
ob2 := op2.Apply(ob1)
ob3 := op3.Apply(ob2)

or this:

ob := Pipe3(source, op1, op2, op3)

There are 9 Pipe functions in this library, from Pipe1 to Pipe9. Use the Pipe function that matches the number of Operators.

When there are really too many Operators to chain, do either this:

ob1 := Pipe5(source, op1, op2, op3, op4, op5)
ob2 := Pipe5(ob1, op6, op7, op8, op9, op10)
ob3 := Pipe5(ob2, op11, op12, op13, op14, op15)

or this:

ob := Pipe3(source,
	Compose5(op1, op2, op3, op4, op5),
	Compose5(op6, op7, op8, op9, op10),
	Compose5(op11, op12, op13, op14, op15),
)

There are 8 Compose functions in this library, from Compose2 to Compose9.
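As a rough illustration of why a Pipe call and a Compose call are interchangeable, the sketch below builds two-operator versions of each on simplified local types. The names pipe2, compose2, composed2 and mapOp are hypothetical and only mirror the shape of the library's functions; they are not its implementation.

```go
package main

import (
	"fmt"
	"strconv"
)

// observable is a simplified stand-in: subscribing calls next for each value.
type observable[T any] func(next func(T))

type operator[T, R any] interface {
	Apply(source observable[T]) observable[R]
}

// mapOp is a hypothetical operator that transforms each value with f.
type mapOp[T, R any] struct{ f func(T) R }

func (op mapOp[T, R]) Apply(source observable[T]) observable[R] {
	return func(next func(R)) {
		source(func(v T) { next(op.f(v)) })
	}
}

// pipe2 applies two operators to ob, in order.
func pipe2[A, B, C any](ob observable[A], op1 operator[A, B], op2 operator[B, C]) observable[C] {
	return op2.Apply(op1.Apply(ob))
}

// composed2 bundles two operators into a single operator.
type composed2[A, B, C any] struct {
	op1 operator[A, B]
	op2 operator[B, C]
}

func (c composed2[A, B, C]) Apply(source observable[A]) observable[C] {
	return c.op2.Apply(c.op1.Apply(source))
}

// compose2 returns an operator equivalent to applying op1 and then op2.
func compose2[A, B, C any](op1 operator[A, B], op2 operator[B, C]) operator[A, C] {
	return composed2[A, B, C]{op1: op1, op2: op2}
}

// collect subscribes to ob and gathers everything it emits.
func collect[T any](ob observable[T]) []T {
	var got []T
	ob(func(v T) { got = append(got, v) })
	return got
}

// demo runs the same two operators through pipe2 and through compose2.
func demo() (piped, composed []string) {
	source := observable[int](func(next func(int)) {
		for i := 1; i <= 3; i++ {
			next(i)
		}
	})
	var double operator[int, int] = mapOp[int, int]{f: func(v int) int { return v * 2 }}
	var toText operator[int, string] = mapOp[int, string]{f: strconv.Itoa}
	piped = collect(pipe2(source, double, toText))
	composed = collect(compose2(double, toText).Apply(source))
	return piped, composed
}

func main() {
	piped, composed := demo()
	fmt.Println(piped, composed)
}
```

The type parameters A, B and C are what force adjacent operators to agree on the element type flowing between them, which is why a separate function is needed for each operator count.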

Concurrency Safety

Notifications emitted by an Observable may come from any started goroutine, but they are guaranteed to be in sequence, one after another.

Operators in a chain may run in different goroutines. In the following code:

Pipe3(ob, op1, op2, op3).Subscribe(c, o)

Race conditions could happen for any two of ob, op1, op2, op3 and o.

Race conditions could also happen for any two Observables. However, not every Operator or Observable has concurrency behavior.

It's very common that an Observable, when subscribed, also subscribes to other Observables. In this library, inner Observables are usually subscribed in the same goroutine where the outer one is being subscribed. However,

  • Observables returned by Go always subscribe to their source Observable in a separate goroutine;
  • Observables returned by Merge(All|Map|MapTo), with source buffering on, may or may not subscribe to inner Observables in separate goroutines;
  • Observables returned by Zip[2-9] always subscribe to input Observables in separate goroutines (this one might change in the future).

When in doubt, read the code.

Example

package main

import (
	"context"
	"fmt"

	"github.com/b97tsk/rx"
)

func main() {
	// Create an Observable...
	ob := rx.Range(1, 10)

	// ...and apply some Operators.
	ob = rx.Pipe3(
		ob,
		rx.Filter(
			func(v int) bool {
				return v%2 == 1
			},
		),
		rx.Map(
			func(v int) int {
				return v * 2
			},
		),
		rx.Do(
			func(n rx.Notification[int]) {
				switch n.Kind {
				case rx.KindNext:
					fmt.Println(n.Value)
				case rx.KindError:
					fmt.Println(n.Error)
				case rx.KindComplete:
					fmt.Println("Complete")
				}
			},
		),
	)

	// To subscribe to an Observable, call its Subscribe method, which takes
	// a Context and an Observer as arguments.
	ob.Subscribe(rx.NewContext(context.TODO()), rx.Noop[int])

	// Since this example involves no other goroutines, the subscription has
	// already finished by this point. You can also use the BlockingSubscribe
	// method instead; it blocks until the subscription is done.

}

Output:

2
6
10
14
18
Complete

Example (Blocking)

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/b97tsk/rx"
)

func main() {
	ctx, cancel := rx.NewContext(context.TODO()).WithTimeout(700 * time.Millisecond)
	defer cancel()

	ob := rx.Pipe4(
		rx.Concat(
			rx.Timer(50*time.Millisecond),
			rx.Ticker(100*time.Millisecond),
		),
		rx.Scan(-1, func(i int, _ time.Time) int { return i + 1 }), // 0, 1, 2, 3, ...
		rx.Scan(0, func(v1, v2 int) int { return v1 + v2 }),        // 0, 1, 3, 6, ...
		rx.Skip[int](4),
		rx.DoOnNext(func(v int) { fmt.Println(v) }), // 10, 15, 21, ...
	)

	err := ob.BlockingSubscribe(ctx, rx.Noop[int])
	if err != nil {
		fmt.Println(err)
	}

}

Output:

10
15
21
context deadline exceeded

Example (Unicast)

Multicasts and Unicasts are special Observables that let developers decide, after they are subscribed, what values they produce and when they complete. Multicasts can be subscribed multiple times, whereas Unicasts can only be successfully subscribed once. Both Multicasts and Unicasts are safe for concurrent use. Here is an example that demonstrates how to use a Unicast.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/b97tsk/rx"
)

func main() {
	ctx := rx.NewContext(context.TODO()).WithNewWaitGroup()

	u := rx.Unicast[time.Time]()

	ctx.Go(func() {
		rx.Pipe2(
			rx.Concat(
				u.Observable,
				rx.Timer(500*time.Millisecond),
			),
			rx.MapTo[time.Time](42),
			rx.Do(
				func(n rx.Notification[int]) {
					switch n.Kind {
					case rx.KindNext:
						fmt.Println(n.Value)
					case rx.KindError:
						fmt.Println(n.Error)
					case rx.KindComplete:
						fmt.Println("Complete")
					}
				},
			),
		).Subscribe(ctx, rx.Noop[int])
	})

	ctx.Go(func() {
		u.Complete() // Start timer.
	})

	ctx.Wait()

	// This example works properly, no matter which ctx.Go(...) runs first.

}

Output:

42
Complete

Example (WaitGroup)

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/b97tsk/rx"
)

func main() {
	ctx := rx.NewContext(context.TODO()).WithNewWaitGroup()

	ctx.Go(func() {
		for n := 1; n < 4; n++ {
			rx.Pipe2(
				rx.Timer(50*time.Millisecond*time.Duration(n)),
				rx.MapTo[time.Time](n),
				rx.DoOnNext(func(v int) { fmt.Println(v) }),
			).Subscribe(ctx, rx.Noop[int])
		}
	})

	ctx.Wait()

}

Output:

1
2
3

Constants

This section is empty.

Variables

var (
	ErrBufferOverflow = errors.New("buffer overflow")
	ErrEmpty          = errors.New("empty")
	ErrFinalized      = errors.New("finalized")
	ErrNotSingle      = errors.New("not single")
	ErrOops           = errors.New("oops")
	ErrTimeout        = errors.New("timeout")
	ErrUnicast        = errors.New("unicast")
)

Functions

func Noop

func Noop[T any](Notification[T])

Noop gives you an Observer that does nothing.

func Serialize added in v0.9.0

func Serialize[T any](c Context, o Observer[T]) (Context, Observer[T])

Serialize returns an Observer that passes incoming emissions to o in a mutually exclusive way. Serialize also returns a copy of c that will be cancelled when o is about to receive a notification of error or completion.
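The mutual-exclusion half of this contract can be pictured with a mutex-guarded wrapper. The sketch below is only an approximation under stated assumptions: observer is a local type that receives plain values instead of Notifications, the Context copy and its cancellation are left out entirely, and serialize and countConcurrently are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// observer is a simplified stand-in that receives plain values.
type observer[T any] func(v T)

// serialize wraps o so that concurrent callers take turns calling it.
func serialize[T any](o observer[T]) observer[T] {
	var mu sync.Mutex
	return func(v T) {
		mu.Lock()
		defer mu.Unlock()
		o(v)
	}
}

// countConcurrently feeds a serialized observer from several goroutines and
// returns how many values the wrapped observer saw.
func countConcurrently(goroutines, perGoroutine int) int {
	total := 0 // not synchronized by itself; the wrapper protects it
	o := serialize[int](func(int) { total++ })

	var wg sync.WaitGroup
	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perGoroutine; i++ {
				o(i)
			}
		}()
	}
	wg.Wait()
	return total
}

func main() {
	fmt.Println(countConcurrently(8, 1000))
}
```

Without the wrapper, the unsynchronized increment would be a data race; with it, every call to the inner observer happens while the mutex is held.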

func Try0 added in v0.9.0

func Try0(f, oops func())

Try0 runs f(). If f does not return normally, runs oops().
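One plausible way to get "runs oops() if f does not return normally" is a deferred function guarded by a flag. The sketch below shows that pattern with local, hypothetical names (try0, oopsRan); it is an assumption about the technique, not a copy of the library's source. Note that the sketch does not recover: a panic in f keeps propagating after oops has run.

```go
package main

import "fmt"

// try0 runs f. If f does not return normally (for example, it panics),
// oops runs while the stack unwinds. The panic itself is not recovered here.
func try0(f, oops func()) {
	ok := false
	defer func() {
		if !ok {
			oops()
		}
	}()
	f()
	ok = true
}

// oopsRan reports whether oops was called for the given f. It recovers the
// panic only so that the demonstration can continue.
func oopsRan(f func()) (ran bool) {
	defer func() { _ = recover() }()
	try0(f, func() { ran = true })
	return ran
}

func main() {
	fmt.Println(oopsRan(func() {}))
	fmt.Println(oopsRan(func() { panic("boom") }))
}
```

The numbered variants (Try1, Try11, and so on) differ only in how many arguments are forwarded to f and whether a result is returned.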

func Try01 added in v0.9.0

func Try01[R any](f func() R, oops func()) R

Try01 returns f(). If f does not return normally, runs oops().

func Try1 added in v0.9.0

func Try1[T1 any](f func(v1 T1), v1 T1, oops func())

Try1 runs f(v1). If f does not return normally, runs oops().

func Try11 added in v0.9.0

func Try11[T1, R any](f func(v1 T1) R, v1 T1, oops func()) R

Try11 returns f(v1). If f does not return normally, runs oops().

func Try2 added in v0.9.0

func Try2[T1, T2 any](f func(v1 T1, v2 T2), v1 T1, v2 T2, oops func())

Try2 runs f(v1, v2). If f does not return normally, runs oops().

func Try21 added in v0.9.0

func Try21[T1, T2, R any](f func(v1 T1, v2 T2) R, v1 T1, v2 T2, oops func()) R

Try21 returns f(v1, v2). If f does not return normally, runs oops().

func Try3 added in v0.9.0

func Try3[T1, T2, T3 any](f func(v1 T1, v2 T2, v3 T3), v1 T1, v2 T2, v3 T3, oops func())

Try3 runs f(v1, v2, v3). If f does not return normally, runs oops().

func Try31 added in v0.9.0

func Try31[T1, T2, T3, R any](f func(v1 T1, v2 T2, v3 T3) R, v1 T1, v2 T2, v3 T3, oops func()) R

Try31 returns f(v1, v2, v3). If f does not return normally, runs oops().

func Try4 added in v0.9.0

func Try4[T1, T2, T3, T4 any](f func(v1 T1, v2 T2, v3 T3, v4 T4), v1 T1, v2 T2, v3 T3, v4 T4, oops func())

Try4 runs f(v1, v2, v3, v4). If f does not return normally, runs oops().

func Try41 added in v0.9.0

func Try41[T1, T2, T3, T4, R any](f func(v1 T1, v2 T2, v3 T3, v4 T4) R, v1 T1, v2 T2, v3 T3, v4 T4, oops func()) R

Try41 returns f(v1, v2, v3, v4). If f does not return normally, runs oops().

func Try5 added in v0.9.0

func Try5[T1, T2, T3, T4, T5 any](
	f func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5),
	v1 T1, v2 T2, v3 T3, v4 T4, v5 T5,
	oops func(),
)

Try5 runs f(v1, v2, v3, v4, v5). If f does not return normally, runs oops().

func Try51 added in v0.9.0

func Try51[T1, T2, T3, T4, T5, R any](
	f func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5) R,
	v1 T1, v2 T2, v3 T3, v4 T4, v5 T5,
	oops func(),
) R

Try51 returns f(v1, v2, v3, v4, v5). If f does not return normally, runs oops().

func Try6 added in v0.9.0

func Try6[T1, T2, T3, T4, T5, T6 any](
	f func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6),
	v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6,
	oops func(),
)

Try6 runs f(v1, v2, v3, v4, v5, v6). If f does not return normally, runs oops().

func Try61 added in v0.9.0

func Try61[T1, T2, T3, T4, T5, T6, R any](
	f func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6) R,
	v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6,
	oops func(),
) R

Try61 returns f(v1, v2, v3, v4, v5, v6). If f does not return normally, runs oops().

func Try7 added in v0.9.0

func Try7[T1, T2, T3, T4, T5, T6, T7 any](
	f func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7),
	v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7,
	oops func(),
)

Try7 runs f(v1, v2, v3, v4, v5, v6, v7). If f does not return normally, runs oops().

func Try71 added in v0.9.0

func Try71[T1, T2, T3, T4, T5, T6, T7, R any](
	f func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7) R,
	v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7,
	oops func(),
) R

Try71 returns f(v1, v2, v3, v4, v5, v6, v7). If f does not return normally, runs oops().

func Try8 added in v0.9.0

func Try8[T1, T2, T3, T4, T5, T6, T7, T8 any](
	f func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8),
	v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8,
	oops func(),
)

Try8 runs f(v1, v2, v3, v4, v5, v6, v7, v8). If f does not return normally, runs oops().

func Try81 added in v0.9.0

func Try81[T1, T2, T3, T4, T5, T6, T7, T8, R any](
	f func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8) R,
	v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8,
	oops func(),
) R

Try81 returns f(v1, v2, v3, v4, v5, v6, v7, v8). If f does not return normally, runs oops().

func Try9 added in v0.9.0

func Try9[T1, T2, T3, T4, T5, T6, T7, T8, T9 any](
	f func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8, v9 T9),
	v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8, v9 T9,
	oops func(),
)

Try9 runs f(v1, v2, v3, v4, v5, v6, v7, v8, v9). If f does not return normally, runs oops().

func Try91 added in v0.9.0

func Try91[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any](
	f func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8, v9 T9) R,
	v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8, v9 T9,
	oops func(),
) R

Try91 returns f(v1, v2, v3, v4, v5, v6, v7, v8, v9). If f does not return normally, runs oops().

Types

type BufferCountOperator added in v0.7.0

type BufferCountOperator[T any] struct {
	// contains filtered or unexported fields
}

BufferCountOperator is an Operator type for BufferCount.

func BufferCount added in v0.7.0

func BufferCount[T any](bufferSize int) BufferCountOperator[T]

BufferCount buffers a number of values from the source Observable as a slice, and emits that slice when its size reaches the given BufferSize; then, BufferCount starts a new buffer by dropping a number of the oldest values, as specified by the StartBufferEvery option (defaults to BufferSize).

To reduce allocations, slices emitted by the output Observable share the same underlying array.
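The interplay between BufferSize and StartBufferEvery is easier to see on a plain slice. The function below is a hypothetical sketch (bufferCount is a local name) that follows the description above, with two assumptions: values left in an incomplete buffer when the input ends are simply ignored, and each emitted slice is copied, whereas the library's slices share one underlying array.

```go
package main

import "fmt"

// bufferCount groups src into windows of `size` values, starting a new
// window every `every` values.
func bufferCount(src []int, size, every int) [][]int {
	var out [][]int
	buf := make([]int, 0, size)
	skip := 0 // values to discard when every > size
	for _, v := range src {
		if skip > 0 {
			skip--
			continue
		}
		buf = append(buf, v)
		if len(buf) < size {
			continue
		}
		// Emit a copy so later shifting does not alter earlier windows.
		out = append(out, append([]int(nil), buf...))
		if every < size {
			// Drop the `every` oldest values and keep the rest.
			buf = append(buf[:0], buf[every:]...)
		} else {
			// Drop the whole buffer and skip the gap between windows.
			buf = buf[:0]
			skip = every - size
		}
	}
	return out
}

func main() {
	fmt.Println(bufferCount([]int{1, 2, 3, 4, 5}, 3, 1))    // overlapping windows
	fmt.Println(bufferCount([]int{1, 2, 3, 4, 5, 6}, 2, 2)) // back-to-back windows
	fmt.Println(bufferCount([]int{1, 2, 3, 4, 5, 6}, 2, 3)) // windows with gaps
}
```

Because the library reuses one underlying array, an Observer that wants to keep an emitted slice beyond the current notification should copy it, much like the sketch does.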

func (BufferCountOperator[T]) Apply added in v0.7.0

func (op BufferCountOperator[T]) Apply(source Observable[T]) Observable[[]T]

Apply implements the Operator interface.

func (BufferCountOperator[T]) WithStartBufferEvery added in v0.7.0

func (op BufferCountOperator[T]) WithStartBufferEvery(n int) BufferCountOperator[T]

WithStartBufferEvery sets StartBufferEvery option to a given value.

type CancelCauseFunc added in v0.9.0

type CancelCauseFunc = context.CancelCauseFunc

A CancelCauseFunc behaves like a CancelFunc but additionally sets the cancellation cause. This cause can be retrieved by calling Context.Cause on the canceled Context or on any of its derived Contexts.

type CancelFunc added in v0.9.0

type CancelFunc = context.CancelFunc

A CancelFunc tells an operation to abandon its work. A CancelFunc does not wait for the work to stop. A CancelFunc may be called by multiple goroutines simultaneously. After the first call, subsequent calls to a CancelFunc do nothing.

type ConcatMapOperator added in v0.9.0

type ConcatMapOperator[T, R any] struct {
	// contains filtered or unexported fields
}

ConcatMapOperator is an Operator type for ConcatMap.

func ConcatAll added in v0.7.0

func ConcatAll[_ Observable[T], T any]() ConcatMapOperator[Observable[T], T]

ConcatAll flattens a higher-order Observable into a first-order Observable by concatenating the inner Observables in order.

func ConcatMap added in v0.7.0

func ConcatMap[T, R any](mapping func(v T) Observable[R]) ConcatMapOperator[T, R]

ConcatMap converts the source Observable into a higher-order Observable, by mapping each source value to an Observable, then flattens it into a first-order Observable using ConcatAll.

func ConcatMapTo added in v0.7.0

func ConcatMapTo[T, R any](inner Observable[R]) ConcatMapOperator[T, R]

ConcatMapTo converts the source Observable into a higher-order Observable, by mapping each source value to the same Observable, then flattens it into a first-order Observable using ConcatAll.

func (ConcatMapOperator[T, R]) Apply added in v0.9.0

func (op ConcatMapOperator[T, R]) Apply(source Observable[T]) Observable[R]

Apply implements the Operator interface.

func (ConcatMapOperator[T, R]) WithBuffering added in v0.9.0

func (op ConcatMapOperator[T, R]) WithBuffering() ConcatMapOperator[T, R]

WithBuffering turns on source buffering. By default, this Operator blocks at every source value until it's flattened. With source buffering on, this Operator buffers every source value, which might consume a lot of memory over time if the source emits values faster than they can be concatenated.

type ConnectOperator added in v0.7.0

type ConnectOperator[T, R any] struct {
	// contains filtered or unexported fields
}

ConnectOperator is an Operator type for Connect.

func Connect added in v0.7.0

func Connect[T, R any](selector func(source Observable[T]) Observable[R]) ConnectOperator[T, R]

Connect multicasts the source Observable within a function where multiple subscriptions can share the same source.

func (ConnectOperator[T, R]) Apply added in v0.7.0

func (op ConnectOperator[T, R]) Apply(source Observable[T]) Observable[R]

Apply implements the Operator interface.

func (ConnectOperator[T, R]) WithConnector added in v0.7.0

func (op ConnectOperator[T, R]) WithConnector(connector func() Subject[T]) ConnectOperator[T, R]

WithConnector sets Connector option to a given value.

type Context added in v0.9.0

type Context struct {
	Context      context.Context
	WaitGroup    *sync.WaitGroup
	PanicHandler func(v any)
}

A Context carries a context.Context, an optional sync.WaitGroup, and an optional panic handler.

func NewBackgroundContext added in v0.9.0

func NewBackgroundContext() Context

NewBackgroundContext returns NewContext(context.Background()).

func NewContext added in v0.9.0

func NewContext(ctx context.Context) Context

NewContext returns a Context with Context field set to ctx.

func (Context) AfterFunc added in v0.9.0

func (c Context) AfterFunc(f func()) (stop func() bool)

AfterFunc arranges to call f in its own goroutine after c is done (cancelled or timed out). If c is already done, AfterFunc calls f immediately in its own goroutine.

Calling the returned stop function stops the association of c with f. It returns true if the call stopped f from being run. If stop returns false, either the context is done and f has been started in its own goroutine; or f was already stopped. The stop function does not wait for f to complete before returning.

Internally, f is wrapped with Context.PreAsyncCall before being passed to context.AfterFunc.

func (Context) Cause added in v0.9.0

func (c Context) Cause() error

Cause returns a non-nil error explaining why c was canceled. The first cancellation of c or one of its parents sets the cause. If that cancellation happened via a call to CancelCauseFunc(err), then Cause returns err. Otherwise c.Cause() returns the same value as c.Err(). Cause returns nil if c has not been canceled yet.

func (Context) Done added in v0.9.0

func (c Context) Done() <-chan struct{}

Done returns c.Context.Done().

func (Context) Go added in v0.9.0

func (c Context) Go(f func())

Go calls f in a goroutine.

Internally, f is wrapped with Context.PreAsyncCall before being run by the built-in go statement.

func (Context) PreAsyncCall added in v0.9.0

func (c Context) PreAsyncCall(f func()) func()

PreAsyncCall increases c.WaitGroup's counter, if c.WaitGroup is not nil, and returns a function that calls f.

If c.WaitGroup is not nil, the function returned decreases c.WaitGroup's counter when f returns.

If f panics and c.PanicHandler is not nil, the function returned calls c.PanicHandler with a value returned by the built-in recover function.

PreAsyncCall is usually called before starting an asynchronous operation; the caller then calls the returned function in that asynchronous operation. The function passed to PreAsyncCall is what the caller would do in that asynchronous operation.
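The three behaviors described above (count up front, count down when f returns, hand a panic value to the handler) can be sketched with the standard library alone. The function preAsyncCall below is a hypothetical stand-in that takes the WaitGroup and the panic handler as explicit parameters instead of reading them from a Context; it is not the library's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// preAsyncCall increments wg immediately (if non-nil) and returns a function
// that runs f, decrements wg afterwards, and forwards a panic value to
// onPanic when a handler is set.
func preAsyncCall(wg *sync.WaitGroup, onPanic func(v any), f func()) func() {
	if wg != nil {
		wg.Add(1) // counted before the asynchronous operation starts
	}
	return func() {
		if wg != nil {
			defer wg.Done()
		}
		if onPanic != nil {
			// Deferred last, so it runs first: the handler sees the panic
			// value before the WaitGroup is released.
			defer func() {
				if v := recover(); v != nil {
					onPanic(v)
				}
			}()
		}
		f()
	}
}

// demo starts a panicking function in a goroutine and waits for it.
func demo() (ran bool, recovered any) {
	var wg sync.WaitGroup
	call := preAsyncCall(&wg, func(v any) { recovered = v }, func() {
		ran = true
		panic("boom")
	})
	go call() // the asynchronous operation
	wg.Wait()
	return ran, recovered
}

func main() {
	fmt.Println(demo())
}
```

Incrementing the counter before the goroutine starts is the important detail: a Wait call issued right after go call() cannot return early.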

func (Context) Wait added in v0.9.0

func (c Context) Wait()

Wait runs c.WaitGroup.Wait(). If c.WaitGroup is not set, Wait panics.

func (Context) WithCancel added in v0.9.0

func (c Context) WithCancel() (Context, CancelFunc)

WithCancel returns a copy of c with a new Done channel. The returned context's Done channel is closed when the returned cancel function is called or when c's Done channel is closed, whichever happens first.

Canceling this context releases resources associated with it, so code should call the returned CancelFunc as soon as the operations running in this context complete.

func (Context) WithCancelCause added in v0.9.0

func (c Context) WithCancelCause() (Context, CancelCauseFunc)

WithCancelCause behaves like Context.WithCancel but returns a CancelCauseFunc instead of a CancelFunc. Calling the returned CancelCauseFunc with a non-nil error (the "cause") records that error in the returned Context; it can then be retrieved using Context.Cause. Calling the returned CancelCauseFunc with nil sets the cause to context.Canceled.

func (Context) WithDeadline added in v0.9.0

func (c Context) WithDeadline(d time.Time) (Context, CancelFunc)

WithDeadline returns a copy of c with the deadline adjusted to be no later than d. If c's deadline is already earlier than d, c.WithDeadline(d) is semantically equivalent to c. The returned context's Done channel is closed when the deadline expires, when the returned cancel function is called, or when c's Done channel is closed, whichever happens first.

Canceling this context releases resources associated with it, so code should call the returned CancelFunc as soon as the operations running in this context complete.

func (Context) WithDeadlineCause added in v0.9.0

func (c Context) WithDeadlineCause(d time.Time, cause error) (Context, CancelFunc)

WithDeadlineCause behaves like Context.WithDeadline but also sets the cause of the returned Context when the deadline is exceeded. The returned CancelFunc does not set the cause.

func (Context) WithNewWaitGroup added in v0.9.0

func (c Context) WithNewWaitGroup() Context

WithNewWaitGroup returns a copy of c with WaitGroup field set to a new sync.WaitGroup.

func (Context) WithPanicHandler added in v0.9.0

func (c Context) WithPanicHandler(f func(v any)) Context

WithPanicHandler returns a copy of c with PanicHandler field set to f.

func (Context) WithTimeout added in v0.9.0

func (c Context) WithTimeout(timeout time.Duration) (Context, CancelFunc)

WithTimeout returns c.WithDeadline(time.Now().Add(timeout)).

Canceling this context releases resources associated with it, so code should call the returned CancelFunc as soon as the operations running in this context complete.

func (Context) WithTimeoutCause added in v0.9.0

func (c Context) WithTimeoutCause(timeout time.Duration, cause error) (Context, CancelFunc)

WithTimeoutCause behaves like Context.WithTimeout but also sets the cause of the returned Context when the timeout expires. The returned CancelFunc does not set the cause.

func (Context) WithWaitGroup added in v0.9.0

func (c Context) WithWaitGroup(wg *sync.WaitGroup) Context

WithWaitGroup returns a copy of c with WaitGroup field set to wg.

type MergeMapOperator added in v0.7.0

type MergeMapOperator[T, R any] struct {
	// contains filtered or unexported fields
}

MergeMapOperator is an Operator type for MergeMap.

func MergeAll added in v0.7.0

func MergeAll[_ Observable[T], T any]() MergeMapOperator[Observable[T], T]

MergeAll flattens a higher-order Observable into a first-order Observable which concurrently delivers all values that are emitted on the inner Observables.

func MergeMap added in v0.7.0

func MergeMap[T, R any](mapping func(v T) Observable[R]) MergeMapOperator[T, R]

MergeMap converts the source Observable into a higher-order Observable, by mapping each source value to an Observable, then flattens it into a first-order Observable using MergeAll.

func MergeMapTo added in v0.7.0

func MergeMapTo[T, R any](inner Observable[R]) MergeMapOperator[T, R]

MergeMapTo converts the source Observable into a higher-order Observable, by mapping each source value to the same Observable, then flattens it into a first-order Observable using MergeAll.

func (MergeMapOperator[T, R]) Apply added in v0.7.0

func (op MergeMapOperator[T, R]) Apply(source Observable[T]) Observable[R]

Apply implements the Operator interface.

func (MergeMapOperator[T, R]) WithBuffering added in v0.9.0

func (op MergeMapOperator[T, R]) WithBuffering() MergeMapOperator[T, R]

WithBuffering turns on source buffering. By default, this Operator might block the source due to the concurrency limit. With source buffering on, this Operator buffers every source value, which might consume a lot of memory over time if the source emits values faster than they can be merged.

func (MergeMapOperator[T, R]) WithConcurrency added in v0.7.0

func (op MergeMapOperator[T, R]) WithConcurrency(n int) MergeMapOperator[T, R]

WithConcurrency sets Concurrency option to a given value. It must not be zero. The default value is -1 (unlimited).

type Notification

type Notification[T any] struct {
	Kind  NotificationKind
	Value T
	Error error
}

Notification is the representation of an emission.

There are three kinds of Notifications: values, errors and completions.

An Observable can only emit N+1 Notifications: either N values and an error, or N values and a completion. The last Notification emitted by an Observable must be an error or a completion.

func Complete

func Complete[T any]() Notification[T]

Complete creates a Notification that represents a completion.

func Error added in v0.7.0

func Error[T any](err error) Notification[T]

Error creates a Notification that represents an error.

func Next added in v0.7.0

func Next[T any](v T) Notification[T]

Next creates a Notification that represents a value.

type NotificationKind added in v0.9.0

type NotificationKind int8

NotificationKind is the type of an emission.

const (
	KindNext NotificationKind
	KindError
	KindComplete
)

type Observable

type Observable[T any] func(c Context, o Observer[T])

An Observable is a collection of future values. When an Observable is subscribed, its values, when available, are emitted to a given Observer.

To cancel a subscription to an Observable, cancel the given Context.

An Observable must honor the Observable protocol:

  • An Observable can emit zero or more values of specific type;
  • An Observable must emit a notification of error or completion as a termination;
  • An Observable must not emit anything after a termination.

Observables are expected to be sequential. Every emission emitted to the given Observer must be concurrency safe.

An Observable must honor the cancellation of the given Context. When the cancellation of the given Context is detected, an Observable must emit an error notification of whatever Context.Cause returns, as a termination, to the given Observer as soon as possible.
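A hand-written Observable that follows this rule might look like the sketch below. It uses local stand-in types and a plain context.Context rather than the library's Context, so context.Cause(ctx) plays the role of Context.Cause. The names countTo, runAndCancelAt and errStop are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// notification is a simplified stand-in: done marks the terminal
// notification, which is an error when err is non-nil.
type notification[T any] struct {
	value T
	err   error
	done  bool
}

type observer[T any] func(n notification[T])
type observable[T any] func(ctx context.Context, o observer[T])

// countTo emits 1..n and completes. Before every emission it checks for
// cancellation and, if cancelled, terminates with the cancellation cause.
func countTo(n int) observable[int] {
	return func(ctx context.Context, o observer[int]) {
		for i := 1; i <= n; i++ {
			if ctx.Err() != nil {
				o(notification[int]{err: context.Cause(ctx), done: true})
				return
			}
			o(notification[int]{value: i})
		}
		o(notification[int]{done: true})
	}
}

var errStop = errors.New("stop requested")

// runAndCancelAt subscribes to countTo(5) and cancels the context as soon as
// the value `at` arrives. It returns the values seen and the terminal error.
func runAndCancelAt(at int) (values []int, err error) {
	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil)
	countTo(5)(ctx, func(n notification[int]) {
		if n.done {
			err = n.err
			return
		}
		values = append(values, n.value)
		if n.value == at {
			cancel(errStop)
		}
	})
	return values, err
}

func main() {
	fmt.Println(runAndCancelAt(2))
	fmt.Println(runAndCancelAt(0)) // 0 never arrives, so nothing is cancelled
}
```

Either way the Observer receives exactly one terminal notification, and nothing is emitted after it.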

If an Observable needs to start goroutines, it must use Context.Go to do so; if an Observable needs to start an asynchronous operation other than goroutines, it must call Context.PreAsyncCall to wrap what that Observable would do in that asynchronous operation, then call the function returned in that asynchronous operation instead.

To achieve something in parallel, multiple Observables might be involved. Several functions and Operators in this library can handle multiple Observables, and most of them can do things concurrently.

func CombineLatest2 added in v0.7.0

func CombineLatest2[T1, T2, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	mapping func(v1 T1, v2 T2) R,
) Observable[R]

CombineLatest2 combines multiple Observables to create an Observable that emits mappings of the latest values emitted by each of its input Observables.
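The "latest values" rule can be traced on a fixed, interleaved sequence of events without any concurrency. The sketch below is a hypothetical model (event and combineLatest2 are local names operating on a slice, not on Observables), and it assumes the usual ReactiveX behavior that nothing is emitted until every input has produced at least one value.

```go
package main

import "fmt"

// event is a value arriving from input 1 (from1 == true) or from input 2.
type event struct {
	from1 bool
	v1    int
	v2    string
}

// combineLatest2 replays events in order and, once both inputs have produced
// a value, emits mapping(latest1, latest2) for every new arrival.
func combineLatest2[R any](events []event, mapping func(int, string) R) []R {
	var (
		latest1    int
		latest2    string
		has1, has2 bool
		out        []R
	)
	for _, e := range events {
		if e.from1 {
			latest1, has1 = e.v1, true
		} else {
			latest2, has2 = e.v2, true
		}
		if has1 && has2 {
			out = append(out, mapping(latest1, latest2))
		}
	}
	return out
}

// demo interleaves three numbers from input 1 with two letters from input 2.
func demo() []string {
	events := []event{
		{from1: true, v1: 1},
		{from1: true, v1: 2},
		{v2: "a"},
		{from1: true, v1: 3},
		{v2: "b"},
	}
	return combineLatest2(events, func(n int, s string) string {
		return fmt.Sprintf("%d%s", n, s)
	})
}

func main() {
	fmt.Println(demo())
}
```

In this model the value 1 never appears in the output, because it was overwritten by 2 before input 2 produced anything.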

func CombineLatest3 added in v0.7.0

func CombineLatest3[T1, T2, T3, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	mapping func(v1 T1, v2 T2, v3 T3) R,
) Observable[R]

CombineLatest3 combines multiple Observables to create an Observable that emits mappings of the latest values emitted by each of its input Observables.

func CombineLatest4 added in v0.7.0

func CombineLatest4[T1, T2, T3, T4, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	mapping func(v1 T1, v2 T2, v3 T3, v4 T4) R,
) Observable[R]

CombineLatest4 combines multiple Observables to create an Observable that emits mappings of the latest values emitted by each of its input Observables.

func CombineLatest5 added in v0.7.0

func CombineLatest5[T1, T2, T3, T4, T5, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5) R,
) Observable[R]

CombineLatest5 combines multiple Observables to create an Observable that emits mappings of the latest values emitted by each of its input Observables.

func CombineLatest6 added in v0.7.0

func CombineLatest6[T1, T2, T3, T4, T5, T6, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	ob6 Observable[T6],
	mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6) R,
) Observable[R]

CombineLatest6 combines multiple Observables to create an Observable that emits mappings of the latest values emitted by each of its input Observables.

func CombineLatest7 added in v0.7.0

func CombineLatest7[T1, T2, T3, T4, T5, T6, T7, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	ob6 Observable[T6],
	ob7 Observable[T7],
	mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7) R,
) Observable[R]

CombineLatest7 combines multiple Observables to create an Observable that emits mappings of the latest values emitted by each of its input Observables.

func CombineLatest8 added in v0.7.0

func CombineLatest8[T1, T2, T3, T4, T5, T6, T7, T8, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	ob6 Observable[T6],
	ob7 Observable[T7],
	ob8 Observable[T8],
	mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8) R,
) Observable[R]

CombineLatest8 combines multiple Observables to create an Observable that emits mappings of the latest values emitted by each of its input Observables.

func CombineLatest9 added in v0.7.0

func CombineLatest9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	ob6 Observable[T6],
	ob7 Observable[T7],
	ob8 Observable[T8],
	ob9 Observable[T9],
	mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8, v9 T9) R,
) Observable[R]

CombineLatest9 combines multiple Observables to create an Observable that emits mappings of the latest values emitted by each of its input Observables.

func Concat

func Concat[T any](some ...Observable[T]) Observable[T]

Concat creates an Observable that concatenates multiple Observables together by sequentially emitting their values, one Observable after the other.

func Empty

func Empty[T any]() Observable[T]

Empty returns an Observable that emits no values and immediately completes.

func FromMap added in v0.7.0

func FromMap[M ~map[K]V, K comparable, V any](m M) Observable[Pair[K, V]]

FromMap creates an Observable that emits Pairs from a map, one after the other, and then completes. The order of those Pairs is not specified.

func FromSlice

func FromSlice[S ~[]T, T any](s S) Observable[T]

FromSlice creates an Observable that emits values from a slice, one after the other, and then completes.

func Iota added in v0.6.0

func Iota[T constraints.Integer](init T) Observable[T]

Iota creates an Observable that emits an infinite sequence of integers starting from init.

func Just

func Just[T any](s ...T) Observable[T]

Just creates an Observable that emits some values you specify as arguments, one after the other, and then completes.

func Merge

func Merge[T any](some ...Observable[T]) Observable[T]

Merge creates an Observable that concurrently emits all values from every given input Observable.

func Never

func Never[T any]() Observable[T]

Never returns an Observable that never emits anything, except that, when a context cancellation is detected, it emits an error notification of whatever Context.Cause returns.

func NewObservable added in v0.8.0

func NewObservable[T any](f func(c Context, o Observer[T])) Observable[T]

NewObservable creates an Observable from f.

func Oops added in v0.9.0

func Oops[T any](v any) Observable[T]

Oops creates an Observable that emits no values and immediately emits an error notification of ErrOops, after calling panic(v).

func Pipe1 added in v0.8.0

func Pipe1[A, B any](
	ob Observable[A],
	op1 Operator[A, B],
) Observable[B]

Pipe1 applies an Operator to an Observable and returns the resulting Observable.

func Pipe2 added in v0.7.0

func Pipe2[A, B, C any](
	ob Observable[A],
	op1 Operator[A, B],
	op2 Operator[B, C],
) Observable[C]

Pipe2 applies 2 Operators to an Observable and returns the resulting Observable.

func Pipe3 added in v0.7.0

func Pipe3[A, B, C, D any](
	ob Observable[A],
	op1 Operator[A, B],
	op2 Operator[B, C],
	op3 Operator[C, D],
) Observable[D]

Pipe3 applies 3 Operators to an Observable and returns the resulting Observable.

func Pipe4 added in v0.7.0

func Pipe4[A, B, C, D, E any](
	ob Observable[A],
	op1 Operator[A, B],
	op2 Operator[B, C],
	op3 Operator[C, D],
	op4 Operator[D, E],
) Observable[E]

Pipe4 applies 4 Operators to an Observable and returns the resulting Observable.

func Pipe5 added in v0.7.0

func Pipe5[A, B, C, D, E, F any](
	ob Observable[A],
	op1 Operator[A, B],
	op2 Operator[B, C],
	op3 Operator[C, D],
	op4 Operator[D, E],
	op5 Operator[E, F],
) Observable[F]

Pipe5 applies 5 Operators to an Observable and returns the resulting Observable.

func Pipe6 added in v0.7.0

func Pipe6[A, B, C, D, E, F, G any](
	ob Observable[A],
	op1 Operator[A, B],
	op2 Operator[B, C],
	op3 Operator[C, D],
	op4 Operator[D, E],
	op5 Operator[E, F],
	op6 Operator[F, G],
) Observable[G]

Pipe6 applies 6 Operators to an Observable and returns the resulting Observable.

func Pipe7 added in v0.7.0

func Pipe7[A, B, C, D, E, F, G, H any](
	ob Observable[A],
	op1 Operator[A, B],
	op2 Operator[B, C],
	op3 Operator[C, D],
	op4 Operator[D, E],
	op5 Operator[E, F],
	op6 Operator[F, G],
	op7 Operator[G, H],
) Observable[H]

Pipe7 applies 7 Operators to an Observable and returns the resulting Observable.

func Pipe8 added in v0.7.0

func Pipe8[A, B, C, D, E, F, G, H, I any](
	ob Observable[A],
	op1 Operator[A, B],
	op2 Operator[B, C],
	op3 Operator[C, D],
	op4 Operator[D, E],
	op5 Operator[E, F],
	op6 Operator[F, G],
	op7 Operator[G, H],
	op8 Operator[H, I],
) Observable[I]

Pipe8 applies 8 Operators to an Observable and returns the resulting Observable.

func Pipe9 added in v0.7.0

func Pipe9[A, B, C, D, E, F, G, H, I, J any](
	ob Observable[A],
	op1 Operator[A, B],
	op2 Operator[B, C],
	op3 Operator[C, D],
	op4 Operator[D, E],
	op5 Operator[E, F],
	op6 Operator[F, G],
	op7 Operator[G, H],
	op8 Operator[H, I],
	op9 Operator[I, J],
) Observable[J]

Pipe9 applies 9 Operators to an Observable and returns the resulting Observable.
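The PipeN helpers can be read as repeated calls to Apply, with each Operator's output type feeding the next Operator's input type. The following is a minimal sketch of that idea, not the library's implementation: observable, operator, mapOp, pipe2, fromSlice and collect are hypothetical stand-ins defined here, and the simplified observable is synchronous and has no Context, error, or completion notifications.

```go
package main

import (
	"fmt"
	"strconv"
)

// observable is a simplified, synchronous stand-in for an Observable:
// subscribing just calls emit once per value.
type observable[T any] func(emit func(T))

// operator mirrors the shape of the documented Operator interface.
type operator[T, R any] interface {
	apply(source observable[T]) observable[R]
}

// mapOp is a hypothetical operator that transforms each value with f.
type mapOp[T, R any] struct{ f func(T) R }

func (m mapOp[T, R]) apply(source observable[T]) observable[R] {
	return func(emit func(R)) {
		source(func(v T) { emit(m.f(v)) })
	}
}

// pipe2 applies op1, then op2 to the result of op1.
func pipe2[A, B, C any](ob observable[A], op1 operator[A, B], op2 operator[B, C]) observable[C] {
	return op2.apply(op1.apply(ob))
}

// fromSlice emits every element of s in order.
func fromSlice[T any](s []T) observable[T] {
	return func(emit func(T)) {
		for _, v := range s {
			emit(v)
		}
	}
}

// collect subscribes to ob and gathers everything it emits.
func collect[T any](ob observable[T]) []T {
	var out []T
	ob(func(v T) { out = append(out, v) })
	return out
}

// demoPipe doubles each int, then formats it as a string.
func demoPipe() []string {
	double := mapOp[int, int]{f: func(v int) int { return v * 2 }}
	format := mapOp[int, string]{f: strconv.Itoa}
	return collect(pipe2[int, int, string](fromSlice([]int{1, 2, 3}), double, format))
}

func main() {
	fmt.Println(demoPipe()) // [2 4 6]
}
```

The type arguments to pipe2 are spelled out so the sketch does not rely on the compiler inferring them from interface-typed parameters.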

func Race

func Race[T any](some ...Observable[T]) Observable[T]

Race creates an Observable that mirrors whichever of the given input Observables is first to emit a value.

func Range

func Range[T constraints.Integer](low, high T) Observable[T]

Range creates an Observable that emits a sequence of integers within a specified range.

func Throw

func Throw[T any](err error) Observable[T]

Throw creates an Observable that emits no values and immediately emits an error notification of err.

func Ticker added in v0.1.0

func Ticker(d time.Duration) Observable[time.Time]

Ticker creates an Observable that emits a time.Time value each time the specified interval d elapses.

func Timer

func Timer(d time.Duration) Observable[time.Time]

Timer creates an Observable that emits a time.Time value after a particular time span has passed, and then completes.

func Zip2 added in v0.7.0

func Zip2[T1, T2, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	mapping func(v1 T1, v2 T2) R,
) Observable[R]

Zip2 combines multiple Observables to create an Observable that emits mappings of the values emitted by each of its input Observables.

Zip2 pulls values from each input Observable one by one; it does not buffer any values.

func Zip3 added in v0.7.0

func Zip3[T1, T2, T3, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	mapping func(v1 T1, v2 T2, v3 T3) R,
) Observable[R]

Zip3 combines multiple Observables to create an Observable that emits mappings of the values emitted by each of its input Observables.

Zip3 pulls values from each input Observable one by one; it does not buffer any values.

func Zip4 added in v0.7.0

func Zip4[T1, T2, T3, T4, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	mapping func(v1 T1, v2 T2, v3 T3, v4 T4) R,
) Observable[R]

Zip4 combines multiple Observables to create an Observable that emits mappings of the values emitted by each of its input Observables.

Zip4 pulls values from each input Observable one by one; it does not buffer any values.

func Zip5 added in v0.7.0

func Zip5[T1, T2, T3, T4, T5, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5) R,
) Observable[R]

Zip5 combines multiple Observables to create an Observable that emits mappings of the values emitted by each of its input Observables.

Zip5 pulls values from each input Observable one by one; it does not buffer any values.

func Zip6 added in v0.7.0

func Zip6[T1, T2, T3, T4, T5, T6, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	ob6 Observable[T6],
	mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6) R,
) Observable[R]

Zip6 combines multiple Observables to create an Observable that emits mappings of the values emitted by each of its input Observables.

Zip6 pulls values from each input Observable one by one; it does not buffer any values.

func Zip7 added in v0.7.0

func Zip7[T1, T2, T3, T4, T5, T6, T7, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	ob6 Observable[T6],
	ob7 Observable[T7],
	mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7) R,
) Observable[R]

Zip7 combines multiple Observables to create an Observable that emits mappings of the values emitted by each of its input Observables.

Zip7 pulls values from each input Observable one by one; it does not buffer any values.

func Zip8 added in v0.7.0

func Zip8[T1, T2, T3, T4, T5, T6, T7, T8, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	ob6 Observable[T6],
	ob7 Observable[T7],
	ob8 Observable[T8],
	mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8) R,
) Observable[R]

Zip8 combines multiple Observables to create an Observable that emits mappings of the values emitted by each of its input Observables.

Zip8 pulls values from each input Observable one by one; it does not buffer any values.

func Zip9 added in v0.7.0

func Zip9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	ob6 Observable[T6],
	ob7 Observable[T7],
	ob8 Observable[T8],
	ob9 Observable[T9],
	mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8, v9 T9) R,
) Observable[R]

Zip9 combines multiple Observables to create an Observable that emits mappings of the values emitted by each of its input Observables.

Zip9 pulls values from each input Observable one by one; it does not buffer any values.

func ZipWithBuffering2 added in v0.9.0

func ZipWithBuffering2[T1, T2, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	mapping func(v1 T1, v2 T2) R,
) Observable[R]

ZipWithBuffering2 combines multiple Observables to create an Observable that emits mappings of the values emitted by each of its input Observables.

ZipWithBuffering2 buffers every value from each input Observable, which might consume a lot of memory over time if values are emitted faster than they can be zipped.
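To make the memory concern concrete, here is a rough model of buffered zipping over two inputs. It is a sketch under stated assumptions, not the library's code: event, zipBuffered and demoZip are hypothetical names, arrivals are represented as a flat slice instead of concurrent emissions, and the mapping is fixed to addition.

```go
package main

import "fmt"

// event is a hypothetical record of one value arriving from input 0 or input 1.
type event struct {
	from  int
	value int
}

// zipBuffered queues every arriving value and emits a pair whenever both
// queues are non-empty. It returns the zipped sums and the largest queue
// length it ever held.
func zipBuffered(events []event) (sums []int, maxQueued int) {
	var queues [2][]int
	for _, e := range events {
		queues[e.from] = append(queues[e.from], e.value)
		if n := len(queues[e.from]); n > maxQueued {
			maxQueued = n
		}
		if len(queues[0]) > 0 && len(queues[1]) > 0 {
			sums = append(sums, queues[0][0]+queues[1][0])
			queues[0], queues[1] = queues[0][1:], queues[1][1:]
		}
	}
	return sums, maxQueued
}

// demoZip feeds four values from input 0 before input 1 emits anything.
func demoZip() ([]int, int) {
	return zipBuffered([]event{
		{0, 1}, {0, 2}, {0, 3}, {0, 4},
		{1, 10}, {1, 20},
	})
}

func main() {
	sums, maxQueued := demoZip()
	fmt.Println(sums, maxQueued) // [11 22] 4
}
```

The queue for the faster input grows to four entries before the first pair can be formed, which is the growth the paragraph above warns about.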

func ZipWithBuffering3 added in v0.9.0

func ZipWithBuffering3[T1, T2, T3, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	mapping func(v1 T1, v2 T2, v3 T3) R,
) Observable[R]

ZipWithBuffering3 combines multiple Observables to create an Observable that emits mappings of the values emitted by each of its input Observables.

ZipWithBuffering3 buffers every value from each input Observable, which might consume a lot of memory over time if values are emitted faster than they can be zipped.

func ZipWithBuffering4 added in v0.9.0

func ZipWithBuffering4[T1, T2, T3, T4, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	mapping func(v1 T1, v2 T2, v3 T3, v4 T4) R,
) Observable[R]

ZipWithBuffering4 combines multiple Observables to create an Observable that emits mappings of the values emitted by each of its input Observables.

ZipWithBuffering4 buffers every value from each input Observable, which might consume a lot of memory over time if values are emitted faster than they can be zipped.

func ZipWithBuffering5 added in v0.9.0

func ZipWithBuffering5[T1, T2, T3, T4, T5, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5) R,
) Observable[R]

ZipWithBuffering5 combines multiple Observables to create an Observable that emits mappings of the values emitted by each of its input Observables.

ZipWithBuffering5 buffers every value from each input Observable, which might consume a lot of memory over time if values are emitted faster than they can be zipped.

func ZipWithBuffering6 added in v0.9.0

func ZipWithBuffering6[T1, T2, T3, T4, T5, T6, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	ob6 Observable[T6],
	mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6) R,
) Observable[R]

ZipWithBuffering6 combines multiple Observables to create an Observable that emits mappings of the values emitted by each of its input Observables.

ZipWithBuffering6 buffers every value from each input Observable, which might consume a lot of memory over time if values are emitted faster than they can be zipped.

func ZipWithBuffering7 added in v0.9.0

func ZipWithBuffering7[T1, T2, T3, T4, T5, T6, T7, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	ob6 Observable[T6],
	ob7 Observable[T7],
	mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7) R,
) Observable[R]

ZipWithBuffering7 combines multiple Observables to create an Observable that emits mappings of the values emitted by each of its input Observables.

ZipWithBuffering7 buffers every value from each input Observable, which might consume a lot of memory over time if values are emitted faster than they can be zipped.

func ZipWithBuffering8 added in v0.9.0

func ZipWithBuffering8[T1, T2, T3, T4, T5, T6, T7, T8, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	ob6 Observable[T6],
	ob7 Observable[T7],
	ob8 Observable[T8],
	mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8) R,
) Observable[R]

ZipWithBuffering8 combines multiple Observables to create an Observable that emits mappings of the values emitted by each of its input Observables.

ZipWithBuffering8 buffers every value from each input Observable, which might consume a lot of memory over time if values are emitted faster than they can be zipped.

func ZipWithBuffering9 added in v0.9.0

func ZipWithBuffering9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	ob6 Observable[T6],
	ob7 Observable[T7],
	ob8 Observable[T8],
	ob9 Observable[T9],
	mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8, v9 T9) R,
) Observable[R]

ZipWithBuffering9 combines multiple Observables to create an Observable that emits mappings of the values emitted by each of its input Observables.

ZipWithBuffering9 buffers every value from each input Observable, which might consume a lot of memory over time if values are emitted faster than they can be zipped.

func (Observable[T]) BlockingFirst

func (ob Observable[T]) BlockingFirst(parent Context) (v T, err error)

BlockingFirst subscribes to ob, returning the first emitted value. If ob emits no values, it returns the zero value of T and ErrEmpty; if ob emits a notification of error, it returns the zero value of T and the error.

The cancellation of parent will cause BlockingFirst to immediately return the zero value of T and parent.Cause().

func (Observable[T]) BlockingFirstOrElse added in v0.8.0

func (ob Observable[T]) BlockingFirstOrElse(parent Context, def T) T

BlockingFirstOrElse subscribes to ob, returning the first emitted value or def if ob emits no values or emits a notification of error.

The cancellation of parent will cause BlockingFirstOrElse to immediately return def.

func (Observable[T]) BlockingLast

func (ob Observable[T]) BlockingLast(parent Context) (v T, err error)

BlockingLast subscribes to ob, returning the last emitted value. If ob emits no values, it returns the zero value of T and ErrEmpty; if ob emits a notification of error, it returns the zero value of T and the error.

The cancellation of parent will cause BlockingLast to immediately return the zero value of T and parent.Cause().
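The return-value rules for BlockingFirst and BlockingLast can be summarized with a small model. This is only a sketch of one reading of the descriptions above: first, last, errEmpty and errBoom are hypothetical names, a finished subscription is represented as a slice of emitted values plus a terminal error (nil meaning completion), and context cancellation is left out. In particular, the sketch assumes that BlockingFirst returns as soon as a value arrives, so a later error is never observed.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errEmpty = errors.New("empty") // local stand-in for ErrEmpty
	errBoom  = errors.New("boom")  // hypothetical source error
)

// first models BlockingFirst: the first value wins; otherwise the terminal
// error is reported, or errEmpty if the source completed without values.
func first[T any](values []T, err error) (T, error) {
	var zero T
	if len(values) > 0 {
		return values[0], nil
	}
	if err != nil {
		return zero, err
	}
	return zero, errEmpty
}

// last models BlockingLast: a terminal error wins; otherwise the last value
// is returned, or errEmpty if there were no values.
func last[T any](values []T, err error) (T, error) {
	var zero T
	if err != nil {
		return zero, err
	}
	if len(values) == 0 {
		return zero, errEmpty
	}
	return values[len(values)-1], nil
}

func main() {
	fmt.Println(first([]int{7, 8}, errBoom)) // 7 <nil>
	fmt.Println(first([]int(nil), nil))      // 0 empty
	fmt.Println(last([]int{7, 8}, nil))      // 8 <nil>
	fmt.Println(last([]int{7, 8}, errBoom))  // 0 boom
}
```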

func (Observable[T]) BlockingLastOrElse added in v0.8.0

func (ob Observable[T]) BlockingLastOrElse(parent Context, def T) T

BlockingLastOrElse subscribes to ob, returning the last emitted value or def if ob emits no values or emits a notification of error.

The cancellation of parent will cause BlockingLastOrElse to immediately return def.

func (Observable[T]) BlockingSingle

func (ob Observable[T]) BlockingSingle(parent Context) (v T, err error)

BlockingSingle subscribes to ob, returning the single emitted value. If ob emits more than one value or no values, it returns the zero value of T and ErrNotSingle or ErrEmpty respectively; if ob emits a notification of error, it returns the zero value of T and the error.

The cancellation of parent will cause BlockingSingle to immediately return the zero value of T and parent.Cause().

func (Observable[T]) BlockingSubscribe

func (ob Observable[T]) BlockingSubscribe(parent Context, o Observer[T]) error

BlockingSubscribe subscribes to ob and waits for it to complete. If ob completes without an error, BlockingSubscribe returns nil; otherwise, it returns the emitted error.

The cancellation of parent will cause BlockingSubscribe to immediately return parent.Cause().

func (Observable[T]) Subscribe

func (ob Observable[T]) Subscribe(c Context, o Observer[T])

Subscribe invokes an execution of an Observable.

If ob panics and c.PanicHandler is not nil, Subscribe calls c.PanicHandler with a value returned by the built-in recover function.

type Observer

type Observer[T any] func(n Notification[T])

An Observer is a consumer of notifications delivered by an Observable.

func NewObserver added in v0.8.0

func NewObserver[T any](f func(n Notification[T])) Observer[T]

NewObserver creates an Observer from f.

func WithRuntimeFinalizer added in v0.9.0

func WithRuntimeFinalizer[T any](o Observer[T]) Observer[T]

WithRuntimeFinalizer creates an Observer with a runtime finalizer set to run o.Error(ErrFinalized) in a goroutine. o must be safe for concurrent use.

func (Observer[T]) Complete

func (o Observer[T]) Complete()

Complete passes a completion to o.

func (Observer[T]) DoOnTermination added in v0.9.0

func (o Observer[T]) DoOnTermination(f func()) Observer[T]

DoOnTermination creates an Observer that passes incoming emissions to o, and when a notification of error or completion passes in, calls f just before passing it to o.

func (Observer[T]) ElementsOnly added in v0.4.0

func (o Observer[T]) ElementsOnly(n Notification[T])

ElementsOnly passes n to o if n represents a value.

func (Observer[T]) Emit added in v0.8.0

func (o Observer[T]) Emit(n Notification[T])

Emit passes n to o.

func (Observer[T]) Error

func (o Observer[T]) Error(err error)

Error passes an error to o.

func (Observer[T]) Next

func (o Observer[T]) Next(v T)

Next passes a value to o.
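Because an Observer is just a function over notifications, Next, Error and Complete can be pictured as thin wrappers that build a notification and call the function. The sketch below illustrates that shape only: notifKind, notification, observer, record and demoObserver are hypothetical stand-ins, and the field layout of the real Notification type is not taken from this page.

```go
package main

import "fmt"

// notifKind and notification are hypothetical stand-ins for the package's
// Notification type.
type notifKind int

const (
	kindNext notifKind = iota
	kindError
	kindComplete
)

type notification[T any] struct {
	kind  notifKind
	value T
	err   error
}

// observer has the same shape as the documented Observer type.
type observer[T any] func(n notification[T])

// next, fail and complete wrap their argument in a notification and call o.
func (o observer[T]) next(v T)       { o(notification[T]{kind: kindNext, value: v}) }
func (o observer[T]) fail(err error) { o(notification[T]{kind: kindError, err: err}) }
func (o observer[T]) complete()      { o(notification[T]{kind: kindComplete}) }

// record returns an observer that appends a description of each
// notification it receives to log.
func record(log *[]string) observer[int] {
	return func(n notification[int]) {
		switch n.kind {
		case kindNext:
			*log = append(*log, fmt.Sprintf("next %d", n.value))
		case kindError:
			*log = append(*log, "error "+n.err.Error())
		case kindComplete:
			*log = append(*log, "complete")
		}
	}
}

// demoObserver sends two values and a completion through a recording observer.
func demoObserver() []string {
	var log []string
	o := record(&log)
	o.next(1)
	o.next(2)
	o.complete()
	return log
}

func main() {
	fmt.Println(demoObserver()) // [next 1 next 2 complete]
}
```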

type Operator

type Operator[T, R any] interface {
	Apply(source Observable[T]) Observable[R]
}

An Operator is an operation on an Observable. When applied, an Operator does not change the existing Observable value. Instead, it returns a new one, whose subscription logic is based on the first Observable.

func Audit added in v0.7.0

func Audit[T, U any](durationSelector func(v T) Observable[U]) Operator[T, T]

Audit ignores source values for a duration determined by another Observable, then emits the most recent value from the source Observable, then repeats this process.

It's like AuditTime, but the silencing duration is determined by a second Observable.

func AuditTime added in v0.7.0

func AuditTime[T any](d time.Duration) Operator[T, T]

AuditTime ignores source values for a duration, then emits the most recent value from the source Observable, then repeats this process.

When it sees a source value, it ignores that plus the next ones for a duration, and then it emits the most recent value from the source.

func Catch added in v0.7.0

func Catch[T any](selector func(err error) Observable[T]) Operator[T, T]

Catch mirrors the source or switches to another Observable, returned from a call to selector, if the source emits a notification of error.

Catch does not catch context cancellations.

func Channelize added in v0.9.0

func Channelize[T any](join func(upstream <-chan Notification[T], downstream chan<- Notification[T])) Operator[T, T]

Channelize separates upstream and downstream with two channels, then uses the provided join function to connect them.

Notifications sent to downstream must honor the Observable protocol.

Channelize closes downstream channel after join returns.

func Compact added in v0.7.0

func Compact[T any](eq func(v1, v2 T) bool) Operator[T, T]

Compact emits all values emitted by the source Observable that are distinct from the previous, given a comparison function.

func CompactComparable added in v0.7.0

func CompactComparable[T comparable]() Operator[T, T]

CompactComparable emits all values emitted by the source Observable that are distinct from the previous.

func CompactComparableKey added in v0.7.0

func CompactComparableKey[T any, K comparable](mapping func(v T) K) Operator[T, T]

CompactComparableKey emits all values emitted by the source Observable whose mappings are distinct from the previous.

func CompactKey added in v0.7.0

func CompactKey[T, K any](mapping func(v T) K, eq func(v1, v2 K) bool) Operator[T, T]

CompactKey emits all values emitted by the source Observable whose mappings are distinct from the previous, given a comparison function.
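The Compact family compares each value only with the one immediately before it, whereas the Distinct family documented further down compares against every value seen so far. A slice-based sketch may help show the difference; compact and distinct here are hypothetical helper functions operating on slices, not the library's operators.

```go
package main

import "fmt"

// compact keeps a value only when it differs from the one immediately
// before it, modeling the behaviour described for CompactComparable.
func compact[T comparable](in []T) []T {
	var out []T
	for i, v := range in {
		if i == 0 || v != in[i-1] {
			out = append(out, v)
		}
	}
	return out
}

// distinct keeps a value only the first time it is seen at all,
// modeling the behaviour described for DistinctComparable.
func distinct[T comparable](in []T) []T {
	seen := make(map[T]struct{})
	var out []T
	for _, v := range in {
		if _, ok := seen[v]; !ok {
			seen[v] = struct{}{}
			out = append(out, v)
		}
	}
	return out
}

func main() {
	in := []int{1, 1, 2, 2, 1, 3, 3}
	fmt.Println(compact(in))  // [1 2 1 3]
	fmt.Println(distinct(in)) // [1 2 3]
}
```

Note that the distinct model has to remember every value it has seen, while the compact model only needs the previous one.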

func Compose2 added in v0.7.0

func Compose2[A, B, C any](
	op1 Operator[A, B],
	op2 Operator[B, C],
) Operator[A, C]

Compose2 creates an Operator that applies 2 Operators to the input Observable when called.

func Compose3 added in v0.7.0

func Compose3[A, B, C, D any](
	op1 Operator[A, B],
	op2 Operator[B, C],
	op3 Operator[C, D],
) Operator[A, D]

Compose3 creates an Operator that applies 3 Operators to the input Observable when called.

func Compose4 added in v0.7.0

func Compose4[A, B, C, D, E any](
	op1 Operator[A, B],
	op2 Operator[B, C],
	op3 Operator[C, D],
	op4 Operator[D, E],
) Operator[A, E]

Compose4 creates an Operator that applies 4 Operators to the input Observable when called.

func Compose5 added in v0.7.0

func Compose5[A, B, C, D, E, F any](
	op1 Operator[A, B],
	op2 Operator[B, C],
	op3 Operator[C, D],
	op4 Operator[D, E],
	op5 Operator[E, F],
) Operator[A, F]

Compose5 creates an Operator that applies 5 Operators to the input Observable when called.

func Compose6 added in v0.7.0

func Compose6[A, B, C, D, E, F, G any](
	op1 Operator[A, B],
	op2 Operator[B, C],
	op3 Operator[C, D],
	op4 Operator[D, E],
	op5 Operator[E, F],
	op6 Operator[F, G],
) Operator[A, G]

Compose6 creates an Operator that applies 6 Operators to the input Observable when called.

func Compose7 added in v0.7.0

func Compose7[A, B, C, D, E, F, G, H any](
	op1 Operator[A, B],
	op2 Operator[B, C],
	op3 Operator[C, D],
	op4 Operator[D, E],
	op5 Operator[E, F],
	op6 Operator[F, G],
	op7 Operator[G, H],
) Operator[A, H]

Compose7 creates an Operator that applies 7 Operators to the input Observable when called.

func Compose8 added in v0.7.0

func Compose8[A, B, C, D, E, F, G, H, I any](
	op1 Operator[A, B],
	op2 Operator[B, C],
	op3 Operator[C, D],
	op4 Operator[D, E],
	op5 Operator[E, F],
	op6 Operator[F, G],
	op7 Operator[G, H],
	op8 Operator[H, I],
) Operator[A, I]

Compose8 creates an Operator that applies 8 Operators to the input Observable when called.

func Compose9 added in v0.7.0

func Compose9[A, B, C, D, E, F, G, H, I, J any](
	op1 Operator[A, B],
	op2 Operator[B, C],
	op3 Operator[C, D],
	op4 Operator[D, E],
	op5 Operator[E, F],
	op6 Operator[F, G],
	op7 Operator[G, H],
	op8 Operator[H, I],
	op9 Operator[I, J],
) Operator[A, J]

Compose9 creates an Operator that applies 9 Operators to the input Observable when called.
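Unlike PipeN, the ComposeN helpers take no Observable: they return a new Operator, and nothing is applied until that Operator's Apply is called. The following sketch shows the idea for two operators. It is an illustration under assumptions, not the library's code: observable, operator, operatorFunc, compose2 and mapping are hypothetical stand-ins, and the simplified observable is synchronous with no Context or terminal notifications.

```go
package main

import "fmt"

// observable is a simplified, synchronous stand-in for an Observable.
type observable[T any] func(emit func(T))

// operator mirrors the shape of the documented Operator interface.
type operator[T, R any] interface {
	apply(source observable[T]) observable[R]
}

// operatorFunc adapts a plain function to the operator interface.
type operatorFunc[T, R any] func(source observable[T]) observable[R]

func (f operatorFunc[T, R]) apply(source observable[T]) observable[R] { return f(source) }

// compose2 builds one operator out of two; op1 and op2 are only applied
// later, when apply is called on the result.
func compose2[A, B, C any](op1 operator[A, B], op2 operator[B, C]) operator[A, C] {
	return operatorFunc[A, C](func(source observable[A]) observable[C] {
		return op2.apply(op1.apply(source))
	})
}

// mapping is a hypothetical operator that transforms each value with f.
func mapping[T, R any](f func(T) R) operator[T, R] {
	return operatorFunc[T, R](func(source observable[T]) observable[R] {
		return func(emit func(R)) {
			source(func(v T) { emit(f(v)) })
		}
	})
}

// demoCompose increments and then squares each of 1, 2, 3.
func demoCompose() []int {
	incThenSquare := compose2[int, int, int](
		mapping(func(v int) int { return v + 1 }),
		mapping(func(v int) int { return v * v }),
	)
	source := observable[int](func(emit func(int)) {
		for _, v := range []int{1, 2, 3} {
			emit(v)
		}
	})
	var out []int
	incThenSquare.apply(source)(func(v int) { out = append(out, v) })
	return out
}

func main() {
	fmt.Println(demoCompose()) // [4 9 16]
}
```

A composed operator can be stored and reused across several sources, which is the main reason to prefer it over repeating the same pipeline.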

func ConcatWith added in v0.7.0

func ConcatWith[T any](some ...Observable[T]) Operator[T, T]

ConcatWith concatenates the source Observable and some other Observables together to create an Observable that sequentially emits their values, one Observable after the other.

func Contains added in v0.7.0

func Contains[T any](pred func(v T) bool) Operator[T, bool]

Contains emits a boolean to indicate whether any value of the source Observable satisfies a given predicate function.

func ContainsElement added in v0.7.0

func ContainsElement[T comparable](v T) Operator[T, bool]

ContainsElement emits a boolean to indicate whether the source Observable emits a given value.

func Debounce added in v0.7.0

func Debounce[T, U any](durationSelector func(v T) Observable[U]) Operator[T, T]

Debounce emits a value from the source Observable only after a particular time span, determined by another Observable, has passed without another source emission.

It's like DebounceTime, but the time span of emission silence is determined by a second Observable.

func DebounceTime added in v0.7.0

func DebounceTime[T any](d time.Duration) Operator[T, T]

DebounceTime emits a value from the source Observable only after a particular time span has passed without another source emission.

func DefaultIfEmpty added in v0.7.0

func DefaultIfEmpty[T any](s ...T) Operator[T, T]

DefaultIfEmpty mirrors the source Observable, or emits given values if the source completes without emitting any value.

func Delay added in v0.7.0

func Delay[T any](d time.Duration) Operator[T, T]

Delay postpones each emission of values from the source Observable by a given duration.

func Dematerialize added in v0.7.0

func Dematerialize[_ Notification[T], T any]() Operator[Notification[T], T]

Dematerialize converts an Observable of Notification values into the emissions that they represent. It's the opposite of Materialize.

func Discard added in v0.9.0

func Discard[T any]() Operator[T, T]

Discard ignores all values emitted by the source Observable.

func Distinct added in v0.7.0

func Distinct[T any, K comparable](mapping func(v T) K) Operator[T, T]

Distinct emits all values emitted by the source Observable whose mappings are distinct from each other.

func DistinctComparable added in v0.7.0

func DistinctComparable[T comparable]() Operator[T, T]

DistinctComparable emits all values emitted by the source Observable that are distinct from each other.

func Do added in v0.7.0

func Do[T any](tap Observer[T]) Operator[T, T]

Do mirrors the source Observable, passing emissions to tap before each emission.

func DoOnComplete added in v0.7.0

func DoOnComplete[T any](f func()) Operator[T, T]

DoOnComplete mirrors the source Observable, and calls f when the source completes.

func DoOnError added in v0.7.0

func DoOnError[T any](f func(err error)) Operator[T, T]

DoOnError mirrors the source Observable, and calls f when the source emits a notification of error.

func DoOnNext added in v0.7.0

func DoOnNext[T any](f func(v T)) Operator[T, T]

DoOnNext mirrors the source Observable, passing values to f before each value emission.

func DoOnTermination added in v0.9.0

func DoOnTermination[T any](f func()) Operator[T, T]

DoOnTermination mirrors the source Observable, and calls f when the source emits a notification of error or completion.

func EndWith added in v0.7.0

func EndWith[T any](s ...T) Operator[T, T]

EndWith mirrors the source Observable, and emits the values you specify as arguments when the source completes.

func Enumerate added in v0.9.0

func Enumerate[V any, K constraints.Integer](init K) Operator[V, Pair[K, V]]

Enumerate maps each value emitted by the source Observable to a Pair where the Key field stores the index of each value starting from init and the Value field stores each value.

func Every added in v0.7.0

func Every[T any](pred func(v T) bool) Operator[T, bool]

Every emits a boolean to indicate whether every value of the source Observable satisfies a given predicate function.

func ExhaustAll added in v0.7.0

func ExhaustAll[_ Observable[T], T any]() Operator[Observable[T], T]

ExhaustAll flattens a higher-order Observable into a first-order Observable by dropping inner Observables while the previous inner Observable has not yet completed.

func ExhaustMap added in v0.7.0

func ExhaustMap[T, R any](mapping func(v T) Observable[R]) Operator[T, R]

ExhaustMap converts the source Observable into a higher-order Observable, by mapping each source value to an Observable, then flattens it into a first-order Observable using ExhaustAll.

func ExhaustMapTo added in v0.7.0

func ExhaustMapTo[T, R any](inner Observable[R]) Operator[T, R]

ExhaustMapTo converts the source Observable into a higher-order Observable, by mapping each source value to the same Observable, then flattens it into a first-order Observable using ExhaustAll.

func Filter added in v0.7.0

func Filter[T any](pred func(v T) bool) Operator[T, T]

Filter filters values emitted by the source Observable by only emitting those that satisfy a given predicate function.

func FilterMap added in v0.7.0

func FilterMap[T, R any](pred func(v T) (R, bool)) Operator[T, R]

FilterMap passes each value emitted by the source Observable to a given predicate function and emits the function's first return value only if the second is true.
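The two-result predicate combines a Map and a Filter in one step. A slice-based sketch of the same contract, where filterMap and parseInt are hypothetical helpers and not part of the package:

```go
package main

import (
	"fmt"
	"strconv"
)

// filterMap calls pred on each input and keeps pred's first result
// only when its second result is true.
func filterMap[T, R any](in []T, pred func(T) (R, bool)) []R {
	var out []R
	for _, v := range in {
		if r, ok := pred(v); ok {
			out = append(out, r)
		}
	}
	return out
}

// parseInt is a hypothetical predicate: it maps a string to an int and
// reports whether the string was a valid integer.
func parseInt(s string) (int, bool) {
	n, err := strconv.Atoi(s)
	return n, err == nil
}

func main() {
	fmt.Println(filterMap([]string{"1", "x", "3"}, parseInt)) // [1 3]
}
```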

func FilterOut added in v0.7.0

func FilterOut[T any](pred func(v T) bool) Operator[T, T]

FilterOut filters out values emitted by the source Observable by only emitting those that do not satisfy a given predicate function.

func First added in v0.7.0

func First[T any]() Operator[T, T]

First emits only the first value emitted by the source Observable. If the source turns out to be empty, First emits a notification of ErrEmpty.

func FirstOrElse added in v0.8.0

func FirstOrElse[T any](def T) Operator[T, T]

FirstOrElse emits only the first value emitted by the source Observable. If the source turns out to be empty, FirstOrElse emits a specified default value.

func Go added in v0.8.0

func Go[T any]() Operator[T, T]

Go mirrors the source Observable in a goroutine.

func GroupBy added in v0.7.0

func GroupBy[T any, K comparable](
	keySelector func(v T) K,
	groupFactory func() Subject[T],
) Operator[T, Pair[K, Observable[T]]]

GroupBy groups the values emitted by the source Observable according to a specified criterion, and emits these grouped values as Pairs, one Pair per group.

func IgnoreElements added in v0.7.0

func IgnoreElements[T, R any]() Operator[T, R]

IgnoreElements ignores all values emitted by the source Observable.

It's like Discard, but it can also change the output Observable to be of another type.

func IsEmpty added in v0.7.0

func IsEmpty[T any]() Operator[T, bool]

IsEmpty emits a boolean to indicate whether the source emits no values.

func KeyOf added in v0.7.0

func KeyOf[_ Pair[K, V], K, V any]() Operator[Pair[K, V], K]

KeyOf maps each Pair emitted by the source Observable to a value stored in the Key field of that Pair.

func Last added in v0.7.0

func Last[T any]() Operator[T, T]

Last emits only the last value emitted by the source Observable. If the source turns out to be empty, Last emits a notification of ErrEmpty.

func LastOrElse added in v0.8.0

func LastOrElse[T any](def T) Operator[T, T]

LastOrElse emits only the last value emitted by the source Observable. If the source turns out to be empty, LastOrElse emits a specified default value.

func LeftOf added in v0.8.0

func LeftOf[_ Pair[K, V], K, V any]() Operator[Pair[K, V], K]

LeftOf is an alias for KeyOf.

func Map added in v0.7.0

func Map[T, R any](mapping func(v T) R) Operator[T, R]

Map applies a given mapping function to each value emitted by the source Observable, then emits the resulting values.

func MapTo added in v0.7.0

func MapTo[T, R any](v R) Operator[T, R]

MapTo emits the given constant value on the output Observable every time the source Observable emits a value.

func Materialize added in v0.7.0

func Materialize[T any]() Operator[T, Notification[T]]

Materialize represents all of the Notifications from the source Observable as values, and then completes.

func MergeWith added in v0.7.0

func MergeWith[T any](some ...Observable[T]) Operator[T, T]

MergeWith merges the source Observable and some other Observables together to create an Observable that concurrently emits all values from the source and every given input Observable.

func NewOperator added in v0.8.0

func NewOperator[T, R any](f func(source Observable[T]) Observable[R]) Operator[T, R]

NewOperator creates an Operator from f.

func OnBackpressureBuffer added in v0.9.0

func OnBackpressureBuffer[T any](capacity int) Operator[T, T]

OnBackpressureBuffer mirrors the source Observable, buffering emissions if the source emits too fast, and terminating the subscription with a notification of ErrBufferOverflow if the buffer is full.

func OnBackpressureCongest added in v0.9.0

func OnBackpressureCongest[T any](capacity int) Operator[T, T]

OnBackpressureCongest mirrors the source Observable, buffering emissions if the source emits too fast, and blocking the source if the buffer is full.

func OnBackpressureDrop added in v0.9.0

func OnBackpressureDrop[T any](capacity int) Operator[T, T]

OnBackpressureDrop mirrors the source Observable, buffering emissions if the source emits too fast, and dropping emissions if the buffer is full.

func OnBackpressureLatest added in v0.9.0

func OnBackpressureLatest[T any](capacity int) Operator[T, T]

OnBackpressureLatest mirrors the source Observable, buffering emissions if the source emits too fast, and dropping oldest emissions from the buffer if it is full.
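Three of the four OnBackpressure operators differ only in what happens when a value arrives and the buffer is already full. The sketch below models that decision on a plain slice while the consumer is stalled. It is an illustration, not the library's implementation: policy, push, fill and errOverflow are hypothetical names, "drop" is assumed to mean discarding the newly arrived value, and OnBackpressureCongest is omitted because blocking the source cannot be shown in a single-goroutine model.

```go
package main

import (
	"errors"
	"fmt"
)

// errOverflow is a local stand-in for the ErrBufferOverflow value named above.
var errOverflow = errors.New("buffer overflow")

type policy int

const (
	failOnFull policy = iota // models OnBackpressureBuffer
	dropNewest               // models OnBackpressureDrop
	dropOldest               // models OnBackpressureLatest
)

// push adds v to a buffer holding at most capacity values (capacity >= 1),
// applying p when the buffer is already full.
func push(buf []int, capacity int, v int, p policy) ([]int, error) {
	if len(buf) < capacity {
		return append(buf, v), nil
	}
	switch p {
	case failOnFull:
		return buf, errOverflow
	case dropNewest:
		return buf, nil
	default: // dropOldest
		return append(buf[1:], v), nil
	}
}

// fill pushes 1..n into an empty buffer and stops at the first error.
func fill(n, capacity int, p policy) ([]int, error) {
	var buf []int
	for v := 1; v <= n; v++ {
		var err error
		if buf, err = push(buf, capacity, v, p); err != nil {
			return buf, err
		}
	}
	return buf, nil
}

func main() {
	fmt.Println(fill(4, 2, failOnFull)) // [1 2] buffer overflow
	fmt.Println(fill(4, 2, dropNewest)) // [1 2] <nil>
	fmt.Println(fill(4, 2, dropOldest)) // [3 4] <nil>
}
```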

func OnErrorComplete added in v0.7.0

func OnErrorComplete[T any]() Operator[T, T]

OnErrorComplete mirrors the source Observable, or completes if the source emits a notification of error.

OnErrorComplete does not complete after context cancellation.

func OnErrorResumeWith added in v0.7.0

func OnErrorResumeWith[T any](ob Observable[T]) Operator[T, T]

OnErrorResumeWith mirrors the source, or switches to the specified Observable if the source emits a notification of error.

OnErrorResumeWith does not resume after context cancellation.

func Pairwise added in v0.8.0

func Pairwise[T any]() Operator[T, Pair[T, T]]

Pairwise groups pairs of consecutive emissions together and emits them as Pairs.

func RaceWith added in v0.7.0

func RaceWith[T any](some ...Observable[T]) Operator[T, T]

RaceWith mirrors whichever of the source and the given input Observables is first to emit a value.

func Reduce added in v0.7.0

func Reduce[T, R any](init R, accumulator func(v1 R, v2 T) R) Operator[T, R]

Reduce applies an accumulator function over the source Observable, and emits the accumulated result when the source completes, given an initial value.

func Repeat added in v0.7.0

func Repeat[T any](count int) Operator[T, T]

Repeat repeats the stream of values emitted by the source Observable at most count times.

Repeat(0) results in an empty Observable; Repeat(1) is a no-op.

Repeat does not repeat after context cancellation.

func RepeatForever added in v0.7.0

func RepeatForever[T any]() Operator[T, T]

RepeatForever repeats the stream of values emitted by the source Observable forever.

RepeatForever does not repeat after context cancellation.

func Retry added in v0.7.0

func Retry[T any](count int) Operator[T, T]

Retry mirrors the source Observable, and resubscribes to the source when the source emits a notification of error, for a maximum of count resubscriptions.

Retry(0) is a no-op.

Retry does not retry after context cancellation.
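The counting rule can be sketched in plain Go: one initial subscription, plus at most count resubscriptions after errors. The retry function and errTransient below are hypothetical helpers for illustration only; a subscription is modeled as a function call that returns its terminal error, or nil on completion.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient is a hypothetical error used by this sketch.
var errTransient = errors.New("transient failure")

// retry calls subscribe once, then again after each failure, for at most
// count additional calls. It returns the number of calls made and the
// final result.
func retry(count int, subscribe func() error) (calls int, err error) {
	for {
		calls++
		err = subscribe()
		if err == nil || calls > count {
			return calls, err
		}
	}
}

func main() {
	attempts := 0
	flaky := func() error { // fails twice, then succeeds
		attempts++
		if attempts < 3 {
			return errTransient
		}
		return nil
	}
	calls, err := retry(2, flaky)
	fmt.Println(calls, err) // 3 <nil>
}
```

With count set to 0 the loop returns after the first call, which matches the statement that Retry(0) is a no-op.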

func RetryForever added in v0.7.0

func RetryForever[T any]() Operator[T, T]

RetryForever mirrors the source Observable, and resubscribes to the source whenever the source emits a notification of error.

RetryForever does not retry after context cancellation.

func RightOf added in v0.8.0

func RightOf[_ Pair[K, V], K, V any]() Operator[Pair[K, V], V]

RightOf is an alias for ValueOf.

func Sample added in v0.7.0

func Sample[T, U any](notifier Observable[U]) Operator[T, T]

Sample emits the most recently emitted value from the source Observable whenever notifier, another Observable, emits a value.

func SampleTime added in v0.7.0

func SampleTime[T any](d time.Duration) Operator[T, T]

SampleTime emits the most recently emitted value from the source Observable within periodic time intervals.

func Scan added in v0.7.0

func Scan[T, R any](init R, accumulator func(v1 R, v2 T) R) Operator[T, R]

Scan applies an accumulator function over the source Observable, and emits each intermediate result, given an initial value.
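Scan and Reduce share the same signature and differ only in what they emit: Scan emits every intermediate result, Reduce emits only the final one. A minimal sketch over plain slices follows. The scan and reduce functions are hypothetical helpers, and the sketch assumes, as in RxJS, that the initial value itself is not emitted by Scan.

```go
package main

import "fmt"

// scan returns every intermediate accumulation, one per input value.
func scan[T, R any](values []T, init R, acc func(R, T) R) []R {
	out := make([]R, 0, len(values))
	r := init
	for _, v := range values {
		r = acc(r, v)
		out = append(out, r)
	}
	return out
}

// reduce returns only the final accumulation.
func reduce[T, R any](values []T, init R, acc func(R, T) R) R {
	r := init
	for _, v := range values {
		r = acc(r, v)
	}
	return r
}

func main() {
	add := func(sum, v int) int { return sum + v }
	fmt.Println(scan([]int{1, 2, 3, 4}, 0, add))   // [1 3 6 10]
	fmt.Println(reduce([]int{1, 2, 3, 4}, 0, add)) // 10
}
```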

func Single added in v0.7.0

func Single[T any]() Operator[T, T]

Single emits the single value emitted by the source Observable. If the source emits more than one value or no values, it emits a notification of ErrNotSingle or ErrEmpty respectively.
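The three outcomes can be sketched over a plain slice. The errors errEmpty and errNotSingle below are local stand-ins for the package's ErrEmpty and ErrNotSingle, and single is a hypothetical helper, not the operator itself.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's ErrEmpty and ErrNotSingle.
var (
	errEmpty     = errors.New("empty")
	errNotSingle = errors.New("not single")
)

// single returns the only value in values, or an error describing why
// there is no such value.
func single[T any](values []T) (T, error) {
	var zero T
	switch len(values) {
	case 0:
		return zero, errEmpty
	case 1:
		return values[0], nil
	default:
		return zero, errNotSingle
	}
}

func main() {
	v, err := single([]int{7})
	fmt.Println(v, err) // 7 <nil>
	_, err = single([]int{})
	fmt.Println(err) // empty
	_, err = single([]int{1, 2})
	fmt.Println(err) // not single
}
```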

func Skip added in v0.7.0

func Skip[T any](count int) Operator[T, T]

Skip skips the first count values emitted by the source Observable.

func SkipLast added in v0.7.0

func SkipLast[T any](count int) Operator[T, T]

SkipLast skips the last count values emitted by the source Observable.

func SkipUntil added in v0.7.0

func SkipUntil[T, U any](notifier Observable[U]) Operator[T, T]

SkipUntil skips values emitted by the source Observable until a second Observable emits a value.

func SkipWhile added in v0.7.0

func SkipWhile[T any](pred func(v T) bool) Operator[T, T]

SkipWhile skips all values emitted by the source Observable as long as a given predicate function returns true, but emits all further source values as soon as the predicate function returns false.

func StartWith added in v0.7.0

func StartWith[T any](s ...T) Operator[T, T]

StartWith emits the values you specify as arguments before it begins to mirror the source Observable.

func SwitchAll added in v0.7.0

func SwitchAll[_ Observable[T], T any]() Operator[Observable[T], T]

SwitchAll flattens a higher-order Observable into a first-order Observable by subscribing to only the most recently emitted of those inner Observables.

func SwitchIfEmpty added in v0.7.0

func SwitchIfEmpty[T any](ob Observable[T]) Operator[T, T]

SwitchIfEmpty mirrors the source Observable, or switches to mirroring the specified Observable if the source completes without emitting any value.

func SwitchMap added in v0.7.0

func SwitchMap[T, R any](mapping func(v T) Observable[R]) Operator[T, R]

SwitchMap converts the source Observable into a higher-order Observable, by mapping each source value to an Observable, then flattens it into a first-order Observable using SwitchAll.

func SwitchMapTo added in v0.7.0

func SwitchMapTo[T, R any](inner Observable[R]) Operator[T, R]

SwitchMapTo converts the source Observable into a higher-order Observable, by mapping each source value to the same Observable, then flattens it into a first-order Observable using SwitchAll.

func Take added in v0.7.0

func Take[T any](count int) Operator[T, T]

Take emits only the first count values emitted by the source Observable.

func TakeLast added in v0.7.0

func TakeLast[T any](count int) Operator[T, T]

TakeLast emits only the last count values emitted by the source Observable.

func TakeUntil added in v0.7.0

func TakeUntil[T, U any](notifier Observable[U]) Operator[T, T]

TakeUntil mirrors the source Observable until a second Observable emits a value.

func TakeWhile added in v0.7.0

func TakeWhile[T any](pred func(v T) bool) Operator[T, T]

TakeWhile emits values emitted by the source Observable so long as each value satisfies a given predicate function, and then completes as soon as the predicate function returns false.

func ThrowIfEmpty added in v0.7.0

func ThrowIfEmpty[T any]() Operator[T, T]

ThrowIfEmpty mirrors the source Observable, or emits a notification of ErrEmpty if the source completes without emitting any value.

func ToSlice added in v0.7.0

func ToSlice[T any]() Operator[T, []T]

ToSlice collects all the values emitted by the source Observable, and then emits them as a slice when the source completes.

func ValueOf added in v0.7.0

func ValueOf[_ Pair[K, V], K, V any]() Operator[Pair[K, V], V]

ValueOf maps each Pair emitted by the source Observable to a value stored in the Value field of that Pair.

func WithLatestFrom1 added in v0.8.0

func WithLatestFrom1[T0, T1, R any](
	ob1 Observable[T1],
	mapping func(v0 T0, v1 T1) R,
) Operator[T0, R]

WithLatestFrom1 combines the source with another Observable to create an Observable that emits mappings of the latest values emitted by each Observable, only when the source emits.
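The phrase "only when the source emits" is the key difference from a symmetric combination: emissions of the other Observable only update a remembered value and never produce output on their own. The sketch below replays an ordered log of events from two streams. It assumes, as in RxJS, that source values arriving before the other stream has emitted anything are dropped. The event type and withLatestFrom function are hypothetical.

```go
package main

import "fmt"

// event is one emission, tagged with the stream it came from.
type event struct {
	fromSource bool
	value      int
}

// withLatestFrom walks events in order and emits mapping(v0, v1) each time
// the source emits v0, where v1 is the latest value of the other stream.
func withLatestFrom(events []event, mapping func(v0, v1 int) int) []int {
	var out []int
	latest, hasLatest := 0, false
	for _, e := range events {
		if !e.fromSource {
			latest, hasLatest = e.value, true // remember, but do not emit
			continue
		}
		if hasLatest {
			out = append(out, mapping(e.value, latest))
		}
	}
	return out
}

func main() {
	events := []event{
		{true, 1},   // dropped: the other stream has no value yet
		{false, 10}, // remembered
		{true, 2},   // emits 2 + 10
		{false, 20},
		{false, 30}, // overwrites 20
		{true, 3},   // emits 3 + 30
	}
	sum := func(v0, v1 int) int { return v0 + v1 }
	fmt.Println(withLatestFrom(events, sum)) // [12 33]
}
```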

func WithLatestFrom2 added in v0.8.0

func WithLatestFrom2[T0, T1, T2, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	mapping func(v0 T0, v1 T1, v2 T2) R,
) Operator[T0, R]

WithLatestFrom2 combines the source with 2 other Observables to create an Observable that emits mappings of the latest values emitted by each Observable, only when the source emits.

func WithLatestFrom3 added in v0.8.0

func WithLatestFrom3[T0, T1, T2, T3, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	mapping func(v0 T0, v1 T1, v2 T2, v3 T3) R,
) Operator[T0, R]

WithLatestFrom3 combines the source with 3 other Observables to create an Observable that emits mappings of the latest values emitted by each Observable, only when the source emits.

func WithLatestFrom4 added in v0.8.0

func WithLatestFrom4[T0, T1, T2, T3, T4, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	mapping func(v0 T0, v1 T1, v2 T2, v3 T3, v4 T4) R,
) Operator[T0, R]

WithLatestFrom4 combines the source with 4 other Observables to create an Observable that emits mappings of the latest values emitted by each Observable, only when the source emits.

func WithLatestFrom5 added in v0.8.0

func WithLatestFrom5[T0, T1, T2, T3, T4, T5, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	mapping func(v0 T0, v1 T1, v2 T2, v3 T3, v4 T4, v5 T5) R,
) Operator[T0, R]

WithLatestFrom5 combines the source with 5 other Observables to create an Observable that emits mappings of the latest values emitted by each Observable, only when the source emits.

func WithLatestFrom6 added in v0.8.0

func WithLatestFrom6[T0, T1, T2, T3, T4, T5, T6, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	ob6 Observable[T6],
	mapping func(v0 T0, v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6) R,
) Operator[T0, R]

WithLatestFrom6 combines the source with 6 other Observables to create an Observable that emits mappings of the latest values emitted by each Observable, only when the source emits.

func WithLatestFrom7 added in v0.8.0

func WithLatestFrom7[T0, T1, T2, T3, T4, T5, T6, T7, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	ob6 Observable[T6],
	ob7 Observable[T7],
	mapping func(v0 T0, v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7) R,
) Operator[T0, R]

WithLatestFrom7 combines the source with 7 other Observables to create an Observable that emits mappings of the latest values emitted by each Observable, only when the source emits.

func WithLatestFrom8 added in v0.8.0

func WithLatestFrom8[T0, T1, T2, T3, T4, T5, T6, T7, T8, R any](
	ob1 Observable[T1],
	ob2 Observable[T2],
	ob3 Observable[T3],
	ob4 Observable[T4],
	ob5 Observable[T5],
	ob6 Observable[T6],
	ob7 Observable[T7],
	ob8 Observable[T8],
	mapping func(v0 T0, v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8) R,
) Operator[T0, R]

WithLatestFrom8 combines the source with 8 other Observables to create an Observable that emits mappings of the latest values emitted by each Observable, only when the source emits.

type Pair added in v0.2.0

type Pair[K, V any] struct {
	Key   K
	Value V
}

A Pair is a struct of two elements.

func NewPair added in v0.8.0

func NewPair[K, V any](k K, v V) Pair[K, V]

NewPair creates a Pair of two elements.

func (Pair[K, V]) Left added in v0.7.0

func (p Pair[K, V]) Left() K

Left returns p.Key.

func (Pair[K, V]) Right added in v0.7.0

func (p Pair[K, V]) Right() V

Right returns p.Value.

type ShareOperator added in v0.7.0

type ShareOperator[T any] struct {
	// contains filtered or unexported fields
}

ShareOperator is an Operator type for Share.

func Share added in v0.7.0

func Share[T any](c Context) ShareOperator[T]

Share returns an Operator that multicasts (shares) the source Observable. When the resulting Observable is subscribed to multiple times, it guarantees that at most one subscription to the source is active at any time. When all subscribers have unsubscribed, it unsubscribes from the source.

func (ShareOperator[T]) Apply added in v0.7.0

func (op ShareOperator[T]) Apply(source Observable[T]) Observable[T]

Apply implements the Operator interface.

func (ShareOperator[T]) WithConnector added in v0.7.0

func (op ShareOperator[T]) WithConnector(connector func() Subject[T]) ShareOperator[T]

WithConnector sets Connector option to a given value.

type Subject

type Subject[T any] struct {
	Observable[T]
	Observer[T]
}

A Subject is both an Observable and an Observer. Observers subscribing to Subject's Observable part may receive emissions from Subject's Observer part.

func Multicast added in v0.4.0

func Multicast[T any]() Subject[T]

Multicast returns a Subject that forwards every value it receives to all its subscribers. Values emitted to a Multicast before the first subscriber are lost.

func MulticastBuffer added in v0.9.0

func MulticastBuffer[T any](n int) Subject[T]

MulticastBuffer returns a Subject that keeps track of a certain number of recent values it receives. Each subscriber will then receive all tracked values as well as future values.

If n < 0, MulticastBuffer keeps track of every value it receives; if n == 0, MulticastBuffer doesn't keep track of any value it receives at all.
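The effect of n on what a late subscriber is replayed can be sketched with a small bounded history. The replayBuffer type and replay function are hypothetical and model only the bookkeeping of tracked values, not subscription or delivery.

```go
package main

import "fmt"

// replayBuffer tracks recent values: the n most recent if n > 0,
// every value if n < 0, and none if n == 0.
type replayBuffer struct {
	n      int
	values []int
}

// push records v according to the rule for n.
func (b *replayBuffer) push(v int) {
	if b.n == 0 {
		return
	}
	b.values = append(b.values, v)
	if b.n > 0 && len(b.values) > b.n {
		b.values = b.values[len(b.values)-b.n:]
	}
}

// snapshot returns a copy of what a new subscriber would be replayed.
func (b *replayBuffer) snapshot() []int {
	return append([]int(nil), b.values...)
}

// replay pushes values into a fresh buffer of size n and returns the snapshot.
func replay(n int, values ...int) []int {
	b := &replayBuffer{n: n}
	for _, v := range values {
		b.push(v)
	}
	return b.snapshot()
}

func main() {
	fmt.Println(replay(2, 1, 2, 3))  // [2 3]
	fmt.Println(replay(-1, 1, 2, 3)) // [1 2 3]
	fmt.Println(replay(0, 1, 2, 3))  // []
}
```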

func MulticastBufferAll added in v0.9.0

func MulticastBufferAll[T any]() Subject[T]

MulticastBufferAll returns a Subject that keeps track of every value it receives. Each subscriber will then receive all tracked values as well as future values.

func Unicast added in v0.4.0

func Unicast[T any]() Subject[T]

Unicast returns a Subject that forwards every value it receives only to the first subscriber. The first subscriber gets all future values. Subsequent subscribers will immediately receive a notification of ErrUnicast. Values emitted to a Unicast before the first subscriber are lost.

func UnicastBuffer added in v0.9.0

func UnicastBuffer[T any](n int) Subject[T]

UnicastBuffer returns a Subject that keeps track of a certain number of recent values it receives before the first subscriber. The first subscriber will then receive all tracked values as well as future values. Subsequent subscribers will immediately receive a notification of ErrUnicast.

If n < 0, UnicastBuffer keeps track of every value it receives before the first subscriber; if n == 0, UnicastBuffer doesn't keep track of any value it receives at all.

func UnicastBufferAll added in v0.9.0

func UnicastBufferAll[T any]() Subject[T]

UnicastBufferAll returns a Subject that keeps track of every value it receives before the first subscriber. The first subscriber will then receive all tracked values as well as future values. Subsequent subscribers will immediately receive a notification of ErrUnicast.

type ThrottleOperator added in v0.7.0

type ThrottleOperator[T, U any] struct {
	// contains filtered or unexported fields
}

ThrottleOperator is an Operator type for Throttle.

func Throttle added in v0.7.0

func Throttle[T, U any](durationSelector func(v T) Observable[U]) ThrottleOperator[T, U]

Throttle emits a value from the source Observable, then ignores subsequent source values for a duration determined by another Observable, then repeats this process until the source completes.

It's like ThrottleTime, but the silencing duration is determined by a second Observable.

func ThrottleTime added in v0.7.0

func ThrottleTime[T any](d time.Duration) ThrottleOperator[T, time.Time]

ThrottleTime emits a value from the source Observable, then ignores subsequent source values for a duration, then repeats this process until the source completes.

Put differently, ThrottleTime lets a value pass, then ignores source values for the next d.
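A sketch of this behavior over simulated timestamps follows, so that no real timers are involved. It assumes leading-only throttling, where the value that opens a window is emitted and nothing is emitted when the window closes; how the Leading and Trailing options default in this package is not stated here. The timed type and the ms and throttleLeading helpers are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// timed is a value together with its offset from the start of the stream.
type timed struct {
	at    time.Duration
	value string
}

// ms converts a number of milliseconds to a time.Duration.
func ms(n int) time.Duration { return time.Duration(n) * time.Millisecond }

// throttleLeading lets a value pass, then ignores values that arrive
// within the following d. Events must be ordered by time.
func throttleLeading(events []timed, d time.Duration) []string {
	var out []string
	var openAt time.Duration // earliest time at which the next value may pass
	for _, e := range events {
		if e.at < openAt {
			continue // still inside the silencing window
		}
		out = append(out, e.value)
		openAt = e.at + d
	}
	return out
}

func main() {
	events := []timed{
		{ms(0), "a"},   // passes, silences until 100ms
		{ms(30), "b"},  // ignored
		{ms(90), "c"},  // ignored
		{ms(120), "d"}, // passes, silences until 220ms
		{ms(250), "e"}, // passes
	}
	fmt.Println(throttleLeading(events, ms(100))) // [a d e]
}
```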

func (ThrottleOperator[T, U]) Apply added in v0.7.0

func (op ThrottleOperator[T, U]) Apply(source Observable[T]) Observable[T]

Apply implements the Operator interface.

func (ThrottleOperator[T, U]) WithLeading added in v0.7.0

func (op ThrottleOperator[T, U]) WithLeading(v bool) ThrottleOperator[T, U]

WithLeading sets Leading option to a given value.

func (ThrottleOperator[T, U]) WithTrailing added in v0.7.0

func (op ThrottleOperator[T, U]) WithTrailing(v bool) ThrottleOperator[T, U]

WithTrailing sets Trailing option to a given value.

type TimeoutOperator added in v0.7.0

type TimeoutOperator[T any] struct {
	// contains filtered or unexported fields
}

TimeoutOperator is an Operator type for Timeout.

func Timeout added in v0.7.0

func Timeout[T any](d time.Duration) TimeoutOperator[T]

Timeout mirrors the source Observable, or emits a notification of ErrTimeout if the source does not emit a value within the given time span.

func (TimeoutOperator[T]) Apply added in v0.7.0

func (op TimeoutOperator[T]) Apply(source Observable[T]) Observable[T]

Apply implements the Operator interface.

func (TimeoutOperator[T]) WithFirst added in v0.7.0

func (op TimeoutOperator[T]) WithFirst(d time.Duration) TimeoutOperator[T]

WithFirst sets First option to a given value.

func (TimeoutOperator[T]) WithObservable added in v0.7.0

func (op TimeoutOperator[T]) WithObservable(ob Observable[T]) TimeoutOperator[T]

WithObservable sets With option to a given value.

Directories

Path Synopsis
internal
