pipeline

package
v0.0.0-...-17e6b21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 30, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DistributedPipeline

type DistributedPipeline struct {
	// contains filtered or unexported fields
}

func NewDistributedPipeline

func NewDistributedPipeline(
	metrics *metrics.QuantumStorageMetrics,
	tracer tracing.Tracer,
	natsConn *nats.Conn,
) *DistributedPipeline

func (*DistributedPipeline) AddStage

func (dp *DistributedPipeline) AddStage(stage Stage) error

func (*DistributedPipeline) ProcessTask

func (dp *DistributedPipeline) ProcessTask(
	ctx context.Context,
	task *Task,
) (*Result, error)

type PipelineMetrics

type PipelineMetrics struct {
	ActiveTasks    int
	CompletedTasks int
	FailedTasks    int
	AverageLatency time.Duration
	ResourceUsage  ResourceMetrics
	StageMetrics   map[string]StageMetrics
}

type ProcessingPipeline

type ProcessingPipeline struct {
	// contains filtered or unexported fields
}

func NewProcessingPipeline

func NewProcessingPipeline(
	metrics *metrics.QuantumStorageMetrics,
	tracer tracing.Tracer,
	workerCount int,
) *ProcessingPipeline

func (*ProcessingPipeline) AddStage

func (pp *ProcessingPipeline) AddStage(stage Stage) error

func (*ProcessingPipeline) GetPipelineMetrics

func (pp *ProcessingPipeline) GetPipelineMetrics(
	ctx context.Context,
) (*PipelineMetrics, error)

func (*ProcessingPipeline) ProcessTask

func (pp *ProcessingPipeline) ProcessTask(
	ctx context.Context,
	task *ProcessingTask,
) (*ProcessingResult, error)

func (*ProcessingPipeline) ScheduleTask

func (pp *ProcessingPipeline) ScheduleTask(
	ctx context.Context,
	task *ProcessingTask,
	priority TaskPriority,
) (*ScheduledTask, error)

type ProcessingResult

type ProcessingResult struct {
	TaskID      string
	StageID     string
	Output      *unified.Node
	Status      ProcessingStatus
	Error       error
	Metrics     map[string]float64
	CompletedAt time.Time
}

type ProcessingStatus

type ProcessingStatus string
const (
	StatusPending   ProcessingStatus = "PENDING"
	StatusRunning   ProcessingStatus = "RUNNING"
	StatusCompleted ProcessingStatus = "COMPLETED"
	StatusFailed    ProcessingStatus = "FAILED"
)

type ProcessingTask

type ProcessingTask struct {
	ID         string
	StageID    string
	Input      *unified.Node
	Metadata   map[string]interface{}
	Priority   int
	Deadline   time.Time
	RetryCount int
}

type Processor

type Processor interface {
	Process(context.Context, *ProcessingTask) (*ProcessingResult, error)
	Validate(context.Context, *ProcessingTask) error
	GetCapabilities() []string
}

type Result

type Result struct {
	TaskID      string
	StageID     string
	Output      []byte
	Status      Status
	Error       error
	Metrics     map[string]float64
	CompletedAt time.Time
}

type RetryPolicy

type RetryPolicy struct {
	MaxRetries  int
	BackoffBase time.Duration
	MaxBackoff  time.Duration
}

type Stage

type Stage struct {
	ID          string
	Name        string
	Processor   Processor
	DependsOn   []string
	Timeout     time.Duration
	RetryPolicy RetryPolicy
}

type Task

type Task struct {
	ID         string
	StageID    string
	Input      []byte
	Metadata   map[string]interface{}
	Priority   int
	Deadline   time.Time
	RetryCount int
}

type TaskPriority

type TaskPriority int
const (
	PriorityLow      TaskPriority = 1
	PriorityNormal   TaskPriority = 2
	PriorityHigh     TaskPriority = 3
	PriorityCritical TaskPriority = 4
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL