pipe

package
Version: v0.0.0-...-a9562c4
Published: Aug 23, 2014 License: MIT Imports: 6 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ERROR_HANDLER_WAIT_TIMEOUT = errors.New("Handler wait timed out")

Functions

func Bookkeeper

func Bookkeeper(incOutCh chan Event) (chan Event, chan Event)

Bookkeeper accepts full and incremental updates on its returned channels and sends redundancy-free incremental events on the incOutCh channel. It lets watchers that can only obtain a full list of backends send that list without doing their own bookkeeping. A full update on fullInCh must be a single Event containing all nodes, and only nodes with status NodeUp are accepted.

func BookkeeperReceiver

func BookkeeperReceiver(incInCh chan Event) chan Event

BookkeeperReceiver reads the incremental event channel incInCh and returns a channel on which full updates are sent on request. If incInCh is closed, the output channel is closed as well.

func Broadcaster

func Broadcaster(inCh chan Event, outChs []chan Event)

func Merger

func Merger(inCh chan Event, outCh chan Event)

Merger merges incoming events from inCh when the sink on outCh cannot keep up.
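The merging idea can be sketched as follows. This is a minimal illustration, not the package's implementation: `mergeSlowSink` and `demoMerge` are hypothetical names, the types are local stand-ins for the documented ones, and the choices to let later node entries overwrite earlier ones and to close the output channel at the end are assumptions.

```go
package main

import "fmt"

// Local stand-ins that mirror the documented declarations.
type NodeStatus int

const (
	NodeUp NodeStatus = iota + 1
	NodeDown
)

type NodeInfo struct {
	Name   string
	Status NodeStatus
	Host   string
	Port   uint16
}

type Event map[string]NodeInfo

// mergeSlowSink (hypothetical) folds incoming events into one pending event
// for as long as nobody is ready to receive on outCh.
func mergeSlowSink(inCh <-chan Event, outCh chan<- Event) {
	defer close(outCh) // assumption: the sketch owns and closes outCh
	var pending Event
	for {
		// A nil channel is never selected, so sending is only
		// attempted while something is pending.
		var send chan<- Event
		if pending != nil {
			send = outCh
		}
		select {
		case ev, ok := <-inCh:
			if !ok {
				if pending != nil {
					outCh <- pending // flush what is left
				}
				return
			}
			if pending == nil {
				pending = Event{}
			}
			for name, node := range ev {
				pending[name] = node // newer entry wins
			}
		case send <- pending:
			pending = nil
		}
	}
}

// demoMerge sends three events while the sink is not reading, then reads once.
func demoMerge() Event {
	inCh := make(chan Event)
	outCh := make(chan Event)
	go mergeSlowSink(inCh, outCh)
	inCh <- Event{"web-1": {Name: "web-1", Status: NodeUp}}
	inCh <- Event{"web-2": {Name: "web-2", Status: NodeUp}}
	inCh <- Event{"web-1": {Name: "web-1", Status: NodeDown}}
	close(inCh)
	return <-outCh
}

func main() {
	merged := demoMerge()
	fmt.Println(len(merged), merged["web-1"].Status == NodeDown)
	// prints: 2 true
}
```

Because the sink reads only once, the three input events arrive as a single event that holds the latest state of each node.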

Types

type Book

type Book struct {
	// contains filtered or unexported fields
}

Book is a data structure for bookkeeping of node data. It receives incremental and full updates on the list of running nodes as events and tracks all nodes that are currently up. This lets it filter out redundant events and return only the necessary incremental updates. It is thread-safe.

func NewBook

func NewBook() *Book

NewBook creates a new, empty book.

func (*Book) Full

func (b *Book) Full() Event

Full returns an event containing all nodes currently up. If there are no nodes, an event without nodes is returned.

func (*Book) UpdateFull

func (b *Book) UpdateFull(ev Event) Event

UpdateFull updates the book with the list of nodes inside the event. The event represents a full update, so only nodes with status NodeUp are allowed. Nodes that are in the book but missing from the event are reported as NodeDown and removed from the book.

func (*Book) UpdateInc

func (b *Book) UpdateInc(ev Event) Event

UpdateInc updates the book with an incremental event ev and returns an event if the update changed the book. If the update contains a node with status NodeDown that the book has never seen, the node is ignored. Incremental updates only work if the book receives events from startup onwards and never misses one.
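The bookkeeping described for UpdateFull and UpdateInc can be sketched like this. It is a simplified illustration under stated assumptions, not the package's code: `miniBook` and its methods are hypothetical, the types are local stand-ins, and returning an empty event when nothing changed is an assumption about how "no change" is signalled.

```go
package main

import (
	"fmt"
	"sync"
)

// Local stand-ins that mirror the documented declarations.
type NodeStatus int

const (
	NodeUp NodeStatus = iota + 1
	NodeDown
)

type NodeInfo struct {
	Name   string
	Status NodeStatus
	Host   string
	Port   uint16
}

type Event map[string]NodeInfo

// miniBook is a hypothetical, simplified stand-in for Book.
type miniBook struct {
	mu sync.Mutex
	up map[string]NodeInfo // nodes currently known to be up
}

func newMiniBook() *miniBook { return &miniBook{up: make(map[string]NodeInfo)} }

// updateInc applies an incremental event and returns only the real changes.
func (b *miniBook) updateInc(ev Event) Event {
	b.mu.Lock()
	defer b.mu.Unlock()
	out := Event{}
	for name, node := range ev {
		known, seen := b.up[name]
		switch node.Status {
		case NodeUp:
			if !seen || known != node {
				b.up[name] = node
				out[name] = node
			}
		case NodeDown:
			if seen { // a down event for an unknown node is ignored
				delete(b.up, name)
				out[name] = node
			}
		}
	}
	return out
}

// updateFull replaces the book with ev and returns the resulting diff.
func (b *miniBook) updateFull(ev Event) Event {
	b.mu.Lock()
	defer b.mu.Unlock()
	out := Event{}
	for name, known := range b.up {
		if node, ok := ev[name]; !ok || node.Status != NodeUp {
			known.Status = NodeDown // missing nodes are reported as down
			out[name] = known
			delete(b.up, name)
		}
	}
	for name, node := range ev {
		if node.Status != NodeUp {
			continue // a full update carries only nodes that are up
		}
		if known, seen := b.up[name]; !seen || known != node {
			b.up[name] = node
			out[name] = node
		}
	}
	return out
}

func demoBook() (first, repeat, full Event) {
	b := newMiniBook()
	web1 := Event{"web-1": {Name: "web-1", Status: NodeUp, Host: "10.0.0.1", Port: 8080}}
	first = b.updateInc(web1)
	repeat = b.updateInc(web1) // redundant, filtered out
	full = b.updateFull(Event{"web-2": {Name: "web-2", Status: NodeUp, Host: "10.0.0.2", Port: 8080}})
	return
}

func main() {
	first, repeat, full := demoBook()
	fmt.Println(len(first), len(repeat), len(full), full["web-1"].Status == NodeDown)
	// prints: 1 0 2 true
}
```

The repeated event produces no output, while the full update yields two changes: web-2 coming up and web-1 going down.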

type Endpoint

type Endpoint interface {
	// Started in a separate goroutine
	Handle(eventCh chan Event, closeCh chan struct{})
}

type EndpointFunc

type EndpointFunc func(eventCh chan Event, closeCh chan struct{})

func (EndpointFunc) Handle

func (f EndpointFunc) Handle(eventCh chan Event, closeCh chan struct{})
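EndpointFunc appears to follow the same adapter pattern as `http.HandlerFunc`: a plain function is converted so that it satisfies the Endpoint interface. The sketch below re-declares the documented types locally and assumes that Handle simply calls the function; `runCounter` is a hypothetical helper added for illustration.

```go
package main

import "fmt"

// Local stand-ins that mirror the documented declarations.
type NodeStatus int

const (
	NodeUp NodeStatus = iota + 1
	NodeDown
)

type NodeInfo struct {
	Name   string
	Status NodeStatus
	Host   string
	Port   uint16
}

type Event map[string]NodeInfo

type Endpoint interface {
	Handle(eventCh chan Event, closeCh chan struct{})
}

type EndpointFunc func(eventCh chan Event, closeCh chan struct{})

// Handle calls f itself (assumed behaviour of the adapter).
func (f EndpointFunc) Handle(eventCh chan Event, closeCh chan struct{}) {
	f(eventCh, closeCh)
}

// runCounter (hypothetical) feeds events to a function-based endpoint and
// returns how many node entries it saw.
func runCounter(events []Event) int {
	total := 0
	var ep Endpoint = EndpointFunc(func(eventCh chan Event, closeCh chan struct{}) {
		for {
			select {
			case ev, ok := <-eventCh:
				if !ok {
					return // event channel closed
				}
				total += len(ev)
			case <-closeCh:
				return // asked to shut down
			}
		}
	})

	eventCh := make(chan Event)
	closeCh := make(chan struct{})
	done := make(chan struct{})
	go func() { // endpoints are started in a separate goroutine
		defer close(done)
		ep.Handle(eventCh, closeCh)
	}()
	for _, ev := range events {
		eventCh <- ev
	}
	close(eventCh)
	<-done
	return total
}

func main() {
	ev := Event{"web-1": {Name: "web-1", Status: NodeUp, Host: "10.0.0.1", Port: 8080}}
	fmt.Println(runCounter([]Event{ev, ev}))
	// prints: 2
}
```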

type Event

type Event map[string]NodeInfo

func NewEvent

func NewEvent() Event

func NewEventWithNode

func NewEventWithNode(name string, status NodeStatus, host string, port uint16) Event

func (Event) AddNewNode

func (e Event) AddNewNode(name string, status NodeStatus, host string, port uint16)

func (Event) AddNode

func (e Event) AddNode(node NodeInfo)

func (Event) String

func (e Event) String() string

func (Event) Update

func (e Event) Update(newer Event)

type Forwarder

type Forwarder struct {
	// contains filtered or unexported fields
}

func NewForwarder

func NewForwarder(outCh chan Event) *Forwarder

NewForwarder creates a new forwarder for the given output channel.

func (*Forwarder) Forward

func (f *Forwarder) Forward(inCh chan Event)

Forward forwards events from the given input channel to the output channel.

func (*Forwarder) WaitClose

func (f *Forwarder) WaitClose()

WaitClose closes the output channel once all forwarded input channels have been closed. The call itself does not block.
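A fan-in of this shape is commonly built on `sync.WaitGroup`. The following sketch shows one way it could work; `fanIn` and its methods are hypothetical stand-ins, and doing the waiting in a background goroutine is an assumption made to match the "does not block" description.

```go
package main

import (
	"fmt"
	"sync"
)

// Local stand-ins that mirror the documented declarations.
type NodeStatus int

const (
	NodeUp NodeStatus = iota + 1
	NodeDown
)

type NodeInfo struct {
	Name   string
	Status NodeStatus
	Host   string
	Port   uint16
}

type Event map[string]NodeInfo

// fanIn is a hypothetical stand-in for Forwarder.
type fanIn struct {
	outCh chan Event
	wg    sync.WaitGroup
}

func newFanIn(outCh chan Event) *fanIn { return &fanIn{outCh: outCh} }

// forward copies every event from inCh to the shared output channel.
func (f *fanIn) forward(inCh chan Event) {
	f.wg.Add(1)
	go func() {
		defer f.wg.Done()
		for ev := range inCh {
			f.outCh <- ev
		}
	}()
}

// waitClose returns immediately; a background goroutine closes outCh
// once every forwarded input channel has been closed and drained.
func (f *fanIn) waitClose() {
	go func() {
		f.wg.Wait()
		close(f.outCh)
	}()
}

// demoFanIn forwards two inputs and counts the events seen on the output.
func demoFanIn() int {
	outCh := make(chan Event)
	f := newFanIn(outCh)
	a := make(chan Event, 2)
	b := make(chan Event, 1)
	f.forward(a)
	f.forward(b)
	f.waitClose()

	a <- Event{"web-1": {Name: "web-1", Status: NodeUp}}
	a <- Event{"web-1": {Name: "web-1", Status: NodeDown}}
	b <- Event{"db-1": {Name: "db-1", Status: NodeUp}}
	close(a)
	close(b)

	n := 0
	for range outCh { // ends when outCh is closed
		n++
	}
	return n
}

func main() {
	fmt.Println(demoFanIn())
	// prints: 3
}
```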

type ManagedEndpoint

type ManagedEndpoint struct {
	Endpoint Endpoint
	DoneCh   chan struct{}
	CloseCh  chan struct{}
	// contains filtered or unexported fields
}

ManagedEndpoint wraps an Endpoint to control its shutdown behaviour.

func NewManagedEndpoint

func NewManagedEndpoint(handle Endpoint) *ManagedEndpoint

NewManagedEndpoint creates a wrapper around an endpoint that manages its shutdown behaviour.

func (*ManagedEndpoint) Handle

func (m *ManagedEndpoint) Handle(eventCh chan Event)

Handle starts the wrapped handler on the given event channel and closes DoneCh when the handler returns. It blocks until the handler returns.

func (*ManagedEndpoint) Stop

func (m *ManagedEndpoint) Stop()

Stop signals the handler to exit through its close channel. Stop does not close the event channel.

func (*ManagedEndpoint) Wait

func (m *ManagedEndpoint) Wait()

Wait waits for the handler to stop.

func (*ManagedEndpoint) WaitTimeout

func (m *ManagedEndpoint) WaitTimeout(timeout time.Duration) error

WaitTimeout waits for the handler to stop or for the timeout to elapse.
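The shutdown handshake of Handle, Stop and WaitTimeout can be sketched with two signal channels and a `select`. The names `managed`, `newManaged` and `errWaitTimeout` are hypothetical, the types are trimmed stand-ins, and returning a timeout error that resembles ERROR_HANDLER_WAIT_TIMEOUT is an assumption about the real method.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Trimmed local stand-ins for the documented types.
type NodeInfo struct{ Name string }

type Event map[string]NodeInfo

// errWaitTimeout imitates ERROR_HANDLER_WAIT_TIMEOUT (assumed usage).
var errWaitTimeout = errors.New("Handler wait timed out")

// managed is a hypothetical stand-in for ManagedEndpoint.
type managed struct {
	run     func(eventCh chan Event, closeCh chan struct{})
	doneCh  chan struct{}
	closeCh chan struct{}
}

func newManaged(run func(chan Event, chan struct{})) *managed {
	return &managed{
		run:     run,
		doneCh:  make(chan struct{}),
		closeCh: make(chan struct{}),
	}
}

// handle blocks until the wrapped function returns, then closes doneCh.
func (m *managed) handle(eventCh chan Event) {
	defer close(m.doneCh)
	m.run(eventCh, m.closeCh)
}

// stop asks the wrapped function to exit; the event channel stays open.
func (m *managed) stop() { close(m.closeCh) }

// waitTimeout returns nil once the handler is done, or an error on timeout.
func (m *managed) waitTimeout(timeout time.Duration) error {
	select {
	case <-m.doneCh:
		return nil
	case <-time.After(timeout):
		return errWaitTimeout
	}
}

// demoManaged waits once before and once after the stop signal.
func demoManaged() (beforeStop, afterStop error) {
	m := newManaged(func(eventCh chan Event, closeCh chan struct{}) {
		<-closeCh // a handler that runs until told to stop
	})
	go m.handle(make(chan Event))
	beforeStop = m.waitTimeout(20 * time.Millisecond)
	m.stop()
	afterStop = m.waitTimeout(time.Second)
	return
}

func main() {
	before, after := demoManaged()
	fmt.Println(before, after)
	// prints: Handler wait timed out <nil>
}
```

Closing a channel rather than sending on it means any number of waiters are released at once, which is why both signals are modelled as `chan struct{}`.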

type NodeInfo

type NodeInfo struct {
	Name   string
	Status NodeStatus
	Host   string
	Port   uint16
}

func NewNodeInfo

func NewNodeInfo(name string, status NodeStatus, host string, port uint16) NodeInfo

func (NodeInfo) String

func (n NodeInfo) String() string

type NodeStatus

type NodeStatus int
const (
	NodeUp NodeStatus = iota + 1 // Node is up
	NodeDown
)

func (NodeStatus) String

func (s NodeStatus) String() string

type Reactor

type Reactor interface {

	// Setup configures the reactor with a global config
	Setup(cfg json.RawMessage) error

	// Accept configures an instance of the reactor dedicated to a service.
	// Returns an Endpoint ready to communicate on the pipeline.
	Accept(cfg json.RawMessage) (Endpoint, error)
}

Reactor is the sink of events; it "reacts" to incoming events.

type Watcher

type Watcher interface {

	// Setup configures the watcher with a global config
	Setup(cfg json.RawMessage) error

	// Accept configures an instance of the watcher dedicated to a service.
	// Returns an Endpoint ready to communicate on the pipeline.
	Accept(cfg json.RawMessage) (Endpoint, error)
}

Watcher is the source of events; it "watches" a system and generates events, which are then sent down the pipeline.
