pipeline

package
v4.43.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2025 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package pipeline contains structures that implement both the Producer and Consumer interfaces. They can be used as extra pipeline components for various utilities.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConfigSpec

func ConfigSpec() docs.FieldSpec

ConfigSpec returns a configuration spec for a processor pipeline.

func New

New creates a processor pipeline based on a processor pipeline configuration.

Types

type Config

type Config struct {
	Threads    int                `json:"threads" yaml:"threads"`
	Processors []processor.Config `json:"processors" yaml:"processors"`
}

Config is a configuration struct for creating parallel processing pipelines. The number of resuling parallel processing pipelines will match the number of threads specified. Processors are executed on each message in the order that they are written.

In order to fully utilise each processing thread you must either have a number of parallel inputs that matches or surpasses the number of pipeline threads, or use a memory buffer.

func FromAny

func FromAny(prov docs.Provider, value any) (conf Config, err error)

FromAny returns a pipeline config from a parsed config, yaml node or map.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a pool of pipelines. Each pipeline reads from a shared transaction channel. Inputs remain coupled to their outputs as they propagate the response channel in the transaction.

func NewPool

func NewPool(threads int, log log.Modular, msgProcessors ...processor.V1) (*Pool, error)

NewPool creates a new processing pool.

func (*Pool) Consume

func (p *Pool) Consume(msgs <-chan message.Transaction) error

Consume assigns a messages channel for the pipeline to read.

func (*Pool) TransactionChan

func (p *Pool) TransactionChan() <-chan message.Transaction

TransactionChan returns the channel used for consuming messages from this pipeline.

func (*Pool) TriggerCloseNow

func (p *Pool) TriggerCloseNow()

TriggerCloseNow signals that the component should close immediately, messages in flight will be dropped.

func (*Pool) WaitForClose

func (p *Pool) WaitForClose(ctx context.Context) error

WaitForClose blocks until the component has closed down or the context is cancelled. Closing occurs either when the input transaction channel is closed and messages are flushed (and acked), or when CloseNowAsync is called.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor is a pipeline that supports both Consumer and Producer interfaces. The processor will read from a source, perform some processing, and then either propagate a new message or drop it.

func NewProcessor

func NewProcessor(msgProcessors ...processor.V1) *Processor

NewProcessor returns a new message processing pipeline.

func (*Processor) Consume

func (p *Processor) Consume(msgs <-chan message.Transaction) error

Consume assigns a messages channel for the pipeline to read.

func (*Processor) TransactionChan

func (p *Processor) TransactionChan() <-chan message.Transaction

TransactionChan returns the channel used for consuming messages from this pipeline.

func (*Processor) TriggerCloseNow

func (p *Processor) TriggerCloseNow()

TriggerCloseNow signals that the processor pipeline should close immediately.

func (*Processor) WaitForClose

func (p *Processor) WaitForClose(ctx context.Context) error

WaitForClose blocks until the component has closed down or the context is cancelled. Closing occurs either when the input transaction channel is closed and messages are flushed (and acked), or when CloseNowAsync is called.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL