stage

package
v2.2.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 21, 2024 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package stage provides a framework for creating stages

Index

Examples

Constants

View Source
const Type = "stage"

Type of the entity.

Variables

This section is empty.

Functions

This section is empty.

Types

type Func

type Func[ProcessedData, ConvertedOut any] func(p IStage[ProcessedData, ConvertedOut]) IStage[ProcessedData, ConvertedOut]

Func allows to specify message's options.

func WithOnFinished

func WithOnFinished[ProcessedData, ConvertedOut any](onFinished OnFinished[ProcessedData, ConvertedOut]) Func[ProcessedData, ConvertedOut]

WithOnFinished sets the OnFinished function.

type IStage

type IStage[ProcessedData, ConvertedOut any] interface {
	shared.IMeta

	shared.IMetrics

	// GetProgress returns the `CounterProgress` of the stage.
	GetProgress() *expvar.Int

	// GetProgressPercent returns the `ProgressPercent` of the stage.
	GetProgressPercent() *expvar.String

	// SetProgressPercent sets the `ProgressPercent` of the stage.
	SetProgressPercent()

	// GetOnFinished returns the `OnFinished` function.
	GetOnFinished() OnFinished[ProcessedData, ConvertedOut]

	// SetOnFinished sets the `OnFinished` function.
	SetOnFinished(onFinished OnFinished[ProcessedData, ConvertedOut])

	// Run the stage function.
	Run(context.Context, task.Task[ProcessedData, ConvertedOut]) (task.Task[ProcessedData, ConvertedOut], error)
}

IStage defines what a `Stage` must do.

func New

func New[ProcessingData, ConvertedData any](
	name string,
	description string,
	conversor converter.IConverter[ProcessingData, ConvertedData],
	processors ...processor.IProcessor[ProcessingData],
) (IStage[ProcessingData, ConvertedData], error)

New returns a new stage.

Example (Storage_processor)

Demonstrates the usage of a stage with the storage processor (memory storage).

// Sample data.
tests := []Test{
	{
		ID:   "1",
		Name: "John",
	},
	{
		ID:   "2",
		Name: "Peter",
	},
}

// Memory storage from DAL.
memoryStorage, err := memory.New(context.Background())
if err != nil {
	log.Fatalln(err)
}

// Storage processor, concurrency set to 1.
s, err := storage.New[Test](memoryStorage, 1, "example-")
if err != nil {
	log.Fatalln(err)
}

// Stage with the storage processor.
stg1, err := New(
	"stage-1",
	"main stage",

	// Add as many as you want.
	passthru.Must[Test](), // Pass-through, does nothing.

	// Add as many as you want.
	s,
)
if err != nil {
	log.Fatalln(err)
}

// Run the stage passing the processing data as a task.
tasksOut, err := stg1.Run(context.Background(), task.Task[Test, Test]{
	ProcessingData: tests,
})
if err != nil {
	log.Fatalln(err)
}

// String builder to contain the output of the stage and the memory storage.
var buf strings.Builder

// Iterate over tasksOut and write to the buffer.
for _, v := range tasksOut.ConvertedData {
	buf.WriteString(fmt.Sprintf("%s %s\n", v.ID, v.Name))
}

// Get a list from the memory storage.
//
// NOTE: The usage of memory.ResponseList[Test] wrapper.
var fromMemory memory.ResponseList[Test]
if err := memoryStorage.List(context.Background(), "etl", &fromMemory, &list.List{}); err != nil {
	log.Fatalln(err)
}

// Iterate over fromMemory so we can add to the buffer.
for _, v := range fromMemory.Items {
	buf.WriteString(fmt.Sprintf("%s %s\n", v.ID, v.Name))
}

// Get the content of the buffer.
bufferContent := buf.String()

//////
// Check in the buffer if the name John and Peter appears 2 times.
//////

if strings.Count(bufferContent, "John") == 2 {
	fmt.Println("John appears 2 times")
}

if strings.Count(bufferContent, "Peter") == 2 {
	fmt.Println("Peter appears 2 times")
}
Output:

John appears 2 times
Peter appears 2 times

type OnFinished

type OnFinished[ProcessedData, ConvertedOut any] func(ctx context.Context, s IStage[ProcessedData, ConvertedOut], tskIn task.Task[ProcessedData, ConvertedOut], tskOut task.Task[ProcessedData, ConvertedOut])

OnFinished is the function that is called when a processor finishes its execution.

type Stage

type Stage[ProcessingData, ConvertedData any] struct {
	// Description of the stage.
	Description string `json:"description"`

	// Conversor to be used tsk the stage.
	Conversor converter.IConverter[ProcessingData, ConvertedData] `json:"-" validate:"required"`

	// Logger is the internal logger.
	Logger sypl.ISypl `json:"-" validate:"required"`

	// Name of the stage.
	Name string `json:"name" validate:"required"`

	// OnFinished is the function that is called when a processor finishes its
	// execution.
	OnFinished OnFinished[ProcessingData, ConvertedData] `json:"-"`

	// Processors to be run tsk the stage.
	Processors []processor.IProcessor[ProcessingData] `json:"processors" validate:"required,gt=0"`

	// Metrics.
	CounterCreated *expvar.Int `json:"counterCreated"`
	CounterDone    *expvar.Int `json:"counterDone"`
	CounterFailed  *expvar.Int `json:"counterFailed"`
	CounterRunning *expvar.Int `json:"counterRunning"`

	CreatedAt       time.Time      `json:"createdAt"`
	Duration        *expvar.Int    `json:"duration"`
	Progress        *expvar.Int    `json:"progress"`
	ProgressPercent *expvar.String `json:"progressPercent"`
	Status          *expvar.String `json:"status"`
}

Stage definition.

func (*Stage[ProcessingData, ConvertedData]) GetCounterCreated

func (s *Stage[ProcessingData, ConvertedData]) GetCounterCreated() *expvar.Int

GetCounterCreated returns the `CounterCreated` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetCounterDone

func (s *Stage[ProcessingData, ConvertedData]) GetCounterDone() *expvar.Int

GetCounterDone returns the `CounterDone` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetCounterFailed

func (s *Stage[ProcessingData, ConvertedData]) GetCounterFailed() *expvar.Int

GetCounterFailed returns the `CounterFailed` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetCounterRunning

func (s *Stage[ProcessingData, ConvertedData]) GetCounterRunning() *expvar.Int

GetCounterRunning returns the `CounterRunning` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetCreatedAt added in v2.0.6

func (s *Stage[ProcessingData, ConvertedData]) GetCreatedAt() time.Time

GetCreatedAt returns the created at time.

func (*Stage[ProcessingData, ConvertedData]) GetDescription

func (s *Stage[ProcessingData, ConvertedData]) GetDescription() string

GetDescription returns the `Description` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetDuration added in v2.0.6

func (s *Stage[ProcessingData, ConvertedData]) GetDuration() *expvar.Int

GetDuration returns the `CounterDuration` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetLogger

func (s *Stage[ProcessingData, ConvertedData]) GetLogger() sypl.ISypl

GetLogger returns the `Logger` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetMetrics added in v2.0.7

func (s *Stage[ProcessingData, ConvertedData]) GetMetrics() map[string]string

GetMetrics returns the stage's metrics.

func (*Stage[ProcessingData, ConvertedData]) GetName

func (s *Stage[ProcessingData, ConvertedData]) GetName() string

GetName returns the `Name` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetOnFinished

func (s *Stage[ProcessingData, ConvertedData]) GetOnFinished() OnFinished[ProcessingData, ConvertedData]

GetOnFinished returns the `OnFinished` function.

func (*Stage[ProcessingData, ConvertedData]) GetProgress added in v2.0.6

func (s *Stage[ProcessingData, ConvertedData]) GetProgress() *expvar.Int

GetProgress returns the `CounterProgress` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetProgressPercent added in v2.0.6

func (s *Stage[ProcessingData, ConvertedData]) GetProgressPercent() *expvar.String

GetProgressPercent returns the `ProgressPercent` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetStatus

func (s *Stage[ProcessingData, ConvertedData]) GetStatus() *expvar.String

GetStatus returns the `Status` metric.

func (*Stage[ProcessingData, ConvertedData]) GetType

func (s *Stage[ProcessingData, ConvertedData]) GetType() string

GetType returns the entity type.

func (*Stage[ProcessingData, ConvertedData]) Run

func (s *Stage[ProcessingData, ConvertedData]) Run(ctx context.Context, tsk task.Task[ProcessingData, ConvertedData]) (task.Task[ProcessingData, ConvertedData], error)

Run the transform function.

func (*Stage[ProcessingData, ConvertedData]) SetOnFinished

func (s *Stage[ProcessingData, ConvertedData]) SetOnFinished(onFinished OnFinished[ProcessingData, ConvertedData])

SetOnFinished sets the `OnFinished` function.

func (*Stage[ProcessingData, ConvertedData]) SetProgressPercent added in v2.0.6

func (s *Stage[ProcessingData, ConvertedData]) SetProgressPercent()

SetProgressPercent sets the `ProgressPercent` of the stage.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL