Documentation ¶
Overview ¶
Example ¶
gen := func(ctx context.Context, sink flux.Sink) {
	for i := 0; i < 10; i++ {
		v := i
		sink.Next(v)
	}
	sink.Complete()
}
done := make(chan struct{})
var su reactor.Subscription
flux.Create(gen).
	Filter(func(i Any) bool {
		return i.(int)%2 == 0
	}).
	Map(func(i interface{}) (Any, error) {
		return fmt.Sprintf("#HELLO_%04d", i.(int)), nil
	}).
	SubscribeOn(scheduler.Elastic()).
	Subscribe(context.Background(),
		reactor.OnSubscribe(func(ctx context.Context, s reactor.Subscription) {
			su = s
			s.Request(1)
		}),
		reactor.OnNext(func(v Any) error {
			fmt.Println("next:", v)
			su.Request(1)
			return nil
		}),
		reactor.OnComplete(func() {
			close(done)
		}),
	)
<-done
Output:
Index ¶
Examples ¶
Constants ¶
View Source
const (
	BuffSizeXS = 32
	BuffSizeSM = 256
)
Variables ¶
This section is empty.
Functions ¶
func InitBuffSize ¶ added in v0.2.0
func InitBuffSize(size int)
InitBuffSize initializes the buffer size (default: 256).
Types ¶
type CreateOption ¶ added in v0.0.11
type CreateOption func(*fluxCreate)
func WithOverflowStrategy ¶ added in v0.0.11
func WithOverflowStrategy(o OverflowStrategy) CreateOption
type Flux ¶
type Flux interface {
	reactor.Publisher
	Filter(reactor.Predicate) Flux
	Map(reactor.Transformer) Flux
	Take(n int) Flux
	DoOnDiscard(reactor.FnOnDiscard) Flux
	DoOnNext(reactor.FnOnNext) Flux
	DoOnComplete(reactor.FnOnComplete) Flux
	DoOnError(reactor.FnOnError) Flux
	DoOnCancel(reactor.FnOnCancel) Flux
	DoOnRequest(reactor.FnOnRequest) Flux
	DoOnSubscribe(reactor.FnOnSubscribe) Flux
	DoFinally(reactor.FnOnFinally) Flux
	SwitchOnFirst(FnSwitchOnFirst) Flux
	DelayElement(delay time.Duration) Flux
	SubscribeOn(scheduler.Scheduler) Flux
	SubscribeWithChan(ctx context.Context, valueChan interface{}, errChan chan<- error)
	BlockFirst(context.Context) (Any, error)
	BlockLast(context.Context) (Any, error)
	BlockToSlice(ctx context.Context, slicePtr interface{}) error
}
type FnSwitchOnFirst ¶ added in v0.0.5
type OverflowStrategy ¶
type OverflowStrategy int8
const (
	OverflowBuffer OverflowStrategy = iota
	OverflowIgnore
	OverflowError
	OverflowDrop
	OverflowLatest
)
type Processor ¶ added in v0.0.5
func NewUnicastProcessor ¶ added in v0.0.5
func NewUnicastProcessor() Processor
Source Files ¶
Click to show internal directories.
Click to hide internal directories.