rx

package module
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 31, 2021 License: MIT Imports: 0 Imported by: 0

README

RxGo

Build Status Coverage Status Go Report Card

Reactive Extensions for the Go Language

Call for Maintainers

The development of RxGo v2 has started (v2 branch). Ongoing discussions can be found in #99.

We are welcoming anyone who's willing to help us maintaining or would like to become a core developer to get in touch with us.

Contributions

All contributions are welcome, both in development and documentation! Be sure you check out contributions and roadmap.

Getting Started

ReactiveX, or Rx for short, is an API for programming with observable streams. This is a ReactiveX API for the Go language.

ReactiveX is a new, alternative way of asynchronous programming to callbacks, promises and deferred. It is about processing streams of events or items, with events being any occurrences or changes within the system.

In Go, it is simpler to think of a observable stream as a channel which can Subscribe to a set of handler or callback functions.

The pattern is that you Subscribe to an Observable using an Observer:

subscription := observable.Subscribe(observer)

An Observer is a type consists of three EventHandler fields, the NextHandler, ErrHandler, and DoneHandler, respectively. These handlers can be evoked with OnNext, OnError, and OnDone methods, respectively.

The Observer itself is also an EventHandler. This means all types mentioned can be subscribed to an Observable.

nextHandler := func(item interface{}) interface{} {
	if num, ok := item.(int); ok {
		nums = append(nums, num)
	}
}

// Only next item will be handled.
sub := observable.Subscribe(handlers.NextFunc(nextHandler))

NOTE: Observables are not active in themselves. They need to be subscribed to make something happen. Simply having an Observable lying around doesn't make anything happen, like sitting and watching time flies.

Install

go get -u github.com/reactivex/rxgo

Importing the Rx package

Certain types, such as observer.Observer and observable.Observable are organized into subpackages for namespace-sake to avoid redundant constructor like NewObservable.

import (
	"github.com/reactivex/rxgo"
	"github.com/reactivex/rxgo/observer"
	"github.com/reactivex/rxgo/observable"
	"github.com/reactivex/rxgo/iterable"
	//...
)

Simple Usage

watcher := observer.Observer{

	// Register a handler function for every next available item.
	NextHandler: func(item interface{}) {
		fmt.Printf("Processing: %v\n", item)
	},

	// Register a handler for any emitted error.
	ErrHandler: func(err error) {
		fmt.Printf("Encountered error: %v\n", err)
	},

	// Register a handler when a stream is completed.
	DoneHandler: func() {
		fmt.Println("Done!")
	},
}

it, _ := iterable.New([]interface{}{1, 2, 3, 4, errors.New("bang"), 5})
source := observable.From(it)
sub := source.Subscribe(watcher)

// wait for the channel to emit a Subscription
<-sub

The above will:

  • print the format string for every number in the slice up to 4.
  • print the error "bang"

It is important to remember that only an OnError or OnDone can be called in a stream. If there's an error in the stream, the processing stops and OnDone will never be called, and vice versa.

The concept is to group all side effects into these handlers and let an Observer or any EventHandler to handle them.

package main

import (
	"fmt"

	"github.com/reactivex/rxgo/handlers"
	"github.com/reactivex/rxgo/observable"
	"github.com/reactivex/rxgo/observer"
)

func main() {

	score := 9

	onNext := handlers.NextFunc(func(item interface{}) {
		if num, ok := item.(int); ok {
			score += num
		}
	})

	onDone := handlers.DoneFunc(func() {
		score *= 2
	})

	watcher := observer.New(onNext, onDone)

	// Create an `Observable` from a single item and subscribe to the observer.
	sub := observable.Just(1).Subscribe(watcher)
	<-sub

	fmt.Println(score) // 20
}

FlatMap example:

package main

import (
	"fmt"

	"github.com/reactivex/rxgo/handlers"
	"github.com/reactivex/rxgo/observable"
	"github.com/reactivex/rxgo/observer"
)

func main() {
    primeSequence := observable.Just([]int{2, 3, 5, 7, 11, 13})

    <-primeSequence.
            FlatMap(func(primes interface{}) observable.Observable {
                return observable.Create(func(emitter *observer.Observer) {
                    for _, prime := range primes.([]int) {
                        emitter.OnNext(prime)
                    }
                    emitter.OnDone()
                })
            }, 1).
            Last().
            Subscribe(handlers.NextFunc(func(prime interface{}) {
                fmt.Println("Prime -> ", prime)
            }))
}

Please check out the examples to see how it can be applied to reactive applications.

Recap

An Observable is a synchronous stream of "emitted" values which can be either an empty interface{} or error. Below is how an Observable can be visualized:

                                time -->

(*)-------------(o)--------------(o)---------------(x)----------------|>
 |               |                |                 |                 |
Start          value            value             error              Done

In RxGo, it's useful to think of Observable and Connectable as channels with additional ability to Subscribe handlers. In fact, they are basically channels. When Subscribe method is called on a Observable (or Connect method in case of Connectable), one or more goroutines are spawned to handle asynchronous processing.

Most Observable methods and operators will return the Observable itself, making it chainable.

f1 := func() interface{} {

	// Simulate a blocking I/O
	time.Sleep(2 * time.Second)
	return 1
}

f2 := func() interface{} {

	// Simulate a blocking I/O
	time.Sleep(time.Second)
	return 2
}

onNext := handlers.NextFunc(func(v interface{}) {
	val := encodeVal(v)
	saveToDB(val)
})

wait := observable.Start(f1, f2).Subscribe(onNext)
sub := <-wait

if err := sub.Err(); err != nil {
	saveToLog(err)
}

This is an early project and your contributions will help shape its direction.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type EventHandler

type EventHandler interface {
	Handle(interface{})
}

EventHandler type is implemented by all handlers and Observer.

type Iterator

type Iterator interface {
	Next() (interface{}, error)
}

Iterator type is implemented by Iterable.

Directories

Path Synopsis
Package connectable provides a Connectable and its methods.
Package connectable provides a Connectable and its methods.
examples
Package fx provides predicate-like function types to be used with operators such as Map, Filter, Scan, and Start.
Package fx provides predicate-like function types to be used with operators such as Map, Filter, Scan, and Start.
Package handlers provides handler types which implements EventHandler.
Package handlers provides handler types which implements EventHandler.
Package iterable provides an Iterable type that is capable of converting sequences of empty interface such as slice and channel to an Iterator.
Package iterable provides an Iterable type that is capable of converting sequences of empty interface such as slice and channel to an Iterator.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL