Documentation ¶
Overview ¶
Package stage provides a framework for creating stages
Index ¶
- Constants
- type Func
- type IStage
- type OnFinished
- type Stage
- func (s *Stage[ProcessingData, ConvertedData]) GetCounterCreated() *expvar.Int
- func (s *Stage[ProcessingData, ConvertedData]) GetCounterDone() *expvar.Int
- func (s *Stage[ProcessingData, ConvertedData]) GetCounterFailed() *expvar.Int
- func (s *Stage[ProcessingData, ConvertedData]) GetCounterRunning() *expvar.Int
- func (s *Stage[ProcessingData, ConvertedData]) GetCreatedAt() time.Time
- func (s *Stage[ProcessingData, ConvertedData]) GetDescription() string
- func (s *Stage[ProcessingData, ConvertedData]) GetDuration() *expvar.Int
- func (s *Stage[ProcessingData, ConvertedData]) GetLogger() sypl.ISypl
- func (s *Stage[ProcessingData, ConvertedData]) GetMetrics() map[string]string
- func (s *Stage[ProcessingData, ConvertedData]) GetName() string
- func (s *Stage[ProcessingData, ConvertedData]) GetOnFinished() OnFinished[ProcessingData, ConvertedData]
- func (s *Stage[ProcessingData, ConvertedData]) GetProgress() *expvar.Int
- func (s *Stage[ProcessingData, ConvertedData]) GetProgressPercent() *expvar.String
- func (s *Stage[ProcessingData, ConvertedData]) GetStatus() *expvar.String
- func (s *Stage[ProcessingData, ConvertedData]) GetType() string
- func (s *Stage[ProcessingData, ConvertedData]) Run(ctx context.Context, tsk task.Task[ProcessingData, ConvertedData]) (task.Task[ProcessingData, ConvertedData], error)
- func (s *Stage[ProcessingData, ConvertedData]) SetOnFinished(onFinished OnFinished[ProcessingData, ConvertedData])
- func (s *Stage[ProcessingData, ConvertedData]) SetProgressPercent()
Constants ¶
const Type = "stage"
Type of the entity.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Func ¶
type Func[ProcessedData, ConvertedOut any] func(p IStage[ProcessedData, ConvertedOut]) IStage[ProcessedData, ConvertedOut]
Func allows to specify message's options.
func WithOnFinished ¶
func WithOnFinished[ProcessedData, ConvertedOut any](onFinished OnFinished[ProcessedData, ConvertedOut]) Func[ProcessedData, ConvertedOut]
WithOnFinished sets the OnFinished function.
type IStage ¶
type IStage[ProcessedData, ConvertedOut any] interface { shared.IMeta shared.IMetrics // GetProgress returns the `CounterProgress` of the stage. GetProgress() *expvar.Int // GetProgressPercent returns the `ProgressPercent` of the stage. GetProgressPercent() *expvar.String // SetProgressPercent sets the `ProgressPercent` of the stage. SetProgressPercent() // GetOnFinished returns the `OnFinished` function. GetOnFinished() OnFinished[ProcessedData, ConvertedOut] // SetOnFinished sets the `OnFinished` function. SetOnFinished(onFinished OnFinished[ProcessedData, ConvertedOut]) // Run the stage function. Run(context.Context, task.Task[ProcessedData, ConvertedOut]) (task.Task[ProcessedData, ConvertedOut], error) }
IStage defines what a `Stage` must do.
type OnFinished ¶
type OnFinished[ProcessedData, ConvertedOut any] func(ctx context.Context, s IStage[ProcessedData, ConvertedOut], tskIn task.Task[ProcessedData, ConvertedOut], tskOut task.Task[ProcessedData, ConvertedOut])
OnFinished is the function that is called when a processor finishes its execution.
type Stage ¶
type Stage[ProcessingData, ConvertedData any] struct { // Description of the stage. Description string `json:"description"` // Conversor to be used tsk the stage. Conversor concurrentloop.MapFunc[ProcessingData, ConvertedData] `json:"-" validate:"required"` // Logger is the internal logger. Logger sypl.ISypl `json:"-" validate:"required"` // Name of the stage. Name string `json:"name" validate:"required"` // OnFinished is the function that is called when a processor finishes its // execution. OnFinished OnFinished[ProcessingData, ConvertedData] `json:"-"` // Processors to be run tsk the stage. Processors []processor.IProcessor[ProcessingData] `json:"processors" validate:"required,gt=0"` // Metrics. CounterCreated *expvar.Int `json:"counterCreated"` CounterDone *expvar.Int `json:"counterDone"` CounterFailed *expvar.Int `json:"counterFailed"` CounterRunning *expvar.Int `json:"counterRunning"` CreatedAt time.Time `json:"createdAt"` Duration *expvar.Int `json:"duration"` Progress *expvar.Int `json:"progress"` ProgressPercent *expvar.String `json:"progressPercent"` Status *expvar.String `json:"status"` }
Stage definition.
func (*Stage[ProcessingData, ConvertedData]) GetCounterCreated ¶
GetCounterCreated returns the `CounterCreated` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetCounterDone ¶
GetCounterDone returns the `CounterDone` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetCounterFailed ¶
GetCounterFailed returns the `CounterFailed` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetCounterRunning ¶
GetCounterRunning returns the `CounterRunning` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetCreatedAt ¶ added in v2.0.6
GetCreatedAt returns the created at time.
func (*Stage[ProcessingData, ConvertedData]) GetDescription ¶
GetDescription returns the `Description` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetDuration ¶ added in v2.0.6
GetDuration returns the `CounterDuration` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetLogger ¶
GetLogger returns the `Logger` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetMetrics ¶ added in v2.0.7
GetMetrics returns the stage's metrics.
func (*Stage[ProcessingData, ConvertedData]) GetOnFinished ¶
func (s *Stage[ProcessingData, ConvertedData]) GetOnFinished() OnFinished[ProcessingData, ConvertedData]
GetOnFinished returns the `OnFinished` function.
func (*Stage[ProcessingData, ConvertedData]) GetProgress ¶ added in v2.0.6
GetProgress returns the `CounterProgress` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetProgressPercent ¶ added in v2.0.6
GetProgressPercent returns the `ProgressPercent` of the stage.
func (*Stage[ProcessingData, ConvertedData]) Run ¶
func (s *Stage[ProcessingData, ConvertedData]) Run(ctx context.Context, tsk task.Task[ProcessingData, ConvertedData]) (task.Task[ProcessingData, ConvertedData], error)
Run the transform function.
func (*Stage[ProcessingData, ConvertedData]) SetOnFinished ¶
func (s *Stage[ProcessingData, ConvertedData]) SetOnFinished(onFinished OnFinished[ProcessingData, ConvertedData])
SetOnFinished sets the `OnFinished` function.
func (*Stage[ProcessingData, ConvertedData]) SetProgressPercent ¶ added in v2.0.6
func (s *Stage[ProcessingData, ConvertedData]) SetProgressPercent()
SetProgressPercent sets the `ProgressPercent` of the stage.