Documentation ¶
Overview ¶
Package eventual2go is a library for event-driven development. It provides implementations of futures and streams, as found in many modern languages.
eventual2go deals with cases where you are in an event-driven environment and want to react to something without knowing when it is going to happen.
Events can either be unique or not. For example, the result of reading in a large file is a unique event, whereas a GET request on a webserver is not. eventual2go provides Futures for the former case and Streams for the latter.
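The distinction between unique and recurring events can be sketched with plain Go channels. The sketch below uses only the standard library and does not call eventual2go; `readFile`, `requests`, and `countRequests` are hypothetical names chosen for illustration.

```go
package main

import "fmt"

// readFile stands in for a unique event: it yields exactly one result.
// Hypothetical helper, not part of eventual2go.
func readFile() <-chan string {
	result := make(chan string, 1)
	go func() {
		result <- "file contents"
		close(result)
	}()
	return result
}

// requests stands in for a recurring event: it yields n values and then closes.
func requests(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

// countRequests drains the recurring events and reports how many arrived.
func countRequests(n int) int {
	count := 0
	for range requests(n) {
		count++
	}
	return count
}

func main() {
	fmt.Println(<-readFile())     // one value, future-like
	fmt.Println(countRequests(3)) // many values, stream-like
}
```

A future plays the role of the one-shot channel, a stream the role of the multi-value channel, with handler registration taking the place of explicit receives.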
Index ¶
- Variables
- type Actor
- type ActorMessageStream
- type Collector
- func (c *Collector) Add(d Data)
- func (c *Collector) AddFuture(f *Future)
- func (c *Collector) AddFutureError(f *Future)
- func (c *Collector) AddObservable(o *Observable)
- func (c *Collector) AddStream(s *Stream)
- func (c *Collector) Empty() (e bool)
- func (c *Collector) Get() (d Data)
- func (c *Collector) Preview() (d Data)
- func (c *Collector) Size() (n int)
- func (c *Collector) Stop()
- func (c *Collector) Stopped() *Future
- type Completer
- type CompletionFunc
- type CompletionHandler
- type Data
- type DeriveSubscriber
- type ErrorHandler
- type Event
- type Filter
- type Future
- func (f *Future) AsChan() chan Data
- func (f *Future) AsErrChan() chan error
- func (f *Future) Completed() bool
- func (f *Future) Err(eh ErrorHandler) (nf *Future)
- func (f *Future) ErrResult() error
- func (f *Future) Result() Data
- func (f *Future) Then(ch CompletionHandler) (nf *Future)
- func (f *Future) WaitUntilComplete()
- func (f *Future) WaitUntilTimeout(timeout time.Duration) (complete bool)
- type FutureCache
- type FutureWaitGroup
- type LoopActor
- type Observable
- func (o *Observable) AsChan() (c chan Data, cancel *Completer)
- func (o *Observable) Change(value Data)
- func (o *Observable) Derive(t Transformer) (do *Observable, cancel *Completer)
- func (o *Observable) NextChange() (f *Future)
- func (o *Observable) OnChange(subscriber Subscriber) (cancel *Completer)
- func (o *Observable) Stream() (stream *Stream)
- func (o *Observable) Value() (value Data)
- type Reactor
- func (r *Reactor) AddFuture(classifier interface{}, f *Future)
- func (r *Reactor) AddFutureError(classifier interface{}, f *Future)
- func (r *Reactor) AddObservable(classifier interface{}, o *Observable)
- func (r *Reactor) AddStream(classifier interface{}, s *Stream)
- func (r *Reactor) CatchCtrlC()
- func (r *Reactor) CollectEvent(classifier interface{}, c *Collector)
- func (r *Reactor) Fire(classifier interface{}, data Data)
- func (r *Reactor) FireEvery(classifier interface{}, data Data, interval time.Duration)
- func (r *Reactor) FireIn(classifier interface{}, data Data, duration time.Duration)
- func (r *Reactor) OnShutdown(s Subscriber)
- func (r *Reactor) React(classifier interface{}, handler Subscriber)
- func (r *Reactor) Shutdown(d Data) (err error)
- func (r *Reactor) ShutdownFuture() *Future
- type Shutdown
- type ShutdownActor
- type ShutdownEvent
- type Shutdowner
- type Stream
- func (s *Stream) AsChan() (c chan Data, stop *Completer)
- func (s *Stream) Close()
- func (s *Stream) CloseOnFuture(f *Future)
- func (s *Stream) Closed() (f *Future)
- func (s *Stream) Derive(dsr DeriveSubscriber) (ds *Stream)
- func (s *Stream) First() (f *Future)
- func (s *Stream) FirstWhere(f ...Filter) (fw *Future)
- func (s *Stream) FirstWhereNot(f ...Filter) (fw *Future)
- func (s *Stream) Listen(sr Subscriber) (stop *Completer)
- func (s *Stream) ListenNonBlocking(sr Subscriber) (stop *Completer)
- func (s *Stream) Split(f Filter) (ts *Stream, fs *Stream)
- func (s *Stream) Transform(t Transformer) (ts *Stream)
- func (s *Stream) TransformConditional(t TransformerConditional) (ts *Stream)
- func (s *Stream) TransformWhere(t Transformer, f ...Filter) (tws *Stream)
- func (s *Stream) Where(f ...Filter) (fs *Stream)
- func (s *Stream) WhereNot(f ...Filter) (fs *Stream)
- type StreamController
- type Subscriber
- type Transformer
- type TransformerConditional
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrTimeout = errors.New("Timeout")
ErrTimeout represents a timeout error
Functions ¶
This section is empty.
Types ¶
type ActorMessageStream ¶
type ActorMessageStream struct {
// contains filtered or unexported fields
}
ActorMessageStream is used to send messages to an actor.
func SpawnActor ¶
func SpawnActor(a Actor) (messages ActorMessageStream, err error)
SpawnActor creates an actor and returns a message stream to it.
func (ActorMessageStream) Send ¶
func (ams ActorMessageStream) Send(data Data)
Send sends a message to an actor.
func (ActorMessageStream) Shutdown ¶
func (ams ActorMessageStream) Shutdown(data Data) (err error)
Shutdown sends a shutdown signal to the actor. Messages sent before the shutdown signal are guaranteed to be handled.
type Collector ¶
type Collector struct {
// contains filtered or unexported fields
}
Collector is a data sink. Use it to collect events for later retrieval. All events are stored in historical order.
func (*Collector) AddFutureError ¶
AddFutureError collects the error of a `Future`
func (*Collector) AddObservable ¶
func (c *Collector) AddObservable(o *Observable)
AddObservable collects all changes of an `Observable`.
func (*Collector) Empty ¶
Empty returns true if no data element is stored in the collector.
func (*Collector) Preview ¶
Preview retrieves the oldest data from the collector without removing it.
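The difference between retrieving and previewing the oldest element can be illustrated with a small mutex-guarded queue. This is a standard-library sketch, not the Collector implementation; `fifoSink` is a hypothetical type, and returning nil from an empty queue is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// fifoSink is a hypothetical stand-in for a collector: a mutex-guarded
// queue that keeps items in arrival order.
type fifoSink struct {
	mu    sync.Mutex
	items []interface{}
}

// Add appends an item to the end of the queue.
func (s *fifoSink) Add(d interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items = append(s.items, d)
}

// Preview returns the oldest item without removing it.
// Returning nil on an empty queue is an assumption of this sketch.
func (s *fifoSink) Preview() interface{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.items) == 0 {
		return nil
	}
	return s.items[0]
}

// Get removes and returns the oldest item (nil if the queue is empty).
func (s *fifoSink) Get() interface{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.items) == 0 {
		return nil
	}
	d := s.items[0]
	s.items = s.items[1:]
	return d
}

// Size reports how many items are currently stored.
func (s *fifoSink) Size() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.items)
}

func main() {
	s := &fifoSink{}
	s.Add("first")
	s.Add("second")
	fmt.Println(s.Preview(), s.Size()) // Preview leaves the item in place
	fmt.Println(s.Get(), s.Size())     // Get removes it
}
```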
type Completer ¶
type Completer struct {
// contains filtered or unexported fields
}
Completer is a thread-safe struct that can be completed with arbitrary data or failed with an error. Handler functions can be registered for both events and get invoked after completion.
func NewTimeoutCompleter ¶
NewTimeoutCompleter creates a new Completer that completes with an error after the specified duration, unless it has been completed before then.
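The "complete, or fail after a timeout" behaviour can be sketched with `select` and `time.After`. The code below is a standard-library illustration rather than the package's implementation; `errTimeout`, `awaitWithTimeout`, `completedInTime`, and `timedOut` are hypothetical names, and whether the package reports the timeout via ErrTimeout is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is assumed to play the role of ErrTimeout in this sketch.
var errTimeout = errors.New("Timeout")

// awaitWithTimeout returns the first value sent on done, or errTimeout
// if nothing arrives within the given duration.
func awaitWithTimeout(done <-chan string, timeout time.Duration) (string, error) {
	select {
	case v := <-done:
		return v, nil
	case <-time.After(timeout):
		return "", errTimeout
	}
}

// completedInTime simulates a completion that beats the timeout.
func completedInTime() (string, error) {
	done := make(chan string, 1)
	done <- "result"
	return awaitWithTimeout(done, time.Second)
}

// timedOut simulates a completion that never happens.
func timedOut() (string, error) {
	never := make(chan string)
	return awaitWithTimeout(never, 10*time.Millisecond)
}

func main() {
	fmt.Println(completedInTime())
	fmt.Println(timedOut())
}
```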
func (*Completer) Complete ¶
Complete completes the Completer with the given data and triggers all registered completion handlers. Panics if the Completer is already complete.
func (*Completer) CompleteError ¶
CompleteError completes the Completer with the given error and triggers all registered error handlers. Panics if the Completer is already complete.
func (*Completer) CompleteOn ¶
func (c *Completer) CompleteOn(f CompletionFunc)
CompleteOn invokes a CompletionFunc in a goroutine and completes with either the result or, if it is not nil, the error. Do not invoke this function more than once, to avoid multiple-completion panics.
func (*Completer) CompleteOnFuture ¶
CompleteOnFuture completes the completer with the result or the error of a `Future`.
type CompletionFunc ¶
A CompletionFunc is the argument for Completer.CompleteOn.
type CompletionHandler ¶
A CompletionHandler gets invoked when a Future is completed. The returned value gets propagated when chaining futures.
type DeriveSubscriber ¶
type DeriveSubscriber func(*StreamController, Data)
A DeriveSubscriber gets invoked every time data is added on the source stream and is responsible for adding the (transformed) data on the sink stream controller.
type ErrorHandler ¶
An ErrorHandler gets invoked when a Future fails. The returned value and error are propagated when chaining futures: if the error is nil, the chained future will be completed with the data; otherwise it fails.
type Event ¶
type Event struct { Classifier interface{} Data Data }
Event represents a generic classifier associated with generic data.
type Filter ¶
A Filter gets invoked when data is added to the consumed stream. The data is added to the filtered stream conditionally, depending on whether the Filter was registered with Where or WhereNot.
type Future ¶
type Future struct {
// contains filtered or unexported fields
}
Future is a thread-safe struct that can be completed with arbitrary data or failed with an error. Handler functions can be registered for both events and get invoked after completion.
Example ¶
Demonstrates the basic usage of futures
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/joernweissenborn/eventual2go"
)

func main() {
	// Create the completers: one will be completed normally, the other with an error.
	completerNor := eventual2go.NewCompleter()
	completerErr := eventual2go.NewCompleter()

	// Set up the success handler.
	var onsuccess eventual2go.CompletionHandler = func(d eventual2go.Data) eventual2go.Data {
		fmt.Println("SUCCESS:", d)
		return "Hello Future Chaining"
	}

	// Set up the error handler.
	var onerror eventual2go.ErrorHandler = func(e error) (eventual2go.Data, error) {
		fmt.Println("ERROR:", e)
		return nil, nil
	}

	// Our long-running async function.
	mylongrunning := func(doErr bool, c *eventual2go.Completer) {
		time.Sleep(1 * time.Second)
		if doErr {
			c.CompleteError(errors.New("Hello Future Error"))
		} else {
			c.Complete("Hello Future")
		}
	}

	// Get the futures.
	fNor := completerNor.Future()
	fErr := completerErr.Future()

	// Register the handlers; the success handler is chained.
	fNor.Then(onsuccess).Then(onsuccess)
	fNor.Err(onerror)
	fErr.Then(onsuccess)
	fErr.Err(onerror)

	// Execute the functions.
	go mylongrunning(false, completerNor)
	go mylongrunning(true, completerErr)

	// Wait for the futures to complete.
	fNor.WaitUntilComplete()
	fErr.WaitUntilComplete()

	// Everything is async: the futures may be complete while the handlers
	// have not necessarily run yet, so we wait 10 ms.
	time.Sleep(10 * time.Millisecond)
}
Output:
func (*Future) AsChan ¶
AsChan returns a channel which either will receive the result on completion or gets closed on error completion of the Future.
func (*Future) AsErrChan ¶
AsErrChan returns a channel which either will receive the error on error completion or gets closed on completion of the Future.
func (*Future) Err ¶
func (f *Future) Err(eh ErrorHandler) (nf *Future)
Err registers an error handler. If the future is already completed with an error, the handler gets executed immediately. Returns a future that is completed with the result of the handler or, if the handler returns a non-nil error, fails with that error.
func (*Future) ErrResult ¶
ErrResult returns the resulting error of the future, nil if called before completion or after non-error completion.
func (*Future) Result ¶
Result returns the result of the future, nil if called before completion or after error completion.
func (*Future) Then ¶
func (f *Future) Then(ch CompletionHandler) (nf *Future)
Then registers a completion handler. If the future is already complete, the handler gets executed immediately. Returns a future that gets completed with the result of the handler.
func (*Future) WaitUntilComplete ¶
func (f *Future) WaitUntilComplete()
WaitUntilComplete blocks until the future is complete.
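How a handler's return value can flow into a chained future is easier to see in a stripped-down model. The following sketch uses only the standard library and is not the Future implementation; `future`, `newFuture`, `complete`, `wait`, `then`, and `chain` are hypothetical names, and error completion is left out for brevity.

```go
package main

import "fmt"

// future is a hypothetical one-shot result holder.
type future struct {
	done   chan struct{}
	result interface{}
}

func newFuture() *future { return &future{done: make(chan struct{})} }

// complete stores the result and releases all waiters. It must be called once.
func (f *future) complete(v interface{}) {
	f.result = v
	close(f.done)
}

// wait blocks until the future is complete and returns its result.
func (f *future) wait() interface{} {
	<-f.done
	return f.result
}

// then returns a new future that is completed with handler(result)
// once f itself completes.
func (f *future) then(handler func(interface{}) interface{}) *future {
	nf := newFuture()
	go func() {
		nf.complete(handler(f.wait()))
	}()
	return nf
}

// chain wires two handlers behind one future and returns the final result.
func chain() interface{} {
	f := newFuture()
	length := f.then(func(d interface{}) interface{} { return len(d.(string)) })
	doubled := length.then(func(d interface{}) interface{} { return d.(int) * 2 })
	go f.complete("hello")
	return doubled.wait()
}

func main() {
	fmt.Println(chain())
}
```

Each call to `then` produces its own future, so handlers registered on the same source run independently, while handlers registered on the returned future see the previous handler's output.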
type FutureCache ¶
type FutureCache struct {
// contains filtered or unexported fields
}
FutureCache is a thread-safe cache for storing futures. It stores data with a user-defined index. This is useful, for example, when the same data must be retrieved for multiple requests from a slow location. The cache has a fixed size and is implemented as a ring buffer.
func NewCache ¶
func NewCache(size int) (fc *FutureCache)
NewCache creates a new FutureCache of the given size
func (*FutureCache) Cache ¶
func (fc *FutureCache) Cache(Index int, f *Future)
Cache stores a future with given index.
func (*FutureCache) Cached ¶
func (fc *FutureCache) Cached(index int) (is bool)
Cached indicates if there has already been a Future cached at given index.
func (*FutureCache) Get ¶
func (fc *FutureCache) Get(index int) (f *Future)
Get retrieves the future with a given index.
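A fixed-size cache backed by a ring buffer overwrites its oldest entry once it is full. The sketch below shows that idea with the standard library only; `ringCache`, `slot`, and `newRingCache` are hypothetical, values are plain strings instead of futures, the size is assumed to be greater than zero, and the exact eviction behaviour of FutureCache is not confirmed by this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// slot holds one cached value together with its user-defined index.
type slot struct {
	index int
	value string
	used  bool
}

// ringCache is a hypothetical fixed-size cache that overwrites its
// oldest slot once full. The size must be greater than zero.
type ringCache struct {
	mu    sync.Mutex
	slots []slot
	next  int // position of the slot to overwrite next
}

func newRingCache(size int) *ringCache {
	return &ringCache{slots: make([]slot, size)}
}

// Cache stores value under index, overwriting the oldest slot if needed.
func (c *ringCache) Cache(index int, value string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.slots[c.next] = slot{index: index, value: value, used: true}
	c.next = (c.next + 1) % len(c.slots)
}

// Get returns the value stored under index and whether it was found.
func (c *ringCache) Get(index int) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, s := range c.slots {
		if s.used && s.index == index {
			return s.value, true
		}
	}
	return "", false
}

func main() {
	c := newRingCache(2)
	c.Cache(1, "a")
	c.Cache(2, "b")
	c.Cache(3, "c") // overwrites the slot that held index 1
	_, found := c.Get(1)
	fmt.Println(found)
	fmt.Println(c.Get(3))
}
```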
type FutureWaitGroup ¶
type FutureWaitGroup struct {
// contains filtered or unexported fields
}
func NewFutureWaitGroup ¶
func NewFutureWaitGroup() (fwg *FutureWaitGroup)
func (*FutureWaitGroup) Add ¶
func (fwg *FutureWaitGroup) Add(f *Future)
func (*FutureWaitGroup) Wait ¶
func (fwg *FutureWaitGroup) Wait()
type LoopActor ¶
LoopActor is an actor with a loop method which is called repeatedly. Messages are handled in between loop repetitions.
type Observable ¶
type Observable struct {
// contains filtered or unexported fields
}
Observable represents a value that can be updated in a thread-safe, change-order-preserving manner.
Subscribers can be informed of changes through either callbacks or channels.
func NewObservable ¶
func NewObservable(initial Data) (o *Observable)
NewObservable creates a new Observable with an initial value.
func (*Observable) AsChan ¶
func (o *Observable) AsChan() (c chan Data, cancel *Completer)
AsChan returns a channel on which changes get sent.
func (*Observable) Change ¶
func (o *Observable) Change(value Data)
Change changes the value of the observable
func (*Observable) Derive ¶
func (o *Observable) Derive(t Transformer) (do *Observable, cancel *Completer)
Derive returns a new Observable whose value is set by the transform function every time the source gets updated.
func (*Observable) NextChange ¶
func (o *Observable) NextChange() (f *Future)
NextChange returns a Future which gets completed with the next change.
func (*Observable) OnChange ¶
func (o *Observable) OnChange(subscriber Subscriber) (cancel *Completer)
OnChange registers a subscriber for change events.
func (*Observable) Stream ¶
func (o *Observable) Stream() (stream *Stream)
Stream returns a stream of change events.
func (*Observable) Value ¶
func (o *Observable) Value() (value Data)
Value returns the current value of the observable. Threadsafe.
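The combination of change notification and derived values can be modelled in a few lines. This is a standard-library sketch and not the Observable implementation; `observable`, `derive`, and `demo` are hypothetical, the value type is narrowed to int, and notifying subscribers while holding the lock is one possible way to preserve change order.

```go
package main

import (
	"fmt"
	"sync"
)

// observable is a hypothetical, int-only stand-in for an observable value.
type observable struct {
	mu    sync.Mutex
	value int
	subs  []func(int)
}

// OnChange registers a callback for future changes.
func (o *observable) OnChange(sub func(int)) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.subs = append(o.subs, sub)
}

// Change stores the new value and notifies subscribers while holding the
// lock, so concurrent changes are delivered in the order they are applied.
// Subscribers therefore must not call back into the same observable.
func (o *observable) Change(v int) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.value = v
	for _, sub := range o.subs {
		sub(v)
	}
}

// Value returns the current value.
func (o *observable) Value() int {
	o.mu.Lock()
	defer o.mu.Unlock()
	return o.value
}

// derive returns a new observable whose value follows t(source value).
func derive(src *observable, t func(int) int) *observable {
	d := &observable{value: t(src.Value())}
	src.OnChange(func(v int) { d.Change(t(v)) })
	return d
}

// demo applies two changes and reports what a subscriber saw and what
// the derived observable ended up holding.
func demo() (seen []int, doubled int) {
	o := &observable{value: 1}
	d := derive(o, func(v int) int { return v * 2 })
	o.OnChange(func(v int) { seen = append(seen, v) })
	o.Change(2)
	o.Change(3)
	return seen, d.Value()
}

func main() {
	fmt.Println(demo())
}
```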
type Reactor ¶
Reactor is a thread-safe event handler.
func (*Reactor) AddFuture ¶
AddFuture creates an event with the given classifier, which will be fired when the given future completes. The event will not be triggered on error completion.
func (*Reactor) AddFutureError ¶
AddFutureError acts the same as AddFuture, but registers a handler for the future error.
func (*Reactor) AddObservable ¶
func (r *Reactor) AddObservable(classifier interface{}, o *Observable)
AddObservable fires an event with the given classifier whenever the observable is changed.
func (*Reactor) AddStream ¶
AddStream subscribes to a Stream, firing an event with the given classifier for every new element in the stream.
func (*Reactor) CatchCtrlC ¶
func (r *Reactor) CatchCtrlC()
CatchCtrlC starts a goroutine that initiates the reactor shutdown when os.Interrupt is received.
func (*Reactor) CollectEvent ¶
CollectEvent registers the given Collector's Add method as the event handler for the given classifier.
func (*Reactor) Fire ¶
Fire triggers an event, asynchronously invoking the registered subscriber, if any. Events are guaranteed to be handled in the order of arrival.
func (*Reactor) FireEvery ¶
FireEvery fires the given event repeatedly. FireEvery cannot be canceled and will run until the reactor is shut down.
func (*Reactor) FireIn ¶
FireIn fires the given event after the given duration. FireIn cannot be canceled.
func (*Reactor) OnShutdown ¶
func (r *Reactor) OnShutdown(s Subscriber)
OnShutdown registers a custom handler for the shutdown event.
func (*Reactor) React ¶
func (r *Reactor) React(classifier interface{}, handler Subscriber)
React registers a Subscriber as the handler for a given event classifier. Previously registered handlers for the given classifier will be overwritten!
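The two properties described here, in-order handling and one handler per classifier, can be modelled with a map and a single dispatch goroutine. The sketch below is standard-library only and is not the Reactor implementation; `reactor`, `event`, `newReactor`, and `handled` are hypothetical, classifiers and data are narrowed to strings, and draining queued events on shutdown is a choice made for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

type event struct {
	classifier string
	data       string
}

// reactor is a hypothetical single-goroutine event dispatcher.
type reactor struct {
	mu       sync.Mutex
	handlers map[string]func(string)
	events   chan event
	done     chan struct{}
}

func newReactor() *reactor {
	r := &reactor{
		handlers: make(map[string]func(string)),
		events:   make(chan event, 16),
		done:     make(chan struct{}),
	}
	go r.loop()
	return r
}

// loop dispatches events one at a time, preserving arrival order.
func (r *reactor) loop() {
	defer close(r.done)
	for ev := range r.events {
		r.mu.Lock()
		h := r.handlers[ev.classifier]
		r.mu.Unlock()
		if h != nil {
			h(ev.data)
		}
	}
}

// React sets the handler for a classifier, replacing any earlier one.
func (r *reactor) React(classifier string, h func(string)) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.handlers[classifier] = h
}

// Fire queues an event; the handler runs later on the loop goroutine.
func (r *reactor) Fire(classifier, data string) {
	r.events <- event{classifier, data}
}

// Shutdown stops accepting events and waits for queued ones to be handled.
func (r *reactor) Shutdown() {
	close(r.events)
	<-r.done
}

// handled registers two handlers for the same classifier and returns
// what was actually invoked.
func handled() []string {
	var seen []string
	r := newReactor()
	r.React("greet", func(d string) { seen = append(seen, "first:"+d) })
	r.React("greet", func(d string) { seen = append(seen, "second:"+d) })
	r.Fire("greet", "hi")
	r.Fire("greet", "there")
	r.Fire("unknown", "ignored") // no handler registered, silently dropped
	r.Shutdown()
	return seen
}

func main() {
	fmt.Println(handled())
}
```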
func (*Reactor) Shutdown ¶
Shutdown shuts down the reactor, cancelling all goroutines and stream subscriptions. The error return exists only to fulfill the `Shutdowner` interface and will always be nil.
func (*Reactor) ShutdownFuture ¶
ShutdownFuture returns a future which gets completed after the reactor has shut down.
type Shutdown ¶
type Shutdown struct {
// contains filtered or unexported fields
}
Shutdown is a register for `Shutdowner` and is used to orchestrate a concurrent shutdown.
func (*Shutdown) Do ¶
Do initiates the shutdown by concurrently calling the shutdown method on all registered `Shutdowner`s. Blocks until all shutdowns have finished.
func (*Shutdown) Register ¶
func (sd *Shutdown) Register(s Shutdowner)
Register registers a `Shutdowner`.
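A concurrent shutdown that blocks until every participant has finished maps naturally onto `sync.WaitGroup`. The sketch below is a standard-library illustration, not the Shutdown type itself; `shutdowner`, `service`, `registry`, and `stopAll` are hypothetical, the interface shape is inferred from the Reactor's Shutdown method, and returning a slice of errors from `Do` is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// shutdowner is the assumed shape of a shutdown-capable component.
type shutdowner interface {
	Shutdown(d interface{}) error
}

// service is a toy component that records whether it was stopped.
type service struct {
	name    string
	stopped bool
}

func (s *service) Shutdown(d interface{}) error {
	s.stopped = true
	return nil
}

// registry is a hypothetical register of shutdowners.
type registry struct {
	mu      sync.Mutex
	members []shutdowner
}

// Register adds a member to be shut down later.
func (r *registry) Register(s shutdowner) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.members = append(r.members, s)
}

// Do calls Shutdown on every member concurrently and blocks until all
// calls have returned. Each goroutine writes only its own errs slot.
func (r *registry) Do(d interface{}) []error {
	r.mu.Lock()
	members := append([]shutdowner(nil), r.members...)
	r.mu.Unlock()

	errs := make([]error, len(members))
	var wg sync.WaitGroup
	for i, m := range members {
		wg.Add(1)
		go func(i int, m shutdowner) {
			defer wg.Done()
			errs[i] = m.Shutdown(d)
		}(i, m)
	}
	wg.Wait()
	return errs
}

// stopAll registers three services, shuts them down, and counts how
// many report being stopped.
func stopAll() int {
	services := []*service{{name: "db"}, {name: "http"}, {name: "cache"}}
	r := &registry{}
	for _, s := range services {
		r.Register(s)
	}
	r.Do("bye")
	n := 0
	for _, s := range services {
		if s.stopped {
			n++
		}
	}
	return n
}

func main() {
	fmt.Println(stopAll())
}
```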
type ShutdownActor ¶
ShutdownActor is an actor with a Shutdown method, which is called upon actor shutdown.
type ShutdownEvent ¶
type ShutdownEvent struct{}
ShutdownEvent triggers the reactor shutdown when fired.
type Shutdowner ¶
Shutdowner represents the eventual2go shutdown interface.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
A Stream can be consumed, or new streams can be derived from it, by registering handler functions.
func (*Stream) AsChan ¶
AsChan returns a channel to which all items will be pushed. Note that items will be queued in a FIFO, since the stream must not block.
func (*Stream) Close ¶
func (s *Stream) Close()
Close closes the Stream and its assigned StreamController.
func (*Stream) CloseOnFuture ¶
CloseOnFuture closes the Stream upon completion of Future.
func (*Stream) Derive ¶
func (s *Stream) Derive(dsr DeriveSubscriber) (ds *Stream)
Derive creates a derived stream from a DeriveSubscriber. Mainly used internally.
func (*Stream) First ¶
First returns a future that will be completed with the first element added to the stream.
func (*Stream) FirstWhere ¶
FirstWhere returns a future that will be completed with the first element added to the stream for which the filter returns true.
func (*Stream) FirstWhereNot ¶
FirstWhereNot returns a future that will be completed with the first element added to the stream for which the filter returns false.
func (*Stream) Listen ¶
func (s *Stream) Listen(sr Subscriber) (stop *Completer)
Listen registers a subscriber. Returns a Completer, which can be used to terminate the subscription.
func (*Stream) ListenNonBlocking ¶
func (s *Stream) ListenNonBlocking(sr Subscriber) (stop *Completer)
ListenNonBlocking is the same as Listen, but the subscriber does not block the subscription.
func (*Stream) Split ¶
Split returns two streams: one with all elements for which the filter returns true, and one with all elements for which it returns false.
func (*Stream) Transform ¶
func (s *Stream) Transform(t Transformer) (ts *Stream)
Transform registers a Transformer function and returns the transformed stream.
func (*Stream) TransformConditional ¶
func (s *Stream) TransformConditional(t TransformerConditional) (ts *Stream)
TransformConditional registers a TransformerConditional function and returns the transformed stream.
func (*Stream) TransformWhere ¶
func (s *Stream) TransformWhere(t Transformer, f ...Filter) (tws *Stream)
TransformWhere transforms only filtered elements.
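Filtering and transforming a stream can be pictured as stages in a channel pipeline. The sketch below uses only the standard library and is not the Stream implementation; `emit`, `where`, `transform`, `collect`, and `evenSquares` are hypothetical names, and composing a filter stage with a transform stage is only one plausible reading of what TransformWhere does.

```go
package main

import "fmt"

// emit sends the given values on a channel and then closes it.
func emit(values ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// where forwards only the elements for which keep returns true.
func where(in <-chan int, keep func(int) bool) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if keep(v) {
				out <- v
			}
		}
	}()
	return out
}

// transform forwards t(v) for every element v.
func transform(in <-chan int, t func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- t(v)
		}
	}()
	return out
}

// collect drains a channel into a slice.
func collect(in <-chan int) []int {
	var result []int
	for v := range in {
		result = append(result, v)
	}
	return result
}

// evenSquares filters the even numbers from 1..6 and squares them.
func evenSquares() []int {
	isEven := func(v int) bool { return v%2 == 0 }
	square := func(v int) int { return v * v }
	return collect(transform(where(emit(1, 2, 3, 4, 5, 6), isEven), square))
}

func main() {
	fmt.Println(evenSquares())
}
```

Each stage returns a new channel and leaves its input untouched, mirroring how the stream methods return a derived stream instead of modifying the source.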
type StreamController ¶
type StreamController struct {
// contains filtered or unexported fields
}
A StreamController is a Stream to which elements can be added manually or into which other Streams can be joined.
func NewStreamController ¶
func NewStreamController() (sc *StreamController)
NewStreamController creates a new StreamController.
func (*StreamController) Add ¶
func (sc *StreamController) Add(d Data)
Add adds an element to the stream.
func (*StreamController) Join ¶
func (sc *StreamController) Join(source *Stream)
Join joins a stream. All elements from the source will be added to the stream
func (*StreamController) JoinFuture ¶
func (sc *StreamController) JoinFuture(f *Future)
JoinFuture joins a future completion event. The result will be added to the stream.
func (*StreamController) Stream ¶
func (sc *StreamController) Stream() *Stream
Stream returns the underlying stream.
type Subscriber ¶
type Subscriber func(Data)
A Subscriber gets invoked whenever data is added to the consumed stream.
type Transformer ¶
A Transformer gets invoked when data is added to the consumed stream. The output gets added to the transformed stream.
type TransformerConditional ¶
A TransformerConditional is like a Transformer, but can filter the data.