Documentation
Overview ¶
Package stage provides a framework for creating stages.
Index ¶
- Constants
- type Func
- type IStage
- type OnFinished
- type Stage
- func (s *Stage[ProcessingData, ConvertedData]) GetCounterCreated() *expvar.Int
- func (s *Stage[ProcessingData, ConvertedData]) GetCounterDone() *expvar.Int
- func (s *Stage[ProcessingData, ConvertedData]) GetCounterFailed() *expvar.Int
- func (s *Stage[ProcessingData, ConvertedData]) GetCounterRunning() *expvar.Int
- func (s *Stage[ProcessingData, ConvertedData]) GetCreatedAt() time.Time
- func (s *Stage[ProcessingData, ConvertedData]) GetDescription() string
- func (s *Stage[ProcessingData, ConvertedData]) GetDuration() *expvar.Int
- func (s *Stage[ProcessingData, ConvertedData]) GetLogger() sypl.ISypl
- func (s *Stage[ProcessingData, ConvertedData]) GetMetrics() map[string]string
- func (s *Stage[ProcessingData, ConvertedData]) GetName() string
- func (s *Stage[ProcessingData, ConvertedData]) GetOnFinished() OnFinished[ProcessingData, ConvertedData]
- func (s *Stage[ProcessingData, ConvertedData]) GetProgress() *expvar.Int
- func (s *Stage[ProcessingData, ConvertedData]) GetProgressPercent() *expvar.String
- func (s *Stage[ProcessingData, ConvertedData]) GetStatus() *expvar.String
- func (s *Stage[ProcessingData, ConvertedData]) GetType() string
- func (s *Stage[ProcessingData, ConvertedData]) Run(ctx context.Context, tsk task.Task[ProcessingData, ConvertedData]) (task.Task[ProcessingData, ConvertedData], error)
- func (s *Stage[ProcessingData, ConvertedData]) SetOnFinished(onFinished OnFinished[ProcessingData, ConvertedData])
- func (s *Stage[ProcessingData, ConvertedData]) SetProgressPercent()
Examples ¶
Constants ¶
const Type = "stage"
Type of the entity.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Func ¶
type Func[ProcessedData, ConvertedOut any] func(p IStage[ProcessedData, ConvertedOut]) IStage[ProcessedData, ConvertedOut]
Func allows specifying a stage's options.
func WithOnFinished ¶
func WithOnFinished[ProcessedData, ConvertedOut any](onFinished OnFinished[ProcessedData, ConvertedOut]) Func[ProcessedData, ConvertedOut]
WithOnFinished sets the OnFinished function.
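Because `Func` takes an `IStage` and returns an `IStage`, an option can be applied by calling it on an existing stage. The following is a minimal, self-contained sketch of that pattern. The names `onFinished`, `stager`, `option`, `withOnFinished`, `demoStage`, and `applyOptions` are simplified, hypothetical stand-ins rather than the package's own types, and how the package itself applies options is not shown in this documentation.

```go
package main

import "fmt"

// onFinished is a hypothetical stand-in for OnFinished: a callback that
// receives the stage name and the number of items the stage produced.
type onFinished func(stageName string, items int)

// stager is a hypothetical stand-in for IStage, reduced to what the
// option needs.
type stager interface {
	Name() string
	SetOnFinished(cb onFinished)
	OnFinished() onFinished
}

// option mirrors the shape of Func: it takes a stage and returns a stage.
type option func(s stager) stager

// withOnFinished mirrors WithOnFinished: it returns an option that
// installs the callback on the stage it is applied to.
func withOnFinished(cb onFinished) option {
	return func(s stager) stager {
		s.SetOnFinished(cb)
		return s
	}
}

// demoStage is a minimal stager implementation for this sketch.
type demoStage struct {
	name string
	cb   onFinished
}

func (d *demoStage) Name() string { return d.name }
func (d *demoStage) SetOnFinished(cb onFinished) { d.cb = cb }
func (d *demoStage) OnFinished() onFinished { return d.cb }

// applyOptions applies each option in order and returns the final stage.
func applyOptions(s stager, opts ...option) stager {
	for _, opt := range opts {
		s = opt(s)
	}
	return s
}

func main() {
	var s stager = &demoStage{name: "stage-1"}

	s = applyOptions(s, withOnFinished(func(stageName string, items int) {
		fmt.Printf("%s finished with %d items\n", stageName, items)
	}))

	// Invoke the callback the way a stage might after running.
	if cb := s.OnFinished(); cb != nil {
		cb(s.Name(), 2)
	}
	// Prints: stage-1 finished with 2 items
}
```

Returning the stage from the option keeps the call chainable, which is why `applyOptions` can fold any number of options over the same value.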
type IStage ¶
type IStage[ProcessedData, ConvertedOut any] interface {
	shared.IMeta
	shared.IMetrics

	// GetProgress returns the `Progress` of the stage.
	GetProgress() *expvar.Int

	// GetProgressPercent returns the `ProgressPercent` of the stage.
	GetProgressPercent() *expvar.String

	// SetProgressPercent sets the `ProgressPercent` of the stage.
	SetProgressPercent()

	// GetOnFinished returns the `OnFinished` function.
	GetOnFinished() OnFinished[ProcessedData, ConvertedOut]

	// SetOnFinished sets the `OnFinished` function.
	SetOnFinished(onFinished OnFinished[ProcessedData, ConvertedOut])

	// Run the stage function.
	Run(context.Context, task.Task[ProcessedData, ConvertedOut]) (task.Task[ProcessedData, ConvertedOut], error)
}
IStage defines what a `Stage` must do.
func New ¶
func New[ProcessingData, ConvertedData any](
	name string,
	description string,
	conversor converter.IConverter[ProcessingData, ConvertedData],
	processors ...processor.IProcessor[ProcessingData],
) (IStage[ProcessingData, ConvertedData], error)
New returns a new stage.
Example (Storage_processor) ¶
Demonstrates the usage of a stage with the storage processor (memory storage).
// Sample data.
tests := []Test{
	{
		ID:   "1",
		Name: "John",
	},
	{
		ID:   "2",
		Name: "Peter",
	},
}

// Memory storage from DAL.
memoryStorage, err := memory.New(context.Background())
if err != nil {
	log.Fatalln(err)
}

// Storage processor, concurrency set to 1.
s, err := storage.New[Test](memoryStorage, 1, "example-")
if err != nil {
	log.Fatalln(err)
}

// Stage with the storage processor.
stg1, err := New(
	"stage-1",
	"main stage",

	// Converter: pass-through, does nothing.
	passthru.Must[Test](),

	// Processors: add as many as you want.
	s,
)
if err != nil {
	log.Fatalln(err)
}

// Run the stage, passing the processing data as a task.
tasksOut, err := stg1.Run(context.Background(), task.Task[Test, Test]{
	ProcessingData: tests,
})
if err != nil {
	log.Fatalln(err)
}

// String builder to hold the output of the stage and the memory storage.
var buf strings.Builder

// Iterate over tasksOut and write to the buffer.
for _, v := range tasksOut.ConvertedData {
	buf.WriteString(fmt.Sprintf("%s %s\n", v.ID, v.Name))
}

// Get a list from the memory storage.
//
// NOTE: The usage of the memory.ResponseList[Test] wrapper.
var fromMemory memory.ResponseList[Test]
if err := memoryStorage.List(context.Background(), "etl", &fromMemory, &list.List{}); err != nil {
	log.Fatalln(err)
}

// Iterate over fromMemory so we can add to the buffer.
for _, v := range fromMemory.Items {
	buf.WriteString(fmt.Sprintf("%s %s\n", v.ID, v.Name))
}

// Get the content of the buffer.
bufferContent := buf.String()

//////
// Check whether the names John and Peter each appear 2 times in the buffer.
//////

if strings.Count(bufferContent, "John") == 2 {
	fmt.Println("John appears 2 times")
}

if strings.Count(bufferContent, "Peter") == 2 {
	fmt.Println("Peter appears 2 times")
}

Output:
John appears 2 times
Peter appears 2 times
type OnFinished ¶
type OnFinished[ProcessedData, ConvertedOut any] func(ctx context.Context, s IStage[ProcessedData, ConvertedOut], tskIn task.Task[ProcessedData, ConvertedOut], tskOut task.Task[ProcessedData, ConvertedOut])
OnFinished is the function that is called when a stage finishes its execution. It receives the stage together with the input and output tasks.
type Stage ¶
type Stage[ProcessingData, ConvertedData any] struct {
	// Description of the stage.
	Description string `json:"description"`

	// Conversor to be used in the stage.
	Conversor converter.IConverter[ProcessingData, ConvertedData] `json:"-" validate:"required"`

	// Logger is the internal logger.
	Logger sypl.ISypl `json:"-" validate:"required"`

	// Name of the stage.
	Name string `json:"name" validate:"required"`

	// OnFinished is the function that is called when a stage finishes its
	// execution.
	OnFinished OnFinished[ProcessingData, ConvertedData] `json:"-"`

	// Processors to be run in the stage.
	Processors []processor.IProcessor[ProcessingData] `json:"processors" validate:"required,gt=0"`

	// Metrics.
	CounterCreated  *expvar.Int    `json:"counterCreated"`
	CounterDone     *expvar.Int    `json:"counterDone"`
	CounterFailed   *expvar.Int    `json:"counterFailed"`
	CounterRunning  *expvar.Int    `json:"counterRunning"`
	CreatedAt       time.Time      `json:"createdAt"`
	Duration        *expvar.Int    `json:"duration"`
	Progress        *expvar.Int    `json:"progress"`
	ProgressPercent *expvar.String `json:"progressPercent"`
	Status          *expvar.String `json:"status"`
}
Stage definition.
func (*Stage[ProcessingData, ConvertedData]) GetCounterCreated ¶
GetCounterCreated returns the `CounterCreated` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetCounterDone ¶
GetCounterDone returns the `CounterDone` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetCounterFailed ¶
GetCounterFailed returns the `CounterFailed` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetCounterRunning ¶
GetCounterRunning returns the `CounterRunning` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetCreatedAt ¶ added in v2.0.6
GetCreatedAt returns the `CreatedAt` time of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetDescription ¶
GetDescription returns the `Description` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetDuration ¶ added in v2.0.6
GetDuration returns the `Duration` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetLogger ¶
GetLogger returns the `Logger` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetMetrics ¶ added in v2.0.7
GetMetrics returns the stage's metrics.
func (*Stage[ProcessingData, ConvertedData]) GetOnFinished ¶
func (s *Stage[ProcessingData, ConvertedData]) GetOnFinished() OnFinished[ProcessingData, ConvertedData]
GetOnFinished returns the `OnFinished` function.
func (*Stage[ProcessingData, ConvertedData]) GetProgress ¶ added in v2.0.6
GetProgress returns the `Progress` of the stage.
func (*Stage[ProcessingData, ConvertedData]) GetProgressPercent ¶ added in v2.0.6
GetProgressPercent returns the `ProgressPercent` of the stage.
func (*Stage[ProcessingData, ConvertedData]) Run ¶
func (s *Stage[ProcessingData, ConvertedData]) Run(ctx context.Context, tsk task.Task[ProcessingData, ConvertedData]) (task.Task[ProcessingData, ConvertedData], error)
Run the stage function.
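The signature of `New` suggests that a stage combines processors, which operate on the processing data, with a converter, which turns that data into the converted output. The sketch below illustrates one plausible flow under that assumption: processors run in order, then the converter runs once. The names `task`, `processorFn`, `converterFn`, `runStage`, `upper`, `lengths`, and `runSample` are hypothetical stand-ins, and the real `Run` may differ in ordering, concurrency, and error handling.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// task is a hypothetical stand-in for task.Task: it carries the data being
// processed and the converted result.
type task[P, C any] struct {
	ProcessingData []P
	ConvertedData  []C
}

// processorFn is a hypothetical stand-in for processor.IProcessor.
type processorFn[P any] func(ctx context.Context, in []P) ([]P, error)

// converterFn is a hypothetical stand-in for converter.IConverter.
type converterFn[P, C any] func(ctx context.Context, in []P) ([]C, error)

// runStage runs each processor in order, feeding the output of one into the
// next, then converts the final processing data. The first error stops the
// run and the input task is returned unchanged.
func runStage[P, C any](
	ctx context.Context,
	tsk task[P, C],
	conv converterFn[P, C],
	procs ...processorFn[P],
) (task[P, C], error) {
	data := tsk.ProcessingData

	for i, p := range procs {
		out, err := p(ctx, data)
		if err != nil {
			return tsk, fmt.Errorf("processor %d: %w", i, err)
		}
		data = out
	}

	converted, err := conv(ctx, data)
	if err != nil {
		return tsk, fmt.Errorf("converter: %w", err)
	}

	return task[P, C]{ProcessingData: data, ConvertedData: converted}, nil
}

// upper is a sample processor that upper-cases every item.
func upper(_ context.Context, in []string) ([]string, error) {
	out := make([]string, len(in))
	for i, v := range in {
		out[i] = strings.ToUpper(v)
	}
	return out, nil
}

// lengths is a sample converter from strings to their lengths.
func lengths(_ context.Context, in []string) ([]int, error) {
	out := make([]int, len(in))
	for i, v := range in {
		out[i] = len(v)
	}
	return out, nil
}

// runSample wires the sample processor and converter into a stage run.
func runSample(names []string) (task[string, int], error) {
	return runStage[string, int](
		context.Background(),
		task[string, int]{ProcessingData: names},
		lengths,
		upper,
	)
}

func main() {
	out, err := runSample([]string{"john", "peter"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}

	fmt.Println(out.ProcessingData, out.ConvertedData)
	// Prints: [JOHN PETER] [4 5]
}
```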
func (*Stage[ProcessingData, ConvertedData]) SetOnFinished ¶
func (s *Stage[ProcessingData, ConvertedData]) SetOnFinished(onFinished OnFinished[ProcessingData, ConvertedData])
SetOnFinished sets the `OnFinished` function.
func (*Stage[ProcessingData, ConvertedData]) SetProgressPercent ¶ added in v2.0.6
func (s *Stage[ProcessingData, ConvertedData]) SetProgressPercent()
SetProgressPercent sets the `ProgressPercent` of the stage.
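The documentation does not say how the percentage is derived. As a rough illustration only, the sketch below assumes it is the processed count divided by a known total, stored as a formatted string in an `expvar.String`. The `progressTracker` type, its `Total` field, and the `"N%"` format are assumptions made for this example; only the `expvar` calls come from the standard library.

```go
package main

import (
	"expvar"
	"fmt"
)

// progressTracker is a hypothetical stand-in for the stage's progress
// fields. The variables are created with new(...) so they are not published
// in the global expvar registry.
type progressTracker struct {
	Total           *expvar.Int    // assumed: number of items to process
	Progress        *expvar.Int    // number of items processed so far
	ProgressPercent *expvar.String // formatted percentage, e.g. "50%"
}

// newProgressTracker returns a tracker with the given total and 0% progress.
func newProgressTracker(total int64) *progressTracker {
	t := &progressTracker{
		Total:           new(expvar.Int),
		Progress:        new(expvar.Int),
		ProgressPercent: new(expvar.String),
	}
	t.Total.Set(total)
	t.ProgressPercent.Set("0%")

	return t
}

// setProgressPercent recomputes the percentage from the counters. A total
// of zero or less is reported as "0%" to avoid dividing by zero.
func (t *progressTracker) setProgressPercent() {
	total := t.Total.Value()
	if total <= 0 {
		t.ProgressPercent.Set("0%")
		return
	}

	percent := t.Progress.Value() * 100 / total
	t.ProgressPercent.Set(fmt.Sprintf("%d%%", percent))
}

func main() {
	tracker := newProgressTracker(4)

	for i := 0; i < 4; i++ {
		tracker.Progress.Add(1)
		tracker.setProgressPercent()
		fmt.Println(tracker.ProgressPercent.Value())
	}
	// Prints 25%, 50%, 75%, and 100%, one per line.
}
```

Note that `Value()` returns the raw string, whereas `String()` on an `expvar.String` returns the JSON-quoted form used when the variable is exported.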