stage

package
v2.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 19, 2024 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package stage provides a framework for creating stages

Index

Constants

View Source
const Type = "stage"

Type of the entity.

Variables

This section is empty.

Functions

This section is empty.

Types

type Func

type Func[ProcessedData, ConvertedOut any] func(p IStage[ProcessedData, ConvertedOut]) IStage[ProcessedData, ConvertedOut]

Func allows to specify message's options.

func WithOnFinished

func WithOnFinished[ProcessedData, ConvertedOut any](onFinished OnFinished[ProcessedData, ConvertedOut]) Func[ProcessedData, ConvertedOut]

WithOnFinished sets the OnFinished function.

type IStage

type IStage[ProcessedData, ConvertedOut any] interface {
	shared.IMeta

	shared.IMetrics

	// GetProgress returns the `CounterProgress` of the stage.
	GetProgress() *expvar.Int

	// GetProgressPercent returns the `ProgressPercent` of the stage.
	GetProgressPercent() *expvar.String

	// SetProgressPercent sets the `ProgressPercent` of the stage.
	SetProgressPercent()

	// GetOnFinished returns the `OnFinished` function.
	GetOnFinished() OnFinished[ProcessedData, ConvertedOut]

	// SetOnFinished sets the `OnFinished` function.
	SetOnFinished(onFinished OnFinished[ProcessedData, ConvertedOut])

	// Run the stage function.
	Run(context.Context, task.Task[ProcessedData, ConvertedOut]) (task.Task[ProcessedData, ConvertedOut], error)
}

IStage defines what a `Stage` must do.

func New

func New[ProcessingData, ConvertedData any](
	name string,
	description string,
	conversor concurrentloop.MapFunc[ProcessingData, ConvertedData],
	processors ...processor.IProcessor[ProcessingData],
) (IStage[ProcessingData, ConvertedData], error)

New returns a new stage.

type OnFinished

type OnFinished[ProcessedData, ConvertedOut any] func(ctx context.Context, s IStage[ProcessedData, ConvertedOut], tskIn task.Task[ProcessedData, ConvertedOut], tskOut task.Task[ProcessedData, ConvertedOut])

OnFinished is the function that is called when a processor finishes its execution.

type Stage

type Stage[ProcessingData, ConvertedData any] struct {
	// Description of the stage.
	Description string `json:"description"`

	// Conversor to be used tsk the stage.
	Conversor concurrentloop.MapFunc[ProcessingData, ConvertedData] `json:"-" validate:"required"`

	// Logger is the internal logger.
	Logger sypl.ISypl `json:"-" validate:"required"`

	// Name of the stage.
	Name string `json:"name" validate:"required"`

	// OnFinished is the function that is called when a processor finishes its
	// execution.
	OnFinished OnFinished[ProcessingData, ConvertedData] `json:"-"`

	// Processors to be run tsk the stage.
	Processors []processor.IProcessor[ProcessingData] `json:"processors" validate:"required,gt=0"`

	// Metrics.
	CounterCreated *expvar.Int `json:"counterCreated"`
	CounterDone    *expvar.Int `json:"counterDone"`
	CounterFailed  *expvar.Int `json:"counterFailed"`
	CounterRunning *expvar.Int `json:"counterRunning"`

	CreatedAt       time.Time      `json:"createdAt"`
	Duration        *expvar.Int    `json:"duration"`
	Progress        *expvar.Int    `json:"progress"`
	ProgressPercent *expvar.String `json:"progressPercent"`
	Status          *expvar.String `json:"status"`
}

Stage definition.

func (*Stage[ProcessingData, ConvertedData]) GetCounterCreated

func (s *Stage[ProcessingData, ConvertedData]) GetCounterCreated() *expvar.Int

GetCounterCreated returns the `CounterCreated` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetCounterDone

func (s *Stage[ProcessingData, ConvertedData]) GetCounterDone() *expvar.Int

GetCounterDone returns the `CounterDone` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetCounterFailed

func (s *Stage[ProcessingData, ConvertedData]) GetCounterFailed() *expvar.Int

GetCounterFailed returns the `CounterFailed` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetCounterRunning

func (s *Stage[ProcessingData, ConvertedData]) GetCounterRunning() *expvar.Int

GetCounterRunning returns the `CounterRunning` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetCreatedAt added in v2.0.6

func (s *Stage[ProcessingData, ConvertedData]) GetCreatedAt() time.Time

GetCreatedAt returns the created at time.

func (*Stage[ProcessingData, ConvertedData]) GetDescription

func (s *Stage[ProcessingData, ConvertedData]) GetDescription() string

GetDescription returns the `Description` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetDuration added in v2.0.6

func (s *Stage[ProcessingData, ConvertedData]) GetDuration() *expvar.Int

GetDuration returns the `CounterDuration` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetLogger

func (s *Stage[ProcessingData, ConvertedData]) GetLogger() sypl.ISypl

GetLogger returns the `Logger` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetMetrics added in v2.0.7

func (s *Stage[ProcessingData, ConvertedData]) GetMetrics() map[string]string

GetMetrics returns the stage's metrics.

func (*Stage[ProcessingData, ConvertedData]) GetName

func (s *Stage[ProcessingData, ConvertedData]) GetName() string

GetName returns the `Name` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetOnFinished

func (s *Stage[ProcessingData, ConvertedData]) GetOnFinished() OnFinished[ProcessingData, ConvertedData]

GetOnFinished returns the `OnFinished` function.

func (*Stage[ProcessingData, ConvertedData]) GetProgress added in v2.0.6

func (s *Stage[ProcessingData, ConvertedData]) GetProgress() *expvar.Int

GetProgress returns the `CounterProgress` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetProgressPercent added in v2.0.6

func (s *Stage[ProcessingData, ConvertedData]) GetProgressPercent() *expvar.String

GetProgressPercent returns the `ProgressPercent` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetStatus

func (s *Stage[ProcessingData, ConvertedData]) GetStatus() *expvar.String

GetStatus returns the `Status` metric.

func (*Stage[ProcessingData, ConvertedData]) GetType

func (s *Stage[ProcessingData, ConvertedData]) GetType() string

GetType returns the entity type.

func (*Stage[ProcessingData, ConvertedData]) Run

func (s *Stage[ProcessingData, ConvertedData]) Run(ctx context.Context, tsk task.Task[ProcessingData, ConvertedData]) (task.Task[ProcessingData, ConvertedData], error)

Run the transform function.

func (*Stage[ProcessingData, ConvertedData]) SetOnFinished

func (s *Stage[ProcessingData, ConvertedData]) SetOnFinished(onFinished OnFinished[ProcessingData, ConvertedData])

SetOnFinished sets the `OnFinished` function.

func (*Stage[ProcessingData, ConvertedData]) SetProgressPercent added in v2.0.6

func (s *Stage[ProcessingData, ConvertedData]) SetProgressPercent()

SetProgressPercent sets the `ProgressPercent` of the stage.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL