rs

package module
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 7, 2019 License: MIT Imports: 3 Imported by: 5

README ¶

reactor-go 🚀🚀🚀

Build Status GoDoc Go Report Card License GitHub Release

A Go implementation of reactive-streams. [WARNING] IT IS UNDER ACTIVE DEVELOPMENT!!! DO NOT USE IN ANY PRODUCTION ENVIRONMENT!!!

🏠 Homepage

Install

go get -u github.com/jjeffcaii/reactor-go

Example

NOTICE:
We can only use func(interface{}) interface{} for most operations because Go does not have generics. 😭
If you have a better idea, please let me know. 😀

Mono
package mono_test

import (
	"context"
	"fmt"

	"github.com/jjeffcaii/reactor-go"
	"github.com/jjeffcaii/reactor-go/mono"
)

// Example builds a Mono that emits "World", maps it into a greeting, and
// prints the mapped value from a DoOnNext hook when the Mono is subscribed.
func Example() {
	// The source hands a single value to the sink.
	source := mono.Create(func(ctx context.Context, sink mono.Sink) {
		sink.Success("World")
	})
	// Wrap the emitted name as "Hello <name>!".
	greeting := source.Map(func(name interface{}) interface{} {
		return "Hello " + name.(string) + "!"
	})
	printer := func(s rs.Subscription, v interface{}) {
		fmt.Println(v)
	}
	greeting.DoOnNext(printer).Subscribe(context.Background())
}

// Should print
// Hello World!

Flux
package rs_test

import (
	"context"
	"fmt"

	"github.com/jjeffcaii/reactor-go"
	"github.com/jjeffcaii/reactor-go/flux"
	"github.com/jjeffcaii/reactor-go/scheduler"
)

// Example emits the integers 0 through 9, keeps the even ones, formats each
// as "#HELLO_NNNN", and consumes them one at a time with SubscribeOn set to
// the elastic scheduler. It blocks until the completion callback fires.
func Example() {
	finished := make(chan struct{})

	// Push 0..9 into the sink, then signal completion.
	numbers := flux.Create(func(ctx context.Context, sink flux.Sink) {
		for n := 0; n < 10; n++ {
			sink.Next(n)
		}
		sink.Complete()
	})

	isEven := func(v interface{}) bool {
		return v.(int)%2 == 0
	}
	label := func(v interface{}) interface{} {
		return fmt.Sprintf("#HELLO_%04d", v.(int))
	}
	pipeline := numbers.
		Filter(isEven).
		Map(label).
		SubscribeOn(scheduler.Elastic())

	pipeline.Subscribe(
		context.Background(),
		// Ask for the first item as soon as the subscription exists.
		rs.OnSubscribe(func(s rs.Subscription) {
			s.Request(1)
		}),
		// Print each item, then ask for exactly one more.
		rs.OnNext(func(s rs.Subscription, v interface{}) {
			fmt.Println("next:", v)
			s.Request(1)
		}),
		// Unblock the caller once the stream completes.
		rs.OnComplete(func() {
			close(finished)
		}),
	)
	<-finished
}
// Should print:
// next: #HELLO_0000
// next: #HELLO_0002
// next: #HELLO_0004
// next: #HELLO_0006
// next: #HELLO_0008

Author

👤 Jeffsky

Show your support

Give a ⭐️ if this project helped you!


This README was generated with ❤️ by readme-md-generator

Documentation ¶

Index ¶

Constants ¶

View Source
// RequestInfinite equals math.MaxInt32.
// NOTE(review): presumably passed to Subscription.Request to ask for an
// unbounded number of items — confirm against the implementation.
const RequestInfinite = math.MaxInt32

Variables ¶

View Source
// ErrCancelled is a sentinel error with the message
// "subscriber has been cancelled".
// NOTE(review): presumably reported when a subscriber cancels its
// subscription — confirm where it is produced.
var ErrCancelled = errors.New("subscriber has been cancelled")

Functions ¶

This section is empty.

Types ¶

type Disposable ¶

// Disposable is a disposable resource.
type Disposable interface {
	// Dispose disposes the current resource.
	Dispose()
	// IsDisposed reports whether the resource has been disposed.
	IsDisposed() bool
}

Disposable is a disposable resource.

type FnOnCancel ¶

// FnOnCancel is an alias for a callback that takes no arguments and returns
// nothing.
// NOTE(review): the name suggests it runs when a subscription is cancelled —
// confirm at the call sites.
type FnOnCancel = func()

type FnOnComplete ¶

// FnOnComplete is an alias for a callback that takes no arguments and returns
// nothing. It is the callback type accepted by the OnComplete option.
type FnOnComplete = func()

type FnOnError ¶

// FnOnError is an alias for a callback that receives an error. It is the
// callback type accepted by the OnError option.
type FnOnError = func(error)

type FnOnFinally ¶

// FnOnFinally is an alias for a callback that receives a Signal.
// NOTE(review): presumably invoked once a stream terminates, with the Signal
// describing how it ended — confirm at the call sites.
type FnOnFinally = func(Signal)

type FnOnNext ¶

// FnOnNext is an alias for a callback that receives a Subscription and a
// value. It is the callback type accepted by the OnNext option.
type FnOnNext = func(s Subscription, v interface{})

type FnOnRequest ¶

// FnOnRequest is an alias for a callback that receives an int.
// NOTE(review): presumably the int is the count passed to
// Subscription.Request — confirm at the call sites.
type FnOnRequest = func(int)

type FnOnSubscribe ¶

// FnOnSubscribe is an alias for a callback that receives a Subscription. It
// is the callback type accepted by the OnSubscribe option.
type FnOnSubscribe = func(Subscription)

type Predicate ¶

// Predicate is a function that takes a value and returns a bool.
// NOTE(review): presumably used by filtering operators to decide whether a
// value is kept — confirm against the operators that accept it.
type Predicate func(interface{}) bool

type Processor ¶

// Processor combines the Publisher and Subscriber interfaces by embedding
// both, so an implementation must provide the methods of each.
type Processor interface {
	Publisher
	Subscriber
}

type Publisher ¶

// Publisher embeds RawPublisher and adds a Subscribe method that takes
// SubscriberOption values instead of a ready-made Subscriber.
type Publisher interface {
	RawPublisher
	// Subscribe takes a context and zero or more SubscriberOption values.
	// NOTE(review): presumably builds a Subscriber from the options and
	// subscribes it — confirm against the implementations.
	Subscribe(context.Context, ...SubscriberOption)
}

type RawPublisher ¶ added in v0.0.2

// RawPublisher is the minimal publisher interface: it accepts an
// already-constructed Subscriber.
type RawPublisher interface {
	// SubscribeWith takes a context and the Subscriber to attach.
	SubscribeWith(context.Context, Subscriber)
}

type Signal ¶

// Signal is an int8-backed enumeration. It is the argument type of
// FnOnFinally callbacks.
type Signal int8

// Signal values, numbered from 0 by iota in declaration order.
// NOTE(review): the names suggest each value identifies how a stream ended
// (completed, cancelled, failed) — confirm where they are emitted.
const (
	SignalDefault Signal = iota
	SignalComplete
	SignalCancel
	SignalError
)

func (Signal) String ¶

func (s Signal) String() string

type Subscriber ¶

// Subscriber is the set of callbacks a consumer implements. NewSubscriber
// returns one, and RawPublisher.SubscribeWith accepts one.
// NOTE(review): the method names mirror the Reactive Streams Subscriber
// contract; the exact call-ordering guarantees are not visible here — confirm.
type Subscriber interface {
	// OnComplete takes no arguments.
	OnComplete()
	// OnError receives an error.
	OnError(error)
	// OnNext receives a Subscription and a value.
	OnNext(Subscription, interface{})
	// OnSubscribe receives a Subscription.
	OnSubscribe(Subscription)
}

func NewSubscriber ¶

func NewSubscriber(opts ...SubscriberOption) Subscriber

type SubscriberOption ¶

// SubscriberOption is a function that receives a pointer to the package's
// unexported subscriber type. Values are returned by OnComplete, OnError,
// OnNext and OnSubscribe, and accepted by NewSubscriber and
// Publisher.Subscribe.
type SubscriberOption func(*subscriber)

func OnComplete ¶

func OnComplete(onComplete FnOnComplete) SubscriberOption

func OnError ¶

func OnError(onError FnOnError) SubscriberOption

func OnNext ¶

func OnNext(onNext FnOnNext) SubscriberOption

func OnSubscribe ¶

func OnSubscribe(onSubscribe FnOnSubscribe) SubscriberOption

type Subscription ¶

// Subscription is the handle passed to FnOnSubscribe and FnOnNext callbacks
// and to Subscriber.OnSubscribe and Subscriber.OnNext.
type Subscription interface {
	// Request takes an item count n.
	// NOTE(review): presumably asks the publisher for up to n more items —
	// confirm against the implementations.
	Request(n int)
	// Cancel takes no arguments.
	// NOTE(review): presumably stops further delivery — confirm against the
	// implementations.
	Cancel()
}

type Transformer ¶

// Transformer is a function that takes a value and returns a value.
// NOTE(review): presumably used by mapping operators to convert each emitted
// value — confirm against the operators that accept it.
type Transformer func(interface{}) interface{}

Directories ¶

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL