Documentation ¶
Overview ¶
Package pipeline contains structures that implement both the Producer and Consumer interfaces. They can be used as extra pipeline components for various utilities.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SanitiseConfig ¶ added in v0.10.4
SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.
Types ¶
type Config ¶ added in v0.10.4
type Config struct { Threads int `json:"threads" yaml:"threads"` Processors []processor.Config `json:"processors" yaml:"processors"` }
Config is a configuration struct for a pipeline.
type Pool ¶ added in v0.4.6
type Pool struct {
// contains filtered or unexported fields
}
Pool is a pool of pipelines. Each pipeline reads from a shared transaction channel. Inputs remain coupled to their outputs as they propagate the response channel in the transaction.
func NewPool ¶ added in v0.4.6
func NewPool( constructor types.PipelineConstructorFunc, threads int, log log.Modular, stats metrics.Type, ) (*Pool, error)
NewPool returns a new pipeline pool that utilises multiple processor threads.
func (*Pool) CloseAsync ¶ added in v0.4.6
func (p *Pool) CloseAsync()
CloseAsync shuts down the pipeline and stops processing messages.
func (*Pool) Consume ¶ added in v0.19.0
func (p *Pool) Consume(msgs <-chan types.Transaction) error
Consume assigns a messages channel for the pipeline to read.
func (*Pool) TransactionChan ¶ added in v0.9.0
func (p *Pool) TransactionChan() <-chan types.Transaction
TransactionChan returns the channel used for consuming messages from this pipeline.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor is a pipeline that supports both Consumer and Producer interfaces. The processor will read from a source, perform some processing, and then either propagate a new message or drop it.
func NewProcessor ¶
func NewProcessor( log log.Modular, stats metrics.Type, msgProcessors ...types.Processor, ) *Processor
NewProcessor returns a new message processing pipeline.
func (*Processor) CloseAsync ¶
func (p *Processor) CloseAsync()
CloseAsync shuts down the pipeline and stops processing messages.
func (*Processor) Consume ¶ added in v0.19.0
func (p *Processor) Consume(msgs <-chan types.Transaction) error
Consume assigns a messages channel for the pipeline to read.
func (*Processor) TransactionChan ¶ added in v0.9.0
func (p *Processor) TransactionChan() <-chan types.Transaction
TransactionChan returns the channel used for consuming messages from this pipeline.