Documentation ¶
Overview ¶
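Package pipeline contains structures that implement both the Producer and Consumer interfaces: each one reads transactions from an upstream channel and exposes a channel of transactions for downstream components to consume. They can be used as extra pipeline stages between other components.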
Functions ¶
func SanitiseConfig ¶
SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.
Types ¶
type Config ¶
type Config struct {
	Threads    int                `json:"threads" yaml:"threads"`
	Processors []processor.Config `json:"processors" yaml:"processors"`
}
Config is a configuration struct for a pipeline.
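As a rough illustration of how the struct tags on Config map onto a serialised document, the sketch below decodes JSON into a local stand-in struct. The names pipelineConfig and parsePipelineConfig are hypothetical, and Processors is held as raw JSON because processor.Config is not described in this section; the "noop" processor entry is likewise made up for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pipelineConfig mirrors the field names and JSON tags of Config.
// Processors is kept as raw JSON because processor.Config is not defined
// here; that substitution is an assumption made for illustration only.
type pipelineConfig struct {
	Threads    int               `json:"threads"`
	Processors []json.RawMessage `json:"processors"`
}

// parsePipelineConfig decodes a JSON document into the stand-in struct.
func parsePipelineConfig(raw []byte) (pipelineConfig, error) {
	var conf pipelineConfig
	err := json.Unmarshal(raw, &conf)
	return conf, err
}

func main() {
	// The processor entry below is a hypothetical placeholder.
	raw := []byte(`{"threads": 4, "processors": [{"type": "noop"}]}`)

	conf, err := parsePipelineConfig(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println("threads:", conf.Threads)
	fmt.Println("processors:", len(conf.Processors))
}
```

The same key names ("threads", "processors") apply to YAML input, since Config carries matching yaml tags.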
type ConstructorFunc ¶
ConstructorFunc is a func for constructing a pipeline type.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is a pool of pipelines. Each pipeline reads from a shared transaction channel. Inputs remain coupled to their outputs because each transaction carries its response channel with it as it propagates through the pool.
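The pattern described for Pool can be sketched with plain channels. This is not the package's implementation: transaction, runPool, and the process callback are hypothetical stand-ins, used only to show several workers draining one shared channel while the response channel travels with each transaction.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// transaction is a hypothetical stand-in for types.Transaction: a payload
// plus the channel on which the final result is reported back to the sender.
type transaction struct {
	payload string
	resChan chan<- error
}

// runPool starts the given number of workers. All workers read from the same
// input channel and write to the same output channel. Each worker forwards
// the original resChan unchanged, so an acknowledgement sent downstream still
// reaches whoever produced the transaction.
func runPool(threads int, in <-chan transaction, process func(string) string) <-chan transaction {
	out := make(chan transaction)
	var wg sync.WaitGroup
	for i := 0; i < threads; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range in {
				out <- transaction{payload: process(t.payload), resChan: t.resChan}
			}
		}()
	}
	// Close the output once every worker has drained the input.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := make(chan transaction)
	out := runPool(3, in, strings.ToUpper)

	res := make(chan error, 1)
	in <- transaction{payload: "hello", resChan: res}
	close(in)

	t := <-out
	t.resChan <- nil // the consumer acknowledges via the propagated channel
	fmt.Println(t.payload, <-res)
}
```

Because the workers share one input channel, ordering across transactions is not preserved in this sketch; whether the real Pool makes any ordering guarantee is not stated here.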
func NewPool ¶
func NewPool(
	constructor ConstructorFunc,
	threads int,
	log log.Modular,
	stats metrics.Type,
) (*Pool, error)
NewPool returns a new pipeline pool that utilises multiple processor threads.
func (*Pool) CloseAsync ¶
func (p *Pool) CloseAsync()
CloseAsync shuts down the pipeline and stops processing messages.
func (*Pool) StartReceiving ¶
func (p *Pool) StartReceiving(msgs <-chan types.Transaction) error
StartReceiving assigns a messages channel for the pipeline to read.
func (*Pool) TransactionChan ¶
func (p *Pool) TransactionChan() <-chan types.Transaction
TransactionChan returns the channel used for consuming messages from this pipeline.
type ProcConstructorFunc ¶
ProcConstructorFunc is a func for constructing a processor type.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor is a pipeline that supports both Consumer and Producer interfaces. The processor will read from a source, perform some processing, and then either propagate a new message or drop it.
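The read, process, then propagate-or-drop flow can be illustrated with a small channel stage. The types and names here (message, runStage, the step callback) are hypothetical and do not come from this package. In particular, acknowledging a dropped message immediately is an assumption of the sketch, not documented behaviour of Processor.

```go
package main

import "fmt"

// message is a hypothetical stand-in for types.Transaction.
type message struct {
	payload string
	resChan chan<- error
}

// runStage reads from in, applies step to each payload, and either forwards
// the result downstream or drops the message. A dropped message is
// acknowledged straight away so its sender is not left waiting (an
// assumption of this sketch). The send on resChan blocks unless the channel
// is buffered or the sender is receiving.
func runStage(in <-chan message, step func(string) (string, bool)) <-chan message {
	out := make(chan message)
	go func() {
		defer close(out)
		for m := range in {
			result, keep := step(m.payload)
			if !keep {
				m.resChan <- nil // dropped: nothing travels further downstream
				continue
			}
			out <- message{payload: result, resChan: m.resChan}
		}
	}()
	return out
}

func main() {
	in := make(chan message, 2)
	res := make(chan error, 2)
	in <- message{payload: "keep me", resChan: res}
	in <- message{payload: "", resChan: res}
	close(in)

	// Hypothetical step: drop empty payloads, pass everything else through.
	nonEmpty := func(s string) (string, bool) { return s, s != "" }

	for m := range runStage(in, nonEmpty) {
		fmt.Println("forwarded:", m.payload)
		m.resChan <- nil // downstream acknowledgement
	}
}
```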
func NewProcessor ¶
func NewProcessor(
	log log.Modular,
	stats metrics.Type,
	msgProcessors ...processor.Type,
) *Processor
NewProcessor returns a new message processing pipeline.
func (*Processor) CloseAsync ¶
func (p *Processor) CloseAsync()
CloseAsync shuts down the pipeline and stops processing messages.
func (*Processor) StartReceiving ¶
func (p *Processor) StartReceiving(msgs <-chan types.Transaction) error
StartReceiving assigns a messages channel for the pipeline to read.
func (*Processor) TransactionChan ¶
func (p *Processor) TransactionChan() <-chan types.Transaction
TransactionChan returns the channel used for consuming messages from this pipeline.