reactor

package module
v0.2.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 21, 2020 License: MIT Imports: 4 Imported by: 5

README

reactor-go 🚀🚀🚀

Build Status Coverage Status GoDoc Go Report Card License GitHub Release

A golang implementation for reactive-streams.
🚧🚧🚧 IT IS UNDER ACTIVE DEVELOPMENT!!!
⚠️⚠️⚠️ DO NOT USE IN ANY PRODUCTION ENVIRONMENT!!!

Install

go get -u github.com/jjeffcaii/reactor-go

Example

NOTICE:
We can only use func(interface{})interface{} for most operations because Golang has not Generics. 😭
If you have any better idea, please let me know. 😀

Mono
package mono_test

import (
	"context"
	"fmt"

	"github.com/jjeffcaii/reactor-go"
	"github.com/jjeffcaii/reactor-go/mono"
)

func Example() {
	gen := func(ctx context.Context, sink mono.Sink) {
		sink.Success("World")
	}
	mono.
		Create(gen).
		Map(func(input reactor.Any) (output reactor.Any, err error) {
			output = "Hello " + input.(string) + "!"
			return
		}).
		DoOnNext(func(v reactor.Any) error {
			fmt.Println(v)
			return nil
		}).
		Subscribe(context.Background())
}

// Should print
// Hello World!

Flux
package flux_test

import (
	"context"
	"fmt"

	"github.com/jjeffcaii/reactor-go"
	"github.com/jjeffcaii/reactor-go/flux"
	"github.com/jjeffcaii/reactor-go/scheduler"
)

func Example() {
	gen := func(ctx context.Context, sink flux.Sink) {
		for i := 0; i < 10; i++ {
			v := i
			sink.Next(v)
		}
		sink.Complete()
	}
	done := make(chan struct{})

	var su reactor.Subscription
	flux.Create(gen).
		Filter(func(i interface{}) bool {
			return i.(int)%2 == 0
		}).
		Map(func(input reactor.Any) (output reactor.Any, err error) {
			output = fmt.Sprintf("#HELLO_%04d", input.(int))
			return
		}).
		SubscribeOn(scheduler.Elastic()).
		Subscribe(context.Background(),
			reactor.OnSubscribe(func(s reactor.Subscription) {
				su = s
				s.Request(1)
			}),
			reactor.OnNext(func(v reactor.Any) error {
				fmt.Println("next:", v)
				su.Request(1)
				return nil
			}),
			reactor.OnComplete(func() {
				close(done)
			}),
		)
	<-done
}
// Should print:
// next: #HELLO_0000
// next: #HELLO_0002
// next: #HELLO_0004
// next: #HELLO_0006
// next: #HELLO_0008

Documentation

Index

Constants

View Source
const RequestInfinite = math.MaxInt32

Variables

View Source
var (
	ErrNegativeRequest    = fmt.Errorf("invalid request: n must be between %d and %d", 1, RequestInfinite)
	ErrSubscribeCancelled = errors.New("subscriber has been cancelled")
)

Functions

func IsCancelledError added in v0.2.4

func IsCancelledError(err error) bool

Types

type Any added in v0.2.0

type Any = interface{}

type Disposable

type Disposable interface {
	// Dispose dispose current resource.
	Dispose()
	// IsDisposed returns true if it has been disposed.
	IsDisposed() bool
}

Disposable is a disposable resource.

type FnOnCancel

type FnOnCancel = func()

type FnOnComplete

type FnOnComplete = func()

type FnOnDiscard added in v0.0.4

type FnOnDiscard = func(v Any)

type FnOnError

type FnOnError = func(e error)

type FnOnFinally

type FnOnFinally = func(s SignalType)

type FnOnNext

type FnOnNext = func(v Any) error

type FnOnRequest

type FnOnRequest = func(n int)

type FnOnSubscribe

type FnOnSubscribe = func(ctx context.Context, su Subscription)

type Predicate

type Predicate func(Any) bool

type Processor

type Processor interface {
	Publisher
	Subscriber
}

type Publisher

type Publisher interface {
	RawPublisher
	Subscribe(context.Context, ...SubscriberOption)
}

type RawPublisher added in v0.0.2

type RawPublisher interface {
	SubscribeWith(context.Context, Subscriber)
}

type SignalType added in v0.0.5

type SignalType int8
const (
	SignalTypeDefault SignalType = iota
	SignalTypeComplete
	SignalTypeCancel
	SignalTypeError
)

func (SignalType) String added in v0.0.5

func (s SignalType) String() string

type Subscriber

type Subscriber interface {
	OnComplete()
	OnError(error)
	OnNext(Any)
	OnSubscribe(context.Context, Subscription)
}

func NewSubscriber

func NewSubscriber(opts ...SubscriberOption) Subscriber

type SubscriberOption

type SubscriberOption func(*subscriber)

func OnComplete

func OnComplete(onComplete FnOnComplete) SubscriberOption

func OnError

func OnError(onError FnOnError) SubscriberOption

func OnNext

func OnNext(onNext FnOnNext) SubscriberOption

func OnSubscribe

func OnSubscribe(onSubscribe FnOnSubscribe) SubscriberOption

type Subscription

type Subscription interface {
	Request(n int)
	Cancel()
}

type Transformer

type Transformer func(Any) (Any, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL