reactor

package module
v0.5.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 20, 2025 License: MIT Imports: 4 Imported by: 5

README

reactor-go 🚀🚀🚀

GitHub Workflow Status codecov GoDoc Go Report Card License GitHub Release

A golang implementation for reactive-streams.

Install

go get -u github.com/jjeffcaii/reactor-go

Example

Mono
package mono_test

import (
	"context"
	"fmt"

	"github.com/jjeffcaii/reactor-go"
	"github.com/jjeffcaii/reactor-go/mono"
)

func Example() {
	gen := func(ctx context.Context, sink mono.Sink) {
		sink.Success("World")
	}
	mono.
		Create(gen).
		Map(func(input reactor.Any) (output reactor.Any, err error) {
			output = "Hello " + input.(string) + "!"
			return
		}).
		DoOnNext(func(v reactor.Any) error {
			fmt.Println(v)
			return nil
		}).
		Subscribe(context.Background())
}

// Should print
// Hello World!

Flux
package flux_test

import (
	"context"
	"fmt"

	"github.com/jjeffcaii/reactor-go"
	"github.com/jjeffcaii/reactor-go/flux"
	"github.com/jjeffcaii/reactor-go/scheduler"
)

func Example() {
	gen := func(ctx context.Context, sink flux.Sink) {
		for i := 0; i < 10; i++ {
			v := i
			sink.Next(v)
		}
		sink.Complete()
	}
	done := make(chan struct{})

	var su reactor.Subscription
	flux.Create(gen).
		Filter(func(i interface{}) bool {
			return i.(int)%2 == 0
		}).
		Map(func(input reactor.Any) (output reactor.Any, err error) {
			output = fmt.Sprintf("#HELLO_%04d", input.(int))
			return
		}).
		SubscribeOn(scheduler.Elastic()).
		Subscribe(context.Background(),
			reactor.OnSubscribe(func(s reactor.Subscription) {
				su = s
				s.Request(1)
			}),
			reactor.OnNext(func(v reactor.Any) error {
				fmt.Println("next:", v)
				su.Request(1)
				return nil
			}),
			reactor.OnComplete(func() {
				close(done)
			}),
		)
	<-done
}
// Should print:
// next: #HELLO_0000
// next: #HELLO_0002
// next: #HELLO_0004
// next: #HELLO_0006
// next: #HELLO_0008

Documentation

Index

Constants

View Source
const RequestInfinite = math.MaxInt32

RequestInfinite means request items indefinitely.

Variables

View Source
var (
	ErrNegativeRequest    = fmt.Errorf("invalid request: n must be between %d and %d", 1, RequestInfinite)
	ErrSubscribeCancelled = errors.New("subscriber has been cancelled")
)

Functions

func IsCancelledError added in v0.2.4

func IsCancelledError(err error) bool

IsCancelledError returns true if given error is a cancelled subscribe error.

func NewContextError added in v0.4.2

func NewContextError(err error) error

Types

type Any added in v0.2.0

type Any = interface{}

Any is an alias of interface{} which means a value of any type.

type Disposable

type Disposable interface {
	// Dispose dispose current resource.
	Dispose()
}

Disposable is a disposable resource.

type FnOnCancel

type FnOnCancel = func()

A group of action functions.

type FnOnComplete

type FnOnComplete = func()

A group of action functions.

type FnOnDiscard added in v0.0.4

type FnOnDiscard = func(v Any)

A group of action functions.

type FnOnError

type FnOnError = func(e error)

A group of action functions.

type FnOnFinally

type FnOnFinally = func(s SignalType)

A group of action functions.

type FnOnNext

type FnOnNext = func(v Any) error

A group of action functions.

type FnOnRequest

type FnOnRequest = func(n int)

A group of action functions.

type FnOnSubscribe

type FnOnSubscribe = func(ctx context.Context, su Subscription)

A group of action functions.

type Item added in v0.4.0

type Item struct {
	V Any
	E error
}

Item is type of element.

type Predicate

type Predicate func(Any) bool

type Processor

type Processor interface {
	Publisher
	Subscriber
}

Processor combines the Publisher and Subscriber.

type Publisher

type Publisher interface {
	RawPublisher
	// Subscribe subscribes current Publisher with some options.
	Subscribe(context.Context, ...SubscriberOption)
}

Publisher is th basic type that can be subscribed

type RawPublisher added in v0.0.2

type RawPublisher interface {
	// SubscribeWith subscribes current Publisher with a Subscriber.
	SubscribeWith(context.Context, Subscriber)
}

RawPublisher is the basic low-level Publisher that can be subscribed with a Subscriber.

type SignalType added in v0.0.5

type SignalType int8

SignalType is type of terminal signal.

const (
	SignalTypeDefault SignalType = iota
	SignalTypeComplete
	SignalTypeCancel
	SignalTypeError
)

func (SignalType) String added in v0.0.5

func (s SignalType) String() string

type Subscriber

type Subscriber interface {
	// OnComplete is successful terminal state.
	OnComplete()
	// OnError is failed terminal state.
	OnError(error)
	// OnNext is invoked when a data notification sent by the Publisher in response to requests to Subscription.Request(int).
	OnNext(Any)
	// OnSubscribe is invoked after calling RawPublisher.SubscribeWith(context.Context, Subscriber).
	OnSubscribe(context.Context, Subscription)
}

Subscriber is the basic type to subscribing the Publisher and consumes the items from upstream.

func NewSubscriber

func NewSubscriber(opts ...SubscriberOption) Subscriber

NewSubscriber creates a Subscriber with given options.

type SubscriberOption

type SubscriberOption func(*subscriber)

SubscriberOption is used to create a Subscriber easily.

func OnComplete

func OnComplete(onComplete FnOnComplete) SubscriberOption

OnComplete specified a Subscriber.OnComplete action.

func OnError

func OnError(onError FnOnError) SubscriberOption

OnError specified a Subscriber.OnError action.

func OnNext

func OnNext(onNext FnOnNext) SubscriberOption

OnNext specified a Subscriber.OnNext action.

func OnSubscribe

func OnSubscribe(onSubscribe FnOnSubscribe) SubscriberOption

OnSubscribe specified a Subscriber.OnSubscribe action.

type Subscription

type Subscription interface {
	// Request requests the next N items.
	Request(n int)
	// Cancel cancels the current lifecycle of subscribing.
	Cancel()
}

Subscription represents a one-to-one lifecycle of a Subscriber subscribing to a Publisher.

type Transformer

type Transformer func(Any) (Any, error)

Directories

Path Synopsis
buffer
Package buffer provides an implementation of an unbounded buffer.
Package buffer provides an implementation of an unbounded buffer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL