pipe

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 21, 2017 License: BSD-3-Clause Imports: 4 Imported by: 0

Documentation

Overview

Package pipe provides types to help manage transporter communication channels as well as event types.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Mock

type Mock struct {
	MsgCount int
}

Mock serves as a pipe used to count the number of messages sent to writeMessage.

func (*Mock) WriteMessage

func (m *Mock) WriteMessage(msg message.Msg) (message.Msg, error)

WriteMessage increments the message counter.

type Pipe

type Pipe struct {
	In      messageChan
	Out     []messageChan
	Err     chan error
	Event   chan events.Event
	Stopped bool // has the pipe been stopped?

	MessageCount int
	LastMsg      message.Msg
	ExtraState   map[string]interface{}
	// contains filtered or unexported fields
}

Pipe provides a set of methods to let transporter nodes communicate with each other.

Pipes contain In, Out, Err, and Event channels. Messages are consumed by a node through the 'in' chan, emitted from the node by the 'out' chan. Pipes come in three flavours, a sourcePipe, which only emits messages and has no listening loop, a sinkPipe which has a listening loop, but doesn't emit any messages, and joinPipe which has a li tening loop that also emits messages.

func NewPipe

func NewPipe(pipe *Pipe, path string) *Pipe

NewPipe creates a new Pipe. If the pipe that is passed in is nil, then this pipe will be treaded as a source pipe that just serves to emit messages. Otherwise, the pipe returned will be created and chained from the last member of the Out slice of the parent. This function has side effects, and will add an Out channel to the pipe that is passed in

func (*Pipe) Listen

func (m *Pipe) Listen(fn func(message.Msg) (message.Msg, error), nsFilter *regexp.Regexp) error

Listen starts a listening loop that pulls messages from the In chan, applies fn(msg), a `func(message.Msg) error`, and emits them on the Out channel. Errors will be emitted to the Pipe's Err chan, and will terminate the loop. The listening loop can be interrupted by calls to Stop().

func (*Pipe) Send

func (m *Pipe) Send(msg message.Msg)

Send emits the given message on the 'Out' channel. the send Timesout after 100 ms in order to chaeck of the Pipe has stopped and we've been asked to exit. If the Pipe has been stopped, the send will fail and there is no guarantee of either success or failure

func (*Pipe) Stop

func (m *Pipe) Stop()

Stop terminates the channels listening loop, and allows any timeouts in send to fail

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL