pipeline

package
v0.0.0-...-e92691c Latest
Warning

This package is not in the latest version of its module.

Published: Mar 11, 2019 License: MIT Imports: 5 Imported by: 0

README

Pipeline

Simple API that allows easy usage and control of multiple channels between processes in a pipeline

Structs:

  • Pipeline - Responsible for managing all channels and processes
  • Process - Responsible for processing arbitrary data and passing data to the next section of the pipeline

Pipeline API:

  • New *Pipeline

Initializes a new pipeline

  • Side Effects:
  • Creates a nil process to use as the head of the pipeline
  • Starts a goroutine which waits for an error to occur
  • NOTE: This is the only way a pipeline should be initialized
  • Start

Uses the initialized nil process to send data to the first user-created process

  • Side Effects:
  • Attaches a "closer" channel to the tail process(es); this happens only on the first call to Start
  • Wait error

Wait closes the initialized nil process and waits for all tail processes to send the closer signal. Once all processes have closed, Wait returns an error if one occurred

  • NOTE: Start will panic if called after Wait
  • WaitWithTimeout

This is the same as Wait but with an added timeout

  • Abort

Abort sends an error to the pipeline's abort channel. This causes the pipeline to close all running processes prematurely and makes Wait return that error

  • NOTE: Access to this function is provided to all the process functions
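
The Start and Wait lifecycle described above can be illustrated with a minimal sketch built on plain channels. It does not use this package: `MiniPipeline`, `NewMini`, and `Seen` are hypothetical names, and the single collecting stage stands in for a real chain of processes. In this sketch, calling Start after Wait panics because the head channel is already closed, which is one plausible reason for the panic noted above.

```go
package main

import "fmt"

// MiniPipeline is a hypothetical single-stage stand-in for the lifecycle
// described above: Start feeds the head channel, Wait closes it and blocks
// until the tail stage signals on closer.
type MiniPipeline struct {
	head   chan interface{}
	closer chan bool
	Seen   []interface{} // written by the stage goroutine, read after Wait
}

// NewMini creates the channels and starts the only stage.
func NewMini() *MiniPipeline {
	p := &MiniPipeline{
		head:   make(chan interface{}),
		closer: make(chan bool),
	}
	go func() {
		for v := range p.head {
			p.Seen = append(p.Seen, v)
		}
		p.closer <- true // tail signals that it has drained its input
	}()
	return p
}

// Start sends one value into the head of the pipeline.
func (p *MiniPipeline) Start(v interface{}) { p.head <- v }

// Wait closes the head and blocks until the tail sends the closer signal.
func (p *MiniPipeline) Wait() {
	close(p.head)
	<-p.closer
}

func main() {
	p := NewMini()
	p.Start("a")
	p.Start("b")
	p.Wait()
	fmt.Println(len(p.Seen))
}
```

Reading `Seen` only after `Wait` returns is safe here because the closer signal is sent after the last append.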

Creating a custom pipeline:

  • Append error

This function appends a custom process to the pipeline's internal process slice, providing a simple 1 to 1 connection between two processes. In the API documentation below, the method with this behavior is named Chain

  • FanOut error

This function appends N custom processes to the previous process in the pipeline, providing a 1 to N connection between multiple processes

  • FanIn error

This function appends 1 process to the previous N processes in the pipeline, providing an N to 1 connection between multiple processes

  • ConnectNtoM error

This function appends M processes to the previous N processes in the pipeline, providing, as its name says, an N to M connection between multiple processes
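
As a rough illustration of the 1 to N and N to 1 shapes, here is a sketch using only the standard library. It does not call this package: `FanOutIn` is a hypothetical helper, and it assumes that fan-out distributes items among the N workers (the package may instead deliver each item to every process).

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// FanOutIn hands each input to one of n workers (1 to N), then merges the
// workers' results into a single sorted slice (N to 1). It expects n >= 1.
func FanOutIn(inputs []int, n int, work func(int) int) []int {
	in := make(chan int)
	out := make(chan int)

	// Fan out: n workers read from the same input channel.
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- work(v)
			}
		}()
	}

	// Feed the inputs, then close the channel so the workers can finish.
	go func() {
		for _, v := range inputs {
			in <- v
		}
		close(in)
	}()

	// Close the merged output once every worker has returned.
	go func() {
		wg.Wait()
		close(out)
	}()

	// Fan in: a single reader collects from all workers.
	var results []int
	for v := range out {
		results = append(results, v)
	}
	sort.Ints(results) // worker scheduling makes arrival order nondeterministic
	return results
}

func main() {
	fmt.Println(FanOutIn([]int{1, 2, 3, 4}, 3, func(v int) int { return v * v }))
}
```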

Creating a custom process:

  • NewProcess *Process

Initializes and returns a custom process with the appropriate Process function. All you need to create a custom process is to wrap your function with a process function. A Process function follows the form:

func(v interface{}, send func(interface{}) bool, abort func(error))

  • v is the data that is passed from process to process
  • send provides safe access to the process's send channel; returns true if the channel has been closed
  • abort provides safe access to the pipeline's abort channel
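
A function of this shape can be written and exercised without the pipeline itself. In the sketch below, `ContextFn` is redeclared locally with the documented signature so the example compiles on its own; `Double` and `RunOnce` are hypothetical names, and the stub `send` does not model real channel state.

```go
package main

import (
	"errors"
	"fmt"
)

// ContextFn mirrors the signature documented for this package; it is
// redeclared here only so the sketch is self-contained.
type ContextFn func(v interface{}, send func(interface{}) bool, abort func(error))

// Double is a hypothetical process function: it doubles integers and
// aborts on any other type.
func Double(v interface{}, send func(interface{}) bool, abort func(error)) {
	n, ok := v.(int)
	if !ok {
		abort(errors.New("double: expected int"))
		return
	}
	send(n * 2)
}

// RunOnce is a hypothetical harness that calls fn with stub send and abort
// callbacks and records what the function did.
func RunOnce(fn ContextFn, v interface{}) (sent []interface{}, err error) {
	send := func(out interface{}) bool {
		sent = append(sent, out)
		return true // stub value; this sketch does not model channel state
	}
	abort := func(e error) { err = e }
	fn(v, send, abort)
	return sent, err
}

func main() {
	sent, err := RunOnce(Double, 21)
	fmt.Println(sent, err)

	sent, err = RunOnce(Double, "oops")
	fmt.Println(sent, err)
}
```

Keeping the process function free of direct channel access, as the signature encourages, is what makes it testable with stubs like these.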

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Connect

func Connect(s Sender, r Receiver) error

Connect bridges two processes via channels

Types

type ContextFn

type ContextFn func(interface{}, func(interface{}) bool, func(error))

ContextFn wraps the users function and provides the necessary controls to alert the pipeline of completion and errors

type Pipeline

type Pipeline struct {
	Procs []Processor
	// contains filtered or unexported fields
}

Pipeline contains a group of chained processes and waits for completion or an error to occur

Error Handling:

  • An error channel is wrapped in a func called Abort that all processes can access.
  • Calling Abort causes the Pipeline to close all running processes, decrement the wg counter, and then return the error.

Concurrency Patterns Implemented:

  • Fan In: connects one receiver channel to N sender channels.
  • Fan Out: connects one send channel to N receiver channels.
  • Append: connects one sender to one receiver.
  • Connect N to M: bridges N senders to M receivers.

func New

func New() *Pipeline

New returns a newly initialized pipeline

func (*Pipeline) Abort

func (p *Pipeline) Abort(err error)

Abort sends an error over the abort channel, which will cause a shutdown

func (*Pipeline) Chain

func (p *Pipeline) Chain(fn ContextFn) error

Chain appends a process to our current chain of processes

func (*Pipeline) ConnectNtoM

func (p *Pipeline) ConnectNtoM(n, m int, fn ContextFn) error

ConnectNtoM ...

func (*Pipeline) FanIn

func (p *Pipeline) FanIn(n int, fn ContextFn) error

FanIn connects a process to the last n procs

func (*Pipeline) FanOut

func (p *Pipeline) FanOut(fn ContextFn, n int) error

FanOut connects n procs to the last process

func (*Pipeline) Start

func (p *Pipeline) Start(v interface{})

Start sends data to the first process in the pipeline

func (*Pipeline) Wait

func (p *Pipeline) Wait() error

Wait starts signaling to close channels after all items have been sent, then waits for a response from the closer

func (*Pipeline) WaitWithTimeout

func (p *Pipeline) WaitWithTimeout(dur time.Duration) error

WaitWithTimeout starts signaling to close channels after all items have been sent, then waits for a response from the closer or times out

type Process

type Process struct {
	Run ContextFn
	// contains filtered or unexported fields
}

Process waits for work, processes the work via the ContextFn and finally sends the work to a subsequent process

func (*Process) Abort

func (p *Process) Abort(err error)

Abort calls the parent abort func

func (*Process) Close

func (p *Process) Close()

Close closes the underlying send channel

func (*Process) Closer

func (p *Process) Closer(c chan bool)

Closer sets the closer channel

func (*Process) In

func (p *Process) In(in chan interface{}) error

In sets the receiver channel

func (*Process) Out

func (p *Process) Out() chan interface{}

Out returns the sender channel

func (*Process) Receive

func (p *Process) Receive()

Receive accepts data and processes it

func (*Process) Send

func (p *Process) Send(v interface{}) (open bool)

Send moves data to the next process in the pipeline

type Processor

type Processor interface {
	Sender
	Receiver
	Closer(chan bool)
	Close()
}

Processor accepts data, processes it, and sends data out

type Receiver

type Receiver interface {
	Receive()
	In(chan interface{}) error
}

Receiver implements a receive channel

type Sender

type Sender interface {
	Send(interface{}) bool
	Out() chan interface{}
}

Sender implements a send channel
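
To show how the Sender and Receiver interfaces fit together, here is a self-contained sketch that redeclares both interfaces locally and provides toy implementations. `source`, `sink`, and `Bridge` are hypothetical; in particular, `Bridge` is only a guess at what a Connect-style helper could do (handing the sender's output channel to the receiver), not the package's actual Connect.

```go
package main

import (
	"errors"
	"fmt"
)

// Sender and Receiver are redeclared with the documented method sets so
// the sketch compiles on its own.
type Sender interface {
	Send(interface{}) bool
	Out() chan interface{}
}

type Receiver interface {
	Receive()
	In(chan interface{}) error
}

// source is a hypothetical Sender backed by a buffered channel.
type source struct{ out chan interface{} }

func (s *source) Send(v interface{}) bool { s.out <- v; return true }
func (s *source) Out() chan interface{}   { return s.out }

// sink is a hypothetical Receiver that collects everything it reads.
type sink struct {
	in  chan interface{}
	Got []interface{}
}

// In stores the channel to read from and rejects a nil channel.
func (k *sink) In(in chan interface{}) error {
	if in == nil {
		return errors.New("sink: nil channel")
	}
	k.in = in
	return nil
}

// Receive drains the input channel until it is closed.
func (k *sink) Receive() {
	for v := range k.in {
		k.Got = append(k.Got, v)
	}
}

// Bridge wires the sender's output channel into the receiver (assumption).
func Bridge(s Sender, r Receiver) error { return r.In(s.Out()) }

func main() {
	s := &source{out: make(chan interface{}, 2)}
	k := &sink{}
	if err := Bridge(s, k); err != nil {
		fmt.Println("bridge failed:", err)
		return
	}
	s.Send("a")
	s.Send("b")
	close(s.out)
	k.Receive()
	fmt.Println(k.Got)
}
```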
