Documentation ¶
Overview ¶
Package stage provides a framework for creating stages.
Index ¶
- Constants
- type Func
- type IStage
- type OnFinished
- type Stage
- func (s *Stage[In, Out]) GetCounterCreated() *expvar.Int
- func (s *Stage[In, Out]) GetCounterDone() *expvar.Int
- func (s *Stage[In, Out]) GetCounterFailed() *expvar.Int
- func (s *Stage[In, Out]) GetCounterRunning() *expvar.Int
- func (s *Stage[In, Out]) GetDescription() string
- func (s *Stage[In, Out]) GetLogger() sypl.ISypl
- func (s *Stage[In, Out]) GetName() string
- func (s *Stage[In, Out]) GetOnFinished() OnFinished[In, Out]
- func (s *Stage[In, Out]) GetStatus() *expvar.String
- func (s *Stage[In, Out]) GetType() string
- func (s *Stage[In, Out]) Run(ctx context.Context, in []In) ([]Out, error)
- func (s *Stage[In, Out]) SetOnFinished(onFinished OnFinished[In, Out])
Constants ¶
const Type = "stage"
Type of the entity.
Types ¶
type Func ¶
Func allows specifying a stage's options.
func WithOnFinished ¶
func WithOnFinished[In, Out any](onFinished OnFinished[In, Out]) Func[In, Out]
WithOnFinished sets the OnFinished function.
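The option-function pattern behind `WithOnFinished` can be sketched as follows. This is a minimal illustration, not the package's implementation: the definition of `Func` is not shown on this page, so the `option` type, the `stage` struct, and `newStage` below are hypothetical stand-ins, and the callback is simplified to take only the input and output slices.

```go
package main

import "fmt"

// onFinished is a simplified stand-in for the documented OnFinished type
// (the context and stage arguments are omitted to keep the sketch short).
type onFinished[In, Out any] func(in []In, out []Out)

// stage is a hypothetical stand-in for the package's Stage type.
type stage[In, Out any] struct {
	name       string
	onFinished onFinished[In, Out]
}

// option is an ASSUMED shape for Func: a function that mutates a stage.
type option[In, Out any] func(*stage[In, Out])

// withOnFinished returns an option that stores the callback on the stage.
func withOnFinished[In, Out any](cb onFinished[In, Out]) option[In, Out] {
	return func(s *stage[In, Out]) { s.onFinished = cb }
}

// newStage builds a stage and applies each option in the order given.
func newStage[In, Out any](name string, opts ...option[In, Out]) *stage[In, Out] {
	s := &stage[In, Out]{name: name}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	called := false
	s := newStage("upper", withOnFinished[string, string](
		func(in []string, out []string) { called = true },
	))

	// Invoke the stored callback directly to show that the option took effect.
	s.onFinished(nil, nil)
	fmt.Println(s.name, called)
}
```

Options keep the constructor signature stable as new settings are added; callers pass only the ones they need.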
type IStage ¶
type IStage[In, Out any] interface {
	shared.IMeta
	shared.IMetrics

	// GetOnFinished returns the `OnFinished` function.
	GetOnFinished() OnFinished[In, Out]

	// SetOnFinished sets the `OnFinished` function.
	SetOnFinished(onFinished OnFinished[In, Out])

	// Run the stage function.
	Run(ctx context.Context, in []In) (out []Out, err error)
}
IStage defines what a `Stage` must do.
func New ¶
func New[In, Out any](
	name string,
	conversor concurrentloop.MapFunc[In, Out],
	processors ...processor.IProcessor[In],
) (IStage[In, Out], error)
New returns a new stage.
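To make the shape of `New` and `Run` concrete, here is a self-contained sketch that mirrors the constructor's arguments with local types. Everything in it is an assumption made for illustration: `mapFunc` and `processorFunc` are hypothetical stand-ins for `concurrentloop.MapFunc` and `processor.IProcessor`, whose real definitions are not shown on this page; the sketch runs processors in order and converts items sequentially, whereas the real package may behave differently (for example, by converting concurrently). The validation follows the `validate` struct tags on `Stage` (name and conversor required, at least one processor).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// mapFunc is an ASSUMED shape for concurrentloop.MapFunc: convert one item.
type mapFunc[In, Out any] func(ctx context.Context, item In) (Out, error)

// processorFunc is a hypothetical stand-in for processor.IProcessor[In].
type processorFunc[In any] func(ctx context.Context, in []In) ([]In, error)

// stage is a hypothetical stand-in for the package's Stage type.
type stage[In, Out any] struct {
	name       string
	conversor  mapFunc[In, Out]
	processors []processorFunc[In]
}

// newStage rejects the same omissions the struct tags mark as invalid.
func newStage[In, Out any](name string, conv mapFunc[In, Out], procs ...processorFunc[In]) (*stage[In, Out], error) {
	if name == "" || conv == nil || len(procs) == 0 {
		return nil, errors.New("stage: name, conversor, and at least one processor are required")
	}
	return &stage[In, Out]{name: name, conversor: conv, processors: procs}, nil
}

// run feeds the input through every processor, then converts each item.
func (s *stage[In, Out]) run(ctx context.Context, in []In) ([]Out, error) {
	for _, p := range s.processors {
		next, err := p(ctx, in)
		if err != nil {
			return nil, fmt.Errorf("stage %q: processor failed: %w", s.name, err)
		}
		in = next
	}
	out := make([]Out, 0, len(in))
	for _, item := range in {
		converted, err := s.conversor(ctx, item)
		if err != nil {
			return nil, fmt.Errorf("stage %q: conversion failed: %w", s.name, err)
		}
		out = append(out, converted)
	}
	return out, nil
}

// trimAll is an example processor: it strips surrounding whitespace.
func trimAll(_ context.Context, in []string) ([]string, error) {
	out := make([]string, len(in))
	for i, v := range in {
		out[i] = strings.TrimSpace(v)
	}
	return out, nil
}

// length is an example conversor: string in, int out.
func length(_ context.Context, v string) (int, error) { return len(v), nil }

// demo builds a stage and runs it on two strings.
func demo() ([]int, error) {
	s, err := newStage[string, int]("lengths", length, trimAll)
	if err != nil {
		return nil, err
	}
	return s.run(context.Background(), []string{" a ", "bb"})
}

func main() {
	out, err := demo()
	fmt.Println(out, err) // prints "[1 2] <nil>"
}
```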
type OnFinished ¶
type OnFinished[In, Out any] func(ctx context.Context, p IStage[In, Out], in []In, processedOut []Out)
OnFinished is the function that is called when a stage finishes its execution. It receives the context, the stage itself, the original input, and the processed output.
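A callback with this signature might look like the sketch below. It assumes a trimmed-down `namer` interface in place of `IStage[In, Out]` so the example compiles without the package; `namer`, `stage`, and `summarize` are hypothetical names introduced only for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// namer is a hypothetical, reduced stand-in for IStage: only GetName is kept.
type namer interface{ GetName() string }

// onFinished mirrors the documented OnFinished signature, with namer in
// place of IStage[In, Out].
type onFinished[In, Out any] func(ctx context.Context, s namer, in []In, processedOut []Out)

// stage is a minimal namer implementation used to drive the callback.
type stage struct{ name string }

func (s stage) GetName() string { return s.name }

// summarize builds a one-line report from the values a callback receives.
func summarize[In, Out any](s namer, in []In, out []Out) string {
	return fmt.Sprintf("%s: %d in, %d out", s.GetName(), len(in), len(out))
}

func main() {
	var cb onFinished[string, int] = func(_ context.Context, s namer, in []string, out []int) {
		fmt.Println(summarize(s, in, out))
	}

	// Simulate the stage calling the callback once its run has completed.
	cb(context.Background(), stage{name: "lengths"}, []string{"a", "bb"}, []int{1, 2})
	// prints "lengths: 2 in, 2 out"
}
```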
type Stage ¶
type Stage[In, Out any] struct {
	// Description of the processor.
	Description string `json:"description"`

	// Conversor to be used in the stage.
	Conversor concurrentloop.MapFunc[In, Out] `json:"-" validate:"required"`

	// Logger is the pipeline logger.
	Logger sypl.ISypl `json:"-" validate:"required"`

	// Name of the stage.
	Name string `json:"name" validate:"required"`

	// OnFinished is the function that is called when a processor finishes its
	// execution.
	OnFinished OnFinished[In, Out] `json:"-"`

	// Processors to be run in the stage.
	Processors []processor.IProcessor[In] `json:"processors" validate:"required,gt=0"`

	// Progress of the stage.
	Progress int `json:"progress"`

	// Metrics.
	CounterCreated *expvar.Int    `json:"counterCreated" validate:"required,gte=0"`
	CounterRunning *expvar.Int    `json:"counterRunning" validate:"required,gte=0"`
	CounterFailed  *expvar.Int    `json:"counterFailed" validate:"required,gte=0"`
	CounterDone    *expvar.Int    `json:"counterDone" validate:"required,gte=0"`
	Status         *expvar.String `json:"status" validate:"required,gte=0"`
}
Stage definition.
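The metric fields use the standard library's `expvar` types. The sketch below shows one plausible way such counters could be updated around a unit of work. It is an assumption, not the package's logic: the `metrics` struct, the `track` helper, and the status strings "running", "failed", and "done" are hypothetical. Only the `expvar` calls (`Add`, `Set`, `Value`) are the real standard-library API.

```go
package main

import (
	"errors"
	"expvar"
	"fmt"
)

// errBoom is a sample failure used by the demonstration below.
var errBoom = errors.New("boom")

// metrics groups the same kinds of fields the Stage struct exposes.
type metrics struct {
	created, running, failed, done *expvar.Int
	status                         *expvar.String
}

// newMetrics allocates unpublished expvar values and counts one creation.
func newMetrics() *metrics {
	m := &metrics{
		created: new(expvar.Int),
		running: new(expvar.Int),
		failed:  new(expvar.Int),
		done:    new(expvar.Int),
		status:  new(expvar.String),
	}
	m.created.Add(1)
	return m
}

// track runs work and updates the counters and status around it.
func (m *metrics) track(work func() error) error {
	m.running.Add(1)
	m.status.Set("running") // status values are assumptions
	err := work()
	m.running.Add(-1)
	if err != nil {
		m.failed.Add(1)
		m.status.Set("failed")
		return err
	}
	m.done.Add(1)
	m.status.Set("done")
	return nil
}

func main() {
	m := newMetrics()
	_ = m.track(func() error { return nil })
	_ = m.track(func() error { return errBoom })
	fmt.Println(m.created.Value(), m.running.Value(), m.done.Value(), m.failed.Value(), m.status.Value())
}
```

Because `expvar.Int` and `expvar.String` are safe for concurrent use, counters like these can be updated from multiple goroutines without extra locking.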
func (*Stage[In, Out]) GetCounterCreated ¶
func (s *Stage[In, Out]) GetCounterCreated() *expvar.Int
GetCounterCreated returns the `CounterCreated` of the stage.
func (*Stage[In, Out]) GetCounterDone ¶
func (s *Stage[In, Out]) GetCounterDone() *expvar.Int
GetCounterDone returns the `CounterDone` of the stage.
func (*Stage[In, Out]) GetCounterFailed ¶
func (s *Stage[In, Out]) GetCounterFailed() *expvar.Int
GetCounterFailed returns the `CounterFailed` of the stage.
func (*Stage[In, Out]) GetCounterRunning ¶
func (s *Stage[In, Out]) GetCounterRunning() *expvar.Int
GetCounterRunning returns the `CounterRunning` of the stage.
func (*Stage[In, Out]) GetDescription ¶
func (s *Stage[In, Out]) GetDescription() string
GetDescription returns the `Description` of the stage.
func (*Stage[In, Out]) GetOnFinished ¶
func (s *Stage[In, Out]) GetOnFinished() OnFinished[In, Out]
GetOnFinished returns the `OnFinished` function.
func (*Stage[In, Out]) SetOnFinished ¶
func (s *Stage[In, Out]) SetOnFinished(onFinished OnFinished[In, Out])
SetOnFinished sets the `OnFinished` function.
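One use for the getter and setter pair is to extend a callback that is already installed rather than overwrite it. The sketch below illustrates that idea with a local, non-generic `stage` type and a simplified callback signature; `chain` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// onFinished is a simplified, non-generic stand-in for OnFinished.
type onFinished func(in []string, out []int)

// stage is a hypothetical stand-in exposing the same getter and setter.
type stage struct{ onFinished onFinished }

func (s *stage) GetOnFinished() onFinished   { return s.onFinished }
func (s *stage) SetOnFinished(cb onFinished) { s.onFinished = cb }

// chain installs next so that it runs after any callback already set.
func chain(s *stage, next onFinished) {
	prev := s.GetOnFinished()
	s.SetOnFinished(func(in []string, out []int) {
		if prev != nil {
			prev(in, out)
		}
		next(in, out)
	})
}

func main() {
	s := &stage{}
	chain(s, func(in []string, out []int) { fmt.Println("first:", len(in)) })
	chain(s, func(in []string, out []int) { fmt.Println("second:", len(out)) })

	// Both callbacks run, in the order they were chained.
	s.GetOnFinished()([]string{"a"}, []int{1, 2})
}
```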