pipeline

package
v0.15.1
Published: Jun 29, 2018 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package pipeline contains structures that implement both the Producer and Consumer interfaces. They can be used as extra pipeline components for various utilities.

Functions

func SanitiseConfig

func SanitiseConfig(conf Config) (interface{}, error)

SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.

Types

type Config

type Config struct {
	Threads    int                `json:"threads" yaml:"threads"`
	Processors []processor.Config `json:"processors" yaml:"processors"`
}

Config is a configuration struct for a pipeline.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.
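A common way to use a defaults constructor like NewConfig is to unmarshal user input on top of it, so that omitted fields keep their defaults. The sketch below illustrates that pattern with stand-in types. ProcessorConfig, parseConfig, and the default values shown are assumptions made for illustration, not the package's actual defaults.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ProcessorConfig is a hypothetical stand-in for processor.Config.
type ProcessorConfig struct {
	Type string `json:"type"`
}

// Config mirrors the field names and JSON tags of pipeline.Config.
type Config struct {
	Threads    int               `json:"threads"`
	Processors []ProcessorConfig `json:"processors"`
}

// NewConfig returns a populated Config. The default values here are
// assumptions for this sketch only.
func NewConfig() Config {
	return Config{Threads: 1, Processors: []ProcessorConfig{}}
}

// parseConfig unmarshals raw JSON over the defaults, so any field missing
// from the input keeps its default value.
func parseConfig(raw []byte) (Config, error) {
	conf := NewConfig()
	if err := json.Unmarshal(raw, &conf); err != nil {
		return Config{}, err
	}
	return conf, nil
}

func main() {
	// "threads" is omitted, so it keeps the default set by NewConfig.
	conf, err := parseConfig([]byte(`{"processors":[{"type":"noop"}]}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(conf.Threads, len(conf.Processors))
}
```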

type ConstructorFunc

type ConstructorFunc func() (Type, error)

ConstructorFunc is a func for constructing a pipeline type.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a pool of pipelines. Each pipeline reads from a shared transaction channel. Inputs remain coupled to their outputs as they propagate the response channel in the transaction.
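The coupling described above can be illustrated with a small self-contained sketch: several workers read from one shared channel, and each forwarded transaction carries the response channel of the input it came from, so the acknowledgement reaches the original sender regardless of which worker handled it. Transaction, runPool, and roundTrip are stand-ins written for this example, not the package's types or implementation.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

// Transaction is a hypothetical stand-in for types.Transaction: a payload
// plus the channel on which the sender expects its acknowledgement.
type Transaction struct {
	Payload      string
	ResponseChan chan<- error
}

// runPool starts `threads` workers that all read from the same channel.
// Each worker forwards the processed payload with the original ResponseChan.
func runPool(threads int, in <-chan Transaction, fn func(string) string) <-chan Transaction {
	out := make(chan Transaction)
	var wg sync.WaitGroup
	for i := 0; i < threads; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range in {
				out <- Transaction{Payload: fn(t.Payload), ResponseChan: t.ResponseChan}
			}
		}()
	}
	// Close the output once every worker has drained the input.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// roundTrip sends every input through the pool concurrently and records the
// acknowledgement each sender received on its own response channel.
func roundTrip(threads int, inputs []string) map[string]error {
	in := make(chan Transaction)
	out := runPool(threads, in, strings.ToUpper)

	// Downstream consumer: rejects payloads containing "FAIL", acks the rest.
	go func() {
		for t := range out {
			if strings.Contains(t.Payload, "FAIL") {
				t.ResponseChan <- errors.New("rejected: " + t.Payload)
			} else {
				t.ResponseChan <- nil
			}
		}
	}()

	results := make(map[string]error, len(inputs))
	var mu sync.Mutex
	var senders sync.WaitGroup
	for _, msg := range inputs {
		senders.Add(1)
		go func(msg string) {
			defer senders.Done()
			res := make(chan error)
			in <- Transaction{Payload: msg, ResponseChan: res}
			err := <-res // blocks until this exact message is acknowledged
			mu.Lock()
			results[msg] = err
			mu.Unlock()
		}(msg)
	}
	senders.Wait()
	close(in)
	return results
}

func main() {
	inputs := []string{"a", "fail", "b"}
	results := roundTrip(3, inputs)
	for _, msg := range inputs {
		fmt.Println(msg, results[msg])
	}
}
```

Because the response channel travels inside the transaction, the workers need no bookkeeping to route acknowledgements back to the right sender.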

func NewPool

func NewPool(
	constructor ConstructorFunc,
	threads int,
	log log.Modular,
	stats metrics.Type,
) (*Pool, error)

NewPool returns a new pipeline pool that utilises multiple processor threads.

func (*Pool) CloseAsync

func (p *Pool) CloseAsync()

CloseAsync shuts down the pipeline and stops processing messages.

func (*Pool) StartReceiving

func (p *Pool) StartReceiving(msgs <-chan types.Transaction) error

StartReceiving assigns a messages channel for the pipeline to read.

func (*Pool) TransactionChan

func (p *Pool) TransactionChan() <-chan types.Transaction

TransactionChan returns the channel used for consuming messages from this pipeline.

func (*Pool) WaitForClose

func (p *Pool) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the pipeline pool has closed down or the timeout has elapsed.

type ProcConstructorFunc

type ProcConstructorFunc func() (processor.Type, error)

ProcConstructorFunc is a func for constructing a processor type.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor is a pipeline that supports both Consumer and Producer interfaces. The processor will read from a source, perform some processing, and then either propagate a new message or drop it.

func NewProcessor

func NewProcessor(
	log log.Modular,
	stats metrics.Type,
	msgProcessors ...processor.Type,
) *Processor

NewProcessor returns a new message processing pipeline.

func (*Processor) CloseAsync

func (p *Processor) CloseAsync()

CloseAsync shuts down the pipeline and stops processing messages.

func (*Processor) StartReceiving

func (p *Processor) StartReceiving(msgs <-chan types.Transaction) error

StartReceiving assigns a messages channel for the pipeline to read.

func (*Processor) TransactionChan

func (p *Processor) TransactionChan() <-chan types.Transaction

TransactionChan returns the channel used for consuming messages from this pipeline.

func (*Processor) WaitForClose

func (p *Processor) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor pipeline has closed down or the timeout has elapsed.

type Type

type Type interface {
	types.Producer
	types.Consumer
	types.Closable
}

Type is an interface that all pipeline types should implement.

func New

func New(
	conf Config,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
	processorCtors ...ProcConstructorFunc,
) (Type, error)

New creates a pipeline type based on a pipeline configuration.
