Documentation ¶
Overview ¶
Package pipeline implements the basic data processing pipeline used by peco
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Acceptor ¶
type Acceptor interface {
Accept(context.Context, chan interface{}, ChanOutput)
}
Acceptor is an object that can accept input, and send to an optional output
type ChanOutput ¶ added in v0.4.7
type ChanOutput chan interface{}
ChanOutput is an alias to `chan interface{}`
func NilOutput ¶ added in v0.4.7
func NilOutput(ctx context.Context) ChanOutput
func (ChanOutput) OutCh ¶ added in v0.4.7
func (oc ChanOutput) OutCh() <-chan interface{}
OutCh returns the channel that acceptors can listen to
func (ChanOutput) Send ¶ added in v0.4.7
func (oc ChanOutput) Send(v interface{}) (err error)
Send sends the data `v` through this channel
func (ChanOutput) SendEndMark ¶ added in v0.4.7
func (oc ChanOutput) SendEndMark(s string) error
SendEndMark sends an end mark
type Destination ¶
type Destination interface { Reset() Done() <-chan struct{} Acceptor }
Destination is a special case Acceptor that has no more Acceptors chained to it to consume data
type EndMark ¶
type EndMark struct{}
EndMark is a dummy struct that gets send as an EOL mark of sorts
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline is encapsulates a chain of `Source`, `ProcNode`s, and `Destination`
func (*Pipeline) Add ¶
Add adds new Acceptor that work on data that goes through the Pipeline. If called during `Run`, this method will block.
func (*Pipeline) Run ¶
Run starts the processing. Mutator methods for `Pipeline` cannot be called while `Run` is running.
func (*Pipeline) SetDestination ¶
func (p *Pipeline) SetDestination(d Destination)
SetDestination sets the destination. If called during `Run`, this method will block.
type Source ¶
type Source interface { // Start should be able to be called repeatedly, producing the // same data to be consumed by the chained Acceptors Start(context.Context, ChanOutput) Reset() }