Documentation ¶
Overview ¶
Package pipeline contains structures that implement both the Producer and Consumer interfaces. They can be used as extra pipeline components for various utilities.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConstructorFunc ¶ added in v0.4.6
ConstructorFunc is a common type for constructing a pipeline type.
type Pool ¶ added in v0.4.6
type Pool struct {
// contains filtered or unexported fields
}
Pool is a pool of pipelines. It reads from a single source and writes to a single source. The input is decoupled which means failed delivery notification cannot be propagated back up to the original input.
If delivery acknowledgements to the input is required this pool should not be used. Instead, you should configure multiple inputs each with their own pipeline e.g. configure 8 kafka_balanced inputs each with a single processor rather than a single kafka_balanced with a pool of 8 workers.
func NewPool ¶ added in v0.4.6
func NewPool( constructor ConstructorFunc, workers int, log log.Modular, stats metrics.Type, ) (*Pool, error)
NewPool returns a new pipeline pool that utilized multiple processor threads.
func (*Pool) CloseAsync ¶ added in v0.4.6
func (p *Pool) CloseAsync()
CloseAsync shuts down the pipeline and stops processing messages.
func (*Pool) StartReceiving ¶ added in v0.4.6
func (p *Pool) StartReceiving(msgs <-chan types.Transaction) error
StartReceiving assigns a messages channel for the pipeline to read.
func (*Pool) TransactionChan ¶ added in v0.9.0
func (p *Pool) TransactionChan() <-chan types.Transaction
TransactionChan returns the channel used for consuming messages from this pipeline.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor is a pipeline that supports both Consumer and Producer interfaces. The processor will read from a source, perform some processing, and then either propagate a new message or drop it.
func NewProcessor ¶
func NewProcessor( log log.Modular, stats metrics.Type, msgProcessors ...processor.Type, ) *Processor
NewProcessor returns a new message processing pipeline.
func (*Processor) CloseAsync ¶
func (p *Processor) CloseAsync()
CloseAsync shuts down the pipeline and stops processing messages.
func (*Processor) StartReceiving ¶
func (p *Processor) StartReceiving(msgs <-chan types.Transaction) error
StartReceiving assigns a messages channel for the pipeline to read.
func (*Processor) TransactionChan ¶ added in v0.9.0
func (p *Processor) TransactionChan() <-chan types.Transaction
TransactionChan returns the channel used for consuming messages from this pipeline.