Documentation ¶
Index ¶
- func BuildPipelineLog(status Status, workerID string, pipelineName string, stageName string, ...) string
- type Config
- type ImmutablePipeEvent
- func (m ImmutablePipeEvent) AddMessageAttribute(key string, value string) (ImmutablePipeEvent, error)
- func (m ImmutablePipeEvent) Copy() ImmutablePipeEvent
- func (m ImmutablePipeEvent) GetEventId() string
- func (m ImmutablePipeEvent) GetMessageAttributes() map[string]string
- func (m ImmutablePipeEvent) GetPayload() []byte
- type NamedStage
- type Orchestrator
- type PipeEvent
- type Pipeline
- type Stage
- type Status
Functions ¶
Types ¶
type Config ¶
type Config struct {
	Pipeline   *Pipeline
	InputChan  chan PipeEvent
	OutputChan chan PipeEvent
	NumWorkers int
}
Config holds the configuration for each pipeline, including its input and output channels, and the number of workers.
type ImmutablePipeEvent ¶ added in v1.0.0
type ImmutablePipeEvent struct {
// contains filtered or unexported fields
}
An ImmutablePipeEvent is a wrapper for a payload (consisting of a body and attributes) and internal message attributes. An instance of ImmutablePipeEvent can be routed through a pipeline.
func NewImmutablePipeMessage ¶ added in v0.3.0
func NewImmutablePipeMessage(eventId string, payloadBody []byte, msgAttrs map[string]string) ImmutablePipeEvent
NewImmutablePipeMessage creates a new ImmutablePipeEvent.
func (ImmutablePipeEvent) AddMessageAttribute ¶ added in v1.0.0
func (m ImmutablePipeEvent) AddMessageAttribute(key string, value string) (ImmutablePipeEvent, error)
AddMessageAttribute adds an internal message attribute to a copy of the original message and returns that copy, leaving the original instance unchanged. It returns an error if the attribute key already exists.
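The copy-on-write contract described for AddMessageAttribute can be illustrated with a minimal, self-contained sketch. The fields of ImmutablePipeEvent are unexported, so the `event` type and its `withAttribute` method below are hypothetical stand-ins that only mirror the documented behavior; they are not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// event is a hypothetical stand-in for ImmutablePipeEvent; the real type's
// fields are unexported and may differ.
type event struct {
	id    string
	attrs map[string]string
}

// withAttribute mirrors the documented contract of AddMessageAttribute:
// it returns a modified copy and rejects keys that already exist.
func (e event) withAttribute(key, value string) (event, error) {
	if _, exists := e.attrs[key]; exists {
		return e, errors.New("attribute key already exists: " + key)
	}
	// Copy the map so the receiver's attributes stay untouched.
	next := make(map[string]string, len(e.attrs)+1)
	for k, v := range e.attrs {
		next[k] = v
	}
	next[key] = value
	return event{id: e.id, attrs: next}, nil
}

func main() {
	orig := event{id: "evt-1", attrs: map[string]string{"source": "queue"}}

	// The new attribute lands on the copy only.
	updated, err := orig.withAttribute("traceId", "abc")
	fmt.Println(len(orig.attrs), len(updated.attrs), err)

	// Adding an existing key is reported as an error.
	_, err = orig.withAttribute("source", "other")
	fmt.Println(err != nil)
}
```

With the real package, the equivalent call would be `updated, err := msg.AddMessageAttribute(key, value)`, and the caller would continue with `updated` rather than `msg`.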
func (ImmutablePipeEvent) Copy ¶ added in v1.0.0
func (m ImmutablePipeEvent) Copy() ImmutablePipeEvent
Copy copies a message and its contents.
func (ImmutablePipeEvent) GetEventId ¶ added in v1.0.0
func (m ImmutablePipeEvent) GetEventId() string
func (ImmutablePipeEvent) GetMessageAttributes ¶ added in v1.0.0
func (m ImmutablePipeEvent) GetMessageAttributes() map[string]string
GetMessageAttributes returns a copy of the payload attributes.
func (ImmutablePipeEvent) GetPayload ¶ added in v1.0.0
func (m ImmutablePipeEvent) GetPayload() []byte
GetPayload returns a copy of the message payload.
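Returning a copy from a getter is what keeps the event immutable from the caller's side. The sketch below shows the general defensive-copy pattern; `message` and `getPayload` are hypothetical names, and the package's actual implementation may differ.

```go
package main

import "fmt"

// message is a hypothetical stand-in for ImmutablePipeEvent.
type message struct {
	payload []byte
}

// getPayload returns a fresh slice, so callers cannot reach the internal buffer.
func (m message) getPayload() []byte {
	out := make([]byte, len(m.payload))
	copy(out, m.payload)
	return out
}

func main() {
	m := message{payload: []byte("hello")}

	p := m.getPayload()
	p[0] = 'J' // mutates only the caller's copy

	fmt.Println(string(p))
	fmt.Println(string(m.getPayload()))
}
```

The same idea applies to GetMessageAttributes: a returned map that is a copy can be modified freely without affecting the event.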
type NamedStage ¶
type Orchestrator ¶
type Orchestrator struct {
// contains filtered or unexported fields
}
Orchestrator orchestrates multiple pipelines, each with its own input and output channels and a configurable number of workers.
func NewOrchestrator ¶
func NewOrchestrator(pipelines map[string]Config) *Orchestrator
NewOrchestrator creates a new Orchestrator.
Parameters:
- pipelines: A map where each key is a pipeline identifier and the value is the Config.
Returns:
- A new instance of Orchestrator.
func (*Orchestrator) Execute ¶
func (o *Orchestrator) Execute(cancelCtx context.Context, wg *sync.WaitGroup)
Execute runs all pipelines, each with its configured number of workers.
This method starts multiple goroutines for each pipeline based on its configuration. Each goroutine listens for messages on its input channel, processes them through the pipeline, and sends the results to the output channel.
Parameters:
- cancelCtx: A context that can be used to cancel the processing.
- wg: A wait group to wait for all processing goroutines to complete.
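The worker model described for Execute (several goroutines per pipeline, each reading from an input channel, processing, and writing to an output channel until the context is cancelled) can be sketched as follows. This is an illustration of the general pattern, not the Orchestrator's code: `startWorkers` and `runBatch` are hypothetical helpers, plain strings stand in for PipeEvent, and stopping when the input channel is closed is an assumption. Whether the real Execute calls `wg.Add` itself is also not stated in the documentation.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"sync"
)

// startWorkers launches n goroutines that read from in, apply process, and
// write to out. Each goroutine stops when ctx is cancelled or in is closed.
func startWorkers(ctx context.Context, wg *sync.WaitGroup, n int,
	in <-chan string, out chan<- string, process func(string) string) {
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case msg, ok := <-in:
					if !ok {
						return // input closed (assumed shutdown signal)
					}
					select {
					case out <- process(msg):
					case <-ctx.Done():
						return
					}
				}
			}
		}()
	}
}

// runBatch is a hypothetical driver: it feeds inputs to the workers, waits
// for them to finish, and returns the sorted results.
func runBatch(inputs []string, workers int) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan string)
	out := make(chan string, len(inputs)) // buffered so workers never block on send
	var wg sync.WaitGroup
	startWorkers(ctx, &wg, workers, in, out, strings.ToUpper)

	for _, m := range inputs {
		in <- m
	}
	close(in)
	wg.Wait()
	close(out)

	var results []string
	for r := range out {
		results = append(results, r)
	}
	sort.Strings(results) // worker scheduling makes arrival order nondeterministic
	return results
}

func main() {
	fmt.Println(runBatch([]string{"b", "a", "c"}, 2))
}
```

With more than one worker, results can arrive in a different order than the inputs, which is why the sketch sorts them before returning.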
type PipeEvent ¶ added in v1.0.0
type PipeEvent interface {
GetEventId() string
}
PipeEvent is the interface for message payloads that can be routed through a pipeline.
type Pipeline ¶
type Pipeline struct {
	Name string
	// contains filtered or unexported fields
}
A Pipeline helps orchestrate multiple pipeline steps.
func NewPipeline ¶
NewPipeline creates a new pipeline including the stages as configured.
func (Pipeline) Process ¶
Process pipes an incoming message through the pipeline.
Parameters:
- ctx (context.Context): Processing context. Used for tracing.
- msg (PipeEvent): ImmutablePipeEvent to process.
Returns:
- PipeEvent: Processed message. Since incoming messages are immutable, this is an updated copy.
- error: If any error occurs during processing, this will not be nil.
type Stage ¶
type Stage interface {
	// Process processes an incoming message and returns a modified copy and/or error once done.
	Process(context.Context, PipeEvent) (PipeEvent, error)
}
Stage describes a pipeline stage a message can be routed through. Depending on the implementation, it may return the same message or a modified copy.
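A minimal sketch of a custom stage may help here. The PipeEvent and Stage interfaces are repeated from the declarations above so the example compiles on its own; `textEvent`, `upperStage`, `runStages`, and `shout` are hypothetical names introduced for illustration, and `runStages` only approximates how a pipeline might chain its stages.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// PipeEvent and Stage repeat the documented declarations; in real code they
// come from the package.
type PipeEvent interface {
	GetEventId() string
}

type Stage interface {
	Process(context.Context, PipeEvent) (PipeEvent, error)
}

// textEvent is a hypothetical event type used only for this sketch.
type textEvent struct {
	id   string
	body string
}

func (e textEvent) GetEventId() string { return e.id }

// upperStage is a hypothetical Stage that returns a modified copy.
type upperStage struct{}

func (upperStage) Process(_ context.Context, ev PipeEvent) (PipeEvent, error) {
	te, ok := ev.(textEvent)
	if !ok {
		return nil, errors.New("upperStage: unsupported event type")
	}
	te.body = strings.ToUpper(te.body) // te is a value copy; the input is untouched
	return te, nil
}

// runStages feeds the output of each stage into the next and stops at the
// first error.
func runStages(ctx context.Context, ev PipeEvent, stages ...Stage) (PipeEvent, error) {
	var err error
	for _, s := range stages {
		if ev, err = s.Process(ctx, ev); err != nil {
			return nil, err
		}
	}
	return ev, nil
}

// shout is a hypothetical convenience wrapper that runs one event through
// a single upperStage and returns the resulting body.
func shout(body string) (string, error) {
	out, err := runStages(context.Background(), textEvent{id: "evt-1", body: body}, upperStage{})
	if err != nil {
		return "", err
	}
	return out.(textEvent).body, nil
}

func main() {
	got, err := shout("hello")
	fmt.Println(got, err)
}
```

Because `textEvent` is passed by value, the stage modifies its own copy, which matches the "modified copy" behavior the interface comment describes.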