pipeline

package
v0.5.2
Warning

This package is not in the latest version of its module.

Published: Dec 8, 2017 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package pipeline implements the basic data processing pipeline used by peco

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsEndMark

func IsEndMark(err error) bool

IsEndMark is a utility function that checks if the given error object is an EndMark
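The check can be pictured with a minimal sketch. The `EndMarker` and `EndMark` declarations below are copied from this page so the snippet compiles on its own; `isEndMark` and `errOther` are hypothetical names, and the body is only one plausible way to perform the check (the package's actual implementation is not shown here and may also unwrap wrapped errors).

```go
package main

import (
	"errors"
	"fmt"
)

// EndMarker and EndMark mirror the declarations documented on this page.
type EndMarker interface {
	error
	EndMark() bool
}

type EndMark struct{}

func (e EndMark) EndMark() bool { return true }
func (e EndMark) Error() string { return "end of input" }

// errOther is a hypothetical ordinary error used for comparison.
var errOther = errors.New("other failure")

// isEndMark is a hypothetical stand-in for IsEndMark (assumption):
// it reports true only when err implements EndMarker and says so.
func isEndMark(err error) bool {
	if em, ok := err.(EndMarker); ok {
		return em.EndMark()
	}
	return false
}

func main() {
	fmt.Println(isEndMark(EndMark{})) // prints true
	fmt.Println(isEndMark(errOther))  // prints false
}
```

Checking against the `EndMarker` interface rather than the concrete `EndMark` type lets other error types mark the end of input as well.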

Types

type Acceptor

type Acceptor interface {
	Accept(context.Context, chan interface{}, ChanOutput)
}

Acceptor is an object that accepts input and sends its results to an optional output
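As an illustration, here is a rough sketch of a type that satisfies `Acceptor`. The `upcase` type and the `collect` helper are hypothetical, and `Send` is reduced to a plain blocking send so the snippet stands alone. Closing the input channel to stop the loop is a simplification for this demo; the package itself signals the end of input with an `EndMark`.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// ChanOutput and Acceptor mirror the declarations on this page.
type ChanOutput chan interface{}

// Send is a simplified stand-in (assumption): a plain blocking send.
func (oc ChanOutput) Send(v interface{}) error {
	oc <- v
	return nil
}

type Acceptor interface {
	Accept(context.Context, chan interface{}, ChanOutput)
}

// upcase is a hypothetical Acceptor that upper-cases strings and
// forwards every other value unchanged.
type upcase struct{}

func (upcase) Accept(ctx context.Context, in chan interface{}, out ChanOutput) {
	for {
		select {
		case <-ctx.Done():
			return
		case v, ok := <-in:
			if !ok {
				return // input closed: nothing more to process
			}
			if s, isStr := v.(string); isStr {
				v = strings.ToUpper(s)
			}
			if err := out.Send(v); err != nil {
				return
			}
		}
	}
}

// collect feeds items through a and returns whatever reaches the output.
func collect(a Acceptor, items ...interface{}) []interface{} {
	in := make(chan interface{}, len(items))
	for _, it := range items {
		in <- it
	}
	close(in)

	out := make(ChanOutput, len(items)) // buffered so Send never blocks here
	a.Accept(context.Background(), in, out)
	close(out)

	var got []interface{}
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collect(upcase{}, "peco", 42)) // prints [PECO 42]
}
```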

type ChanOutput added in v0.4.7

type ChanOutput chan interface{}

ChanOutput is a defined type whose underlying type is `chan interface{}`

func NilOutput added in v0.4.7

func NilOutput(ctx context.Context) ChanOutput

func (ChanOutput) OutCh added in v0.4.7

func (oc ChanOutput) OutCh() <-chan interface{}

OutCh returns the channel that acceptors can listen to

func (ChanOutput) Send added in v0.4.7

func (oc ChanOutput) Send(v interface{}) (err error)

Send sends the data `v` through this channel

func (ChanOutput) SendEndMark added in v0.4.7

func (oc ChanOutput) SendEndMark(s string) error

SendEndMark sends an end mark
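The three methods can be seen working together in a small producer/consumer sketch. The method bodies below are simplified assumptions written for this example, not the package's code: `Send` is a plain blocking send, and `SendEndMark` ignores its string argument and sends a bare `EndMark`. The `drain` helper is hypothetical.

```go
package main

import "fmt"

// EndMark and ChanOutput mirror the declarations on this page.
type EndMark struct{}

func (e EndMark) EndMark() bool { return true }
func (e EndMark) Error() string { return "end of input" }

type ChanOutput chan interface{}

// OutCh exposes the receive-only side of the channel.
func (oc ChanOutput) OutCh() <-chan interface{} { return oc }

// Send is a simplified stand-in (assumption): a plain blocking send.
func (oc ChanOutput) Send(v interface{}) error {
	oc <- v
	return nil
}

// SendEndMark is a simplified stand-in (assumption): it ignores s
// and sends a bare EndMark value.
func (oc ChanOutput) SendEndMark(s string) error {
	return oc.Send(EndMark{})
}

// drain is a hypothetical consumer: it reads from out until it sees
// an EndMark and returns the items received before it.
func drain(out ChanOutput) []interface{} {
	var items []interface{}
	for v := range out.OutCh() {
		if _, ok := v.(EndMark); ok {
			break
		}
		items = append(items, v)
	}
	return items
}

func main() {
	out := make(ChanOutput, 3) // buffered so the sends below do not block
	_ = out.Send("alpha")
	_ = out.Send("beta")
	_ = out.SendEndMark("end of demo")
	fmt.Println(drain(out)) // prints [alpha beta]
}
```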

type Destination

type Destination interface {
	Reset()
	Done() <-chan struct{}
	Acceptor
}

Destination is a special case Acceptor that has no more Acceptors chained to it to consume data
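A possible shape for a `Destination` is sketched below. The `collector` type, its `Items` accessor, and the `feed` helper are all hypothetical. The sketch assumes that `Done` should be closed once an `EndMark` arrives and that `Reset` prepares the value for another run; the page does not spell out either contract.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// The declarations below mirror the ones on this page.
type EndMark struct{}

func (e EndMark) EndMark() bool { return true }
func (e EndMark) Error() string { return "end of input" }

type ChanOutput chan interface{}

type Acceptor interface {
	Accept(context.Context, chan interface{}, ChanOutput)
}

type Destination interface {
	Reset()
	Done() <-chan struct{}
	Acceptor
}

// collector is a hypothetical Destination that stores what it receives.
type collector struct {
	mu    sync.Mutex
	items []interface{}
	done  chan struct{}
}

var _ Destination = (*collector)(nil) // compile-time interface check

func newCollector() *collector { return &collector{done: make(chan struct{})} }

// Reset drops stored items and arms a fresh done channel (assumed contract).
func (c *collector) Reset() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items = nil
	c.done = make(chan struct{})
}

func (c *collector) Done() <-chan struct{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.done
}

// Items returns a copy of what has been collected so far.
func (c *collector) Items() []interface{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	return append([]interface{}(nil), c.items...)
}

// Accept ignores its output argument: nothing is chained after a Destination.
// Call Reset before running Accept a second time.
func (c *collector) Accept(ctx context.Context, in chan interface{}, _ ChanOutput) {
	defer func() {
		c.mu.Lock()
		close(c.done)
		c.mu.Unlock()
	}()
	for {
		select {
		case <-ctx.Done():
			return
		case v, ok := <-in:
			if !ok {
				return
			}
			if _, isEnd := v.(EndMark); isEnd {
				return
			}
			c.mu.Lock()
			c.items = append(c.items, v)
			c.mu.Unlock()
		}
	}
}

// feed pushes items plus an end mark into a fresh collector and waits for Done.
func feed(items ...interface{}) *collector {
	c := newCollector()
	in := make(chan interface{}, len(items)+1)
	for _, it := range items {
		in <- it
	}
	in <- EndMark{}
	go c.Accept(context.Background(), in, nil)
	<-c.Done()
	return c
}

func main() {
	fmt.Println(feed("one", "two").Items()) // prints [one two]
}
```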

type EndMark

type EndMark struct{}

EndMark is a dummy struct that gets sent as an EOL mark of sorts

func (EndMark) EndMark

func (e EndMark) EndMark() bool

EndMark returns true

func (EndMark) Error

func (e EndMark) Error() string

Error returns the error string "end of input"

type EndMarker

type EndMarker interface {
	error
	EndMark() bool
}

EndMarker is an interface for things that tell us the input sequence has ended

type Output added in v0.4.7

type Output interface {
	Send(interface{}) error
}

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline encapsulates a chain of a `Source`, `ProcNode`s, and a `Destination`

func New

func New() *Pipeline

New creates a new Pipeline

func (*Pipeline) Add

func (p *Pipeline) Add(n Acceptor)

Add adds a new Acceptor that works on data that goes through the Pipeline. If called during `Run`, this method will block.

func (*Pipeline) Done

func (p *Pipeline) Done() <-chan struct{}

func (*Pipeline) Run

func (p *Pipeline) Run(ctx context.Context) (err error)

Run starts the processing. Mutator methods for `Pipeline` cannot be called while `Run` is running.

func (*Pipeline) SetDestination

func (p *Pipeline) SetDestination(d Destination)

SetDestination sets the destination. If called during `Run`, this method will block.

func (*Pipeline) SetSource

func (p *Pipeline) SetSource(s Source)

SetSource sets the source. If called during `Run`, this method will block.

type Source

type Source interface {
	// Start should be able to be called repeatedly, producing the
	// same data to be consumed by the chained Acceptors
	Start(context.Context, ChanOutput)

	Reset()
}
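The comment on `Start` asks for repeatable output, which a slice-backed source satisfies trivially. The sketch below is only an illustration: `sliceSource` and `readAll` are hypothetical, and terminating the stream with an `EndMark` is an assumption based on the rest of this page rather than a documented requirement of `Source`.

```go
package main

import (
	"context"
	"fmt"
)

// The declarations below mirror the ones on this page.
type EndMark struct{}

func (e EndMark) EndMark() bool { return true }
func (e EndMark) Error() string { return "end of input" }

type ChanOutput chan interface{}

type Source interface {
	Start(context.Context, ChanOutput)
	Reset()
}

// sliceSource is a hypothetical Source backed by a fixed slice, so every
// call to Start produces the same data.
type sliceSource struct{ lines []string }

func (s *sliceSource) Start(ctx context.Context, out ChanOutput) {
	for _, l := range s.lines {
		select {
		case <-ctx.Done():
			return
		case out <- l:
		}
	}
	// Assumption: the stream is terminated with an EndMark.
	select {
	case <-ctx.Done():
	case out <- EndMark{}:
	}
}

// Reset has nothing to rewind because Start always reads from the slice.
func (s *sliceSource) Reset() {}

// readAll is a hypothetical helper: it starts src and gathers everything
// sent before the EndMark.
func readAll(src Source) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // lets Start exit if we stop reading early

	out := make(ChanOutput)
	go src.Start(ctx, out)

	var got []string
	for v := range out {
		if _, ok := v.(EndMark); ok {
			break
		}
		got = append(got, fmt.Sprint(v))
	}
	return got
}

func main() {
	src := &sliceSource{lines: []string{"a", "b"}}
	first := readAll(src)
	src.Reset()
	second := readAll(src)
	fmt.Println(first, second) // prints [a b] [a b]
}
```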
