pipeline

package
v0.2.1-alpha.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 10, 2024 License: MIT Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildPipelineLog added in v0.3.0

func BuildPipelineLog(status Status, workerID string, pipelineName string, stageName string, msgId string, customMsg string) string

Types

type Config

type Config struct {
	Pipeline   *Pipeline
	InputChan  chan Message
	OutputChan chan Message
	NumWorkers int
}

Config holds the configuration for each pipeline, including its input and output channels, and the number of workers.

type ImmutablePipeMessage added in v0.3.0

type ImmutablePipeMessage PipeMessage

A ImmutablePipeMessage is a wrapper for a payload (consisting of a body and attributes) and internal message attributes. An instance of PipelineMessage can be routed through a pipeline.

func NewImmutablePipeMessage added in v0.3.0

func NewImmutablePipeMessage(messageId string, payloadBody []byte, msgAttrs map[string]string) ImmutablePipeMessage

NewImmutablePipeMessage creates a new ImmutablePipeMessage.

func (ImmutablePipeMessage) AddMessageAttribute added in v0.3.0

func (m ImmutablePipeMessage) AddMessageAttribute(key string, value string) (ImmutablePipeMessage, error)

AddMessageAttribute adds an internal message attribute to a copy of the original message to keep the original instance unchanged. It will throw an error if the attribute key already exists.

func (ImmutablePipeMessage) Copy added in v0.3.0

Copy copies a message and its contents.

func (ImmutablePipeMessage) GetMessageAttributes added in v0.3.0

func (m ImmutablePipeMessage) GetMessageAttributes() map[string]string

GetMessageAttributes returns a copy of the payload attributes.

func (ImmutablePipeMessage) GetMsgRefId added in v0.3.0

func (m ImmutablePipeMessage) GetMsgRefId() string

func (ImmutablePipeMessage) GetPayload added in v0.3.0

func (m ImmutablePipeMessage) GetPayload() []byte

GetPayload returns a copy of the message payload.

type Message

type Message interface {
	GetMsgRefId() string
	GetPayload() []byte
	GetMessageAttributes() map[string]string
}

A Message is a wrapper for a payload (consisting of a body and attributes). An instance of Message can be routed through a pipeline.

type NamedStage

type NamedStage struct {
	Name string
	Stage
}

func (NamedStage) Process

func (s NamedStage) Process(ctx context.Context, msg Message) (Message, error)

type Orchestrator

type Orchestrator struct {
	// contains filtered or unexported fields
}

Orchestrator orchestrates multiple pipelines, each with its own input and output channels and a configurable number of workers.

func NewOrchestrator

func NewOrchestrator(pipelines map[string]Config) *Orchestrator

NewOrchestrator creates a new Orchestrator.

Parameters: - pipelines: A map where each key is a pipeline identifier and the value is the Config.

Returns: - A new instance of Orchestrator.

func (*Orchestrator) Execute

func (o *Orchestrator) Execute(cancelCtx context.Context, wg *sync.WaitGroup)

Execute runs all pipelines, each with its configured number of workers.

This method starts multiple goroutines for each pipeline based on its configuration. Each goroutine listens for messages on its input channel, processes them through the pipeline, and sends the results to the output channel.

Parameters: - cancelCtx: A context that can be used to cancel the processing. - wg: A wait group to wait for all processing goroutines to complete.

type PipeMessage added in v0.3.0

type PipeMessage struct {
	// contains filtered or unexported fields
}

func NewPipeMessage added in v0.3.0

func NewPipeMessage(messageId string, payloadBody []byte, msgAttrs map[string]string) PipeMessage

NewPipeMessage creates a new PipelineMessage.

func (PipeMessage) AddMessageAttribute added in v0.3.0

func (m PipeMessage) AddMessageAttribute(key string, value string) (PipeMessage, error)

AddMessageAttribute adds an internal message attribute to a copy of the original message to keep the original instance unchanged. It will throw an error if the attribute key already exists.

func (PipeMessage) GetMessageAttributes added in v0.3.0

func (m PipeMessage) GetMessageAttributes() map[string]string

GetMessageAttributes returns a copy of the payload attributes.

func (PipeMessage) GetMsgRefId added in v0.3.0

func (m PipeMessage) GetMsgRefId() string

func (PipeMessage) GetPayload added in v0.3.0

func (m PipeMessage) GetPayload() []byte

GetPayload returns the message payload.

type Pipeline

type Pipeline struct {
	Name string
	// contains filtered or unexported fields
}

A Pipeline helps orchestrate multiple pipeline steps.

func NewPipeline

func NewPipeline(name string, stages []Stage) *Pipeline

NewPipeline creates a new pipeline including the stages as configured.

func (Pipeline) Process

func (p Pipeline) Process(ctx context.Context, msg Message) (Message, error)

Process pipes an incoming message through the pipeline. Parameters:

  • ctx (context.Context): Processing context. Used for tracing.
  • msg (Message): ImmutablePipeMessage to process.

Returns:

  • Message: Processed message. Since incoming messages are immutable, this is an updated copy.
  • error: If any error occurs during processing, this will not be nil.

type Stage

type Stage interface {
	// Process processes an incoming message and returns a modified copy and/or error once done.
	Process(context.Context, Message) (Message, error)
}

Stage describes a pipeline stage a message can be routed through. Depending on the implementation, it may return the same message or a modified copy.

type Status added in v0.3.0

type Status int

Status Define the Pipeline Status enum

const (
	Starting Status = iota
	Processing
	Completed
	Error
	Stopped
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL