Documentation ¶
Overview ¶
Package pipeline contains structures that implement both the Producer and Consumer interfaces. They can be used as extra pipeline components for various utilities.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SanitiseConfig ¶
SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.
Types ¶
type Config ¶
type Config struct {
	Threads    int                `json:"threads" yaml:"threads"`
	Processors []processor.Config `json:"processors" yaml:"processors"`
}
Config is a configuration struct for creating parallel processing pipelines. The number of resulting parallel processing pipelines will match the number of threads specified. Processors are executed on each message in the order that they are written.
In order to fully utilise each processing thread you must either have a number of parallel inputs that matches or surpasses the number of pipeline threads, or use a memory buffer.
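The relationship between threads, processor order, and buffering can be illustrated with a minimal sketch. It does not use this package: `step` and `runPipelines` are hypothetical stand-ins, plain strings replace messages, and a buffered channel plays the role of a memory buffer.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// step is a hypothetical stand-in for a configured processor.
type step func(string) string

// runPipelines starts `threads` workers that all read from `in`. Each worker
// applies every step to a message in the order the steps were given.
func runPipelines(threads int, steps []step, in <-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < threads; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range in {
				for _, s := range steps {
					msg = s(msg)
				}
				out <- msg
			}
		}()
	}
	// Close the output once every worker has drained the input.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	// The buffered channel stands in for a memory buffer: a single producer
	// can queue several messages without waiting for a free worker.
	in := make(chan string, 8)
	for _, m := range []string{"a", "b", "c"} {
		in <- m
	}
	close(in)

	steps := []step{strings.ToUpper, func(s string) string { return s + "!" }}
	for res := range runPipelines(2, steps, in) {
		fmt.Println(res)
	}
}
```

Every message passes through the steps in the same order, but because two workers run concurrently the order in which results are printed is not guaranteed.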
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is a pool of pipelines. Each pipeline reads from a shared transaction channel. Inputs remain coupled to their outputs because each pipeline propagates the response channel carried in the transaction.
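As a rough illustration of how a shared channel and a propagated response channel keep inputs and outputs coupled, consider the sketch below. The `transaction` type, `startPool`, and `errRejected` are hypothetical stand-ins written for this example; they are not the package's types or implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errRejected is a hypothetical error a downstream output might report.
var errRejected = errors.New("rejected")

// transaction is a hypothetical stand-in for types.Transaction: a payload
// plus the channel on which the originating input expects its result.
type transaction struct {
	Payload string
	ResChan chan<- error
}

// startPool launches n workers on one shared input channel. Each worker
// rewrites the payload and forwards the transaction with its response channel
// untouched, so whoever consumes the output can answer the original sender.
func startPool(n int, in <-chan transaction, process func(string) string) <-chan transaction {
	out := make(chan transaction)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range in {
				out <- transaction{Payload: process(t.Payload), ResChan: t.ResChan}
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := make(chan transaction)
	out := startPool(3, in, func(p string) string { return "processed:" + p })

	// A stand-in output: reject empty results, acknowledge everything else.
	go func() {
		for t := range out {
			if t.Payload == "processed:" {
				t.ResChan <- errRejected
				continue
			}
			t.ResChan <- nil
		}
	}()

	for _, payload := range []string{"hello", ""} {
		res := make(chan error, 1)
		in <- transaction{Payload: payload, ResChan: res}
		fmt.Printf("payload %q, response: %v\n", payload, <-res)
	}
	close(in)
}
```

Whichever worker handles a transaction, the response travels back on the channel supplied by the sender, so no extra bookkeeping is needed to match results to inputs.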
func NewPool ¶
func NewPool(
	constructor types.PipelineConstructorFunc,
	threads int,
	log log.Modular,
	stats metrics.Type,
) (*Pool, error)
NewPool returns a new pipeline pool that utilises multiple processor threads. TODO: V4 Remove this
func (*Pool) CloseAsync ¶
func (p *Pool) CloseAsync()
CloseAsync shuts down the pipeline and stops processing messages.
func (*Pool) Consume ¶
func (p *Pool) Consume(msgs <-chan types.Transaction) error
Consume assigns a channel of transactions for the pipeline to read from.
func (*Pool) TransactionChan ¶
func (p *Pool) TransactionChan() <-chan types.Transaction
TransactionChan returns the channel used for consuming messages from this pipeline.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor is a pipeline that supports both Consumer and Producer interfaces. The processor will read from a source, perform some processing, and then either propagate a new message or drop it.
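The read, process, then propagate-or-drop behaviour can be sketched as follows. This is an illustration under simplifying assumptions rather than the package's implementation: `processFunc` and `pipe` are hypothetical names, and plain strings stand in for messages.

```go
package main

import "fmt"

// processFunc is a hypothetical stand-in for a message processor. It returns
// the processed message, or ok == false when the message should be dropped.
type processFunc func(msg string) (out string, ok bool)

// pipe reads from in, applies each processor in turn, and forwards only the
// messages that survive all of them. The returned channel is closed once in
// is closed and drained.
func pipe(in <-chan string, procs ...processFunc) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for msg := range in {
			keep := true
			for _, p := range procs {
				if msg, keep = p(msg); !keep {
					break // dropped: skip the remaining processors
				}
			}
			if keep {
				out <- msg
			}
		}
	}()
	return out
}

func main() {
	in := make(chan string, 3)
	in <- "keep me"
	in <- ""
	in <- "and me"
	close(in)

	dropEmpty := func(m string) (string, bool) { return m, m != "" }
	tag := func(m string) (string, bool) { return "[" + m + "]", true }

	for m := range pipe(in, dropEmpty, tag) {
		fmt.Println(m)
	}
}
```

Running the sketch prints `[keep me]` and `[and me]`; the empty message is dropped by the first processor and never reaches the second.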
func NewProcessor ¶
func NewProcessor(
	log log.Modular,
	stats metrics.Type,
	msgProcessors ...types.Processor,
) *Processor
NewProcessor returns a new message processing pipeline.
func (*Processor) CloseAsync ¶
func (p *Processor) CloseAsync()
CloseAsync shuts down the pipeline and stops processing messages.
func (*Processor) Consume ¶
func (p *Processor) Consume(msgs <-chan types.Transaction) error
Consume assigns a channel of transactions for the pipeline to read from.
func (*Processor) TransactionChan ¶
func (p *Processor) TransactionChan() <-chan types.Transaction
TransactionChan returns the channel used for consuming messages from this pipeline.
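The Consume and TransactionChan pair is what allows a pipeline to sit between other components: one component's output channel becomes the next one's input. A minimal sketch of that wiring follows, assuming a hypothetical `stage` type with the same two-method shape and strings in place of types.Transaction. Rejecting a second call to Consume is a choice made for this sketch; the real types may behave differently.

```go
package main

import (
	"errors"
	"fmt"
)

// stage is a hypothetical component exposing the same two-method shape as
// the pipeline types above, with string standing in for types.Transaction.
type stage struct {
	started bool
	out     chan string
	fn      func(string) string
}

func newStage(fn func(string) string) *stage {
	return &stage{out: make(chan string), fn: fn}
}

// Consume assigns the input channel and starts the processing loop.
func (s *stage) Consume(msgs <-chan string) error {
	if s.started {
		return errors.New("already consuming")
	}
	s.started = true
	go func() {
		defer close(s.out)
		for m := range msgs {
			s.out <- s.fn(m)
		}
	}()
	return nil
}

// TransactionChan exposes the output so the next component can consume it.
func (s *stage) TransactionChan() <-chan string { return s.out }

func main() {
	src := make(chan string, 2)
	src <- "a"
	src <- "b"
	close(src)

	first := newStage(func(m string) string { return m + "1" })
	second := newStage(func(m string) string { return m + "2" })

	// Chain: src -> first -> second.
	if err := first.Consume(src); err != nil {
		fmt.Println("first:", err)
		return
	}
	if err := second.Consume(first.TransactionChan()); err != nil {
		fmt.Println("second:", err)
		return
	}
	for m := range second.TransactionChan() {
		fmt.Println(m)
	}
}
```

Running the sketch prints `a12` then `b12`, since each stage is a single goroutine and message order is preserved along the chain.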