Documentation ¶
Index ¶
- Variables
- func Execute(ctx context.Context, task *Task) error
- func GetDuration(task *Task) time.Duration
- type AsyncExecutor
- func (e *AsyncExecutor) CancelTask(taskID string) error
- func (e *AsyncExecutor) Close()
- func (e *AsyncExecutor) ExecuteAsync(ctx context.Context, task *Task) (<-chan AsyncResult, error)
- func (e *AsyncExecutor) GetResultChannel() <-chan AsyncResult
- func (e *AsyncExecutor) GetTaskChannels() *TaskChannels
- type AsyncResult
- type ContextManager
- func (c *ContextManager) ClearGlobalContext()
- func (c *ContextManager) ClearTaskContext(taskID string)
- func (c *ContextManager) ExportTaskContext(taskID string) (string, error)
- func (c *ContextManager) GetGlobalContext(key string) (interface{}, bool)
- func (c *ContextManager) GetTaskContext(taskID, key string) (interface{}, bool)
- func (c *ContextManager) ImportTaskContext(taskID string, jsonData string) error
- func (c *ContextManager) SetGlobalContext(key string, value interface{})
- func (c *ContextManager) SetTaskContext(taskID, key string, value interface{})
- func (c *ContextManager) ShareTaskContext(sourceTaskID, targetTaskID string, keys ...string) error
- type DynamicPriorityScorer
- type ErrorCategory
- type ErrorClassifier
- type ErrorWithMetadata
- type ExecutionMetrics
- func (m *ExecutionMetrics) Complete()
- func (m *ExecutionMetrics) GetDuration() time.Duration
- func (m *ExecutionMetrics) GetResourceUsage() map[string]float64
- func (m *ExecutionMetrics) IncrementErrors() uint32
- func (m *ExecutionMetrics) IncrementRetries() uint32
- func (m *ExecutionMetrics) SetStatus(status string)
- func (m *ExecutionMetrics) UpdateResourceUsage(resource string, usage float64)
- type MessageType
- type OutputStore
- func (s *OutputStore) DeleteOutput(taskID string) error
- func (s *OutputStore) GetOutput(taskID string) (*TaskOutput, error)
- func (s *OutputStore) ListOutputs() []*TaskOutput
- func (s *OutputStore) QueryOutputs(criteria map[string]interface{}) []*TaskOutput
- func (s *OutputStore) SaveOutput(output *TaskOutput) error
- type Pipeline
- func (p *Pipeline) AddCallback(callback PipelineCallback)
- func (p *Pipeline) AddTask(task *Task) error
- func (p *Pipeline) CancelTask(taskID string) error
- func (p *Pipeline) Close() error
- func (p *Pipeline) DeleteTaskOutput(taskID string) error
- func (p *Pipeline) Execute(ctx context.Context) error
- func (p *Pipeline) ExecuteAsync(ctx context.Context) (<-chan AsyncResult, error)
- func (p *Pipeline) ExecuteTask(ctx context.Context, task *Task) error
- func (p *Pipeline) ExecuteTaskAsync(ctx context.Context, task *Task) (<-chan AsyncResult, error)
- func (p *Pipeline) GetContextManager() *ContextManager
- func (p *Pipeline) GetGlobalContext(key string) (interface{}, bool)
- func (p *Pipeline) GetTaskContext(taskID, key string) (interface{}, bool)
- func (p *Pipeline) GetTaskOutput(taskID string) (*TaskOutput, error)
- func (p *Pipeline) GetTaskResult(taskID string) (string, bool)
- func (p *Pipeline) GetTaskStatus(taskID string) TaskStatus
- func (p *Pipeline) ListTaskOutputs() []*TaskOutput
- func (p *Pipeline) QueryTaskOutputs(criteria map[string]interface{}) []*TaskOutput
- func (p *Pipeline) SetGlobalContext(key string, value interface{})
- func (p *Pipeline) SetTaskContext(taskID, key string, value interface{})
- func (p *Pipeline) ShareContext(sourceTaskID, targetTaskID string, keys ...string) error
- type PipelineCallback
- type PriorityQueue
- func (pq *PriorityQueue) Clear()
- func (pq *PriorityQueue) Dequeue() (*Task, bool)
- func (pq *PriorityQueue) DequeueBatch(count int) []*Task
- func (pq *PriorityQueue) Enqueue(task *Task) error
- func (pq *PriorityQueue) EnqueueBatch(tasks []*Task) error
- func (pq *PriorityQueue) GetMetrics() map[string]interface{}
- func (pq *PriorityQueue) Len() int
- func (pq *PriorityQueue) Less(i, j int) bool
- func (pq *PriorityQueue) Peek() (*Task, bool)
- func (pq *PriorityQueue) Pop() interface{}
- func (pq *PriorityQueue) Push(x interface{})
- func (pq *PriorityQueue) Remove(taskID string) bool
- func (pq *PriorityQueue) SetScorer(scorer PriorityScorer)
- func (pq *PriorityQueue) Swap(i, j int)
- func (pq *PriorityQueue) UpdatePriority(taskID string) bool
- type PriorityScorer
- type QueueItem
- type QueueMetrics
- func (m *QueueMetrics) GetAverageWaitTime() time.Duration
- func (m *QueueMetrics) GetMetricsSnapshot() map[string]interface{}
- func (m *QueueMetrics) GetQueueLoad() float64
- func (m *QueueMetrics) Reset()
- func (m *QueueMetrics) TaskDequeued(waitTime time.Duration)
- func (m *QueueMetrics) TaskEnqueued(priority int)
- type RecoveryAttempt
- type RecoveryManager
- func (r *RecoveryManager) ClearRecoveryState(taskID string)
- func (r *RecoveryManager) GetDefaultOptions() *RecoveryOptions
- func (r *RecoveryManager) GetMetrics() *RecoveryMetrics
- func (r *RecoveryManager) GetRecoveryState(taskID string) (*RecoveryState, bool)
- func (r *RecoveryManager) HandleError(ctx context.Context, task *Task, err error, opts *RecoveryOptions) (RecoveryStrategy, error)
- func (r *RecoveryManager) RegisterErrorClassifier(fn func(error) (ErrorCategory, bool))
- func (r *RecoveryManager) SetDefaultOptions(opts *RecoveryOptions)
- type RecoveryMetrics
- func (rm *RecoveryMetrics) GetAverageRecoveryTime(strategy RecoveryStrategy) time.Duration
- func (rm *RecoveryMetrics) GetCategorySuccessRate(category ErrorCategory) float64
- func (rm *RecoveryMetrics) GetMetricsSummary() map[string]interface{}
- func (rm *RecoveryMetrics) GetSuccessRate(strategy RecoveryStrategy) float64
- func (rm *RecoveryMetrics) PruneHistory(maxAge time.Duration)
- func (rm *RecoveryMetrics) RecordAttempt(attempt *RecoveryAttempt)
- type RecoveryOptions
- type RecoveryState
- type RecoveryStrategy
- type RetryState
- type Task
- func (t *Task) AddMessage(msg string)
- func (t *Task) Cancel() error
- func (t *Task) Complete(result string) error
- func (t *Task) Duration() time.Duration
- func (t *Task) Fail(err error) error
- func (t *Task) GetError() error
- func (t *Task) GetResult() (string, error)
- func (t *Task) IsAsync() bool
- func (t *Task) IsCancelled() bool
- func (t *Task) IsComplete() bool
- func (t *Task) IsFailed() bool
- func (t *Task) IsFinished() bool
- func (t *Task) Start(agentID string) error
- func (t *Task) UpdateProgress(progress float64) error
- func (t *Task) WithAsyncExecution(async bool) *Task
- func (t *Task) WithDeadline(deadline time.Time) *Task
- func (t *Task) WithDependencies(deps []string) *Task
- func (t *Task) WithMetadata(metadata map[string]interface{}) *Task
- func (t *Task) WithPriority(priority TaskPriority) *Task
- func (t *Task) WithRequirements(reqs *capabilities.TaskRequirements) *Task
- func (t *Task) WithTimeout(timeout time.Duration) *Task
- type TaskChannels
- func (tc *TaskChannels) CloseChannels(taskID string)
- func (tc *TaskChannels) CreateChannels(taskID string)
- func (tc *TaskChannels) GetLogChannel(taskID string) (<-chan string, error)
- func (tc *TaskChannels) GetProgressChannel(taskID string) (<-chan float64, error)
- func (tc *TaskChannels) GetResultChannel(taskID string) (<-chan AsyncResult, error)
- func (tc *TaskChannels) GetStatusChannel(taskID string) (<-chan TaskStatus, error)
- func (tc *TaskChannels) SendLog(taskID string, message string) error
- func (tc *TaskChannels) SendProgress(taskID string, progress float64) error
- func (tc *TaskChannels) SendResult(taskID string, result AsyncResult) error
- func (tc *TaskChannels) SendStatus(taskID string, status TaskStatus) error
- func (tc *TaskChannels) Subscribe(taskID string) (<-chan TaskMessage, error)
- func (tc *TaskChannels) Unsubscribe(taskID string, ch <-chan TaskMessage)
- type TaskContext
- type TaskLifecycle
- func (l *TaskLifecycle) CancelTask(taskID string, reason string) error
- func (l *TaskLifecycle) Close()
- func (l *TaskLifecycle) ExtendTimeout(taskID string, extension time.Duration) error
- func (l *TaskLifecycle) GetContext(taskID string) (context.Context, error)
- func (l *TaskLifecycle) GetRetryState(taskID string) (*RetryState, error)
- func (l *TaskLifecycle) GetTimeRemaining(taskID string) (time.Duration, error)
- func (l *TaskLifecycle) IsExpired(taskID string) bool
- func (l *TaskLifecycle) RegisterTask(task *Task, timeout time.Duration, maxRetries int) error
- func (l *TaskLifecycle) ShouldRetry(taskID string, err error) (bool, time.Duration)
- func (l *TaskLifecycle) UnregisterTask(taskID string)
- type TaskMessage
- type TaskNode
- type TaskOption
- type TaskOutput
- type TaskPlanner
- func (p *TaskPlanner) AddTask(task *Task, dependencies []string, resources map[string]float64, ...) error
- func (p *TaskPlanner) DetectCycles() [][]string
- func (p *TaskPlanner) EstimateCosts() map[string]float64
- func (p *TaskPlanner) GetCriticalPath() []*Task
- func (p *TaskPlanner) OptimizeResources() error
- type TaskPriority
- type TaskProgress
- type TaskSchema
- type TaskStatus
- type TaskValidator
- func (v *TaskValidator) ValidateIOContract(taskID string, context interface{}, result string) []error
- func (v *TaskValidator) ValidatePerformance(metrics *ExecutionMetrics) []error
- func (v *TaskValidator) ValidateResourceUsage(resources map[string]float64) []error
- func (v *TaskValidator) ValidateSchema(task *Task) []error
- type WeightedPriorityScorer
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrQueueFull is returned when trying to add a task to a full queue
	ErrQueueFull = errors.New("priority queue is at maximum capacity")
	// ErrInvalidTask is returned when a task is nil or invalid
	ErrInvalidTask = errors.New("invalid task")
	// ErrTaskNotFound is returned when a task cannot be found in the queue
	ErrTaskNotFound = errors.New("task not found in queue")
	// ErrInvalidPriority is returned when an invalid priority value is provided
	ErrInvalidPriority = errors.New("invalid priority value")
)
Functions ¶
func GetDuration ¶
func GetDuration(task *Task) time.Duration
GetDuration returns the duration of task execution
Types ¶
type AsyncExecutor ¶
type AsyncExecutor struct {
// contains filtered or unexported fields
}
AsyncExecutor handles asynchronous task execution
func NewAsyncExecutor ¶
func NewAsyncExecutor(pipeline *Pipeline, maxRetries int) *AsyncExecutor
NewAsyncExecutor creates a new async executor instance
func (*AsyncExecutor) CancelTask ¶
func (e *AsyncExecutor) CancelTask(taskID string) error
CancelTask cancels a running task
func (*AsyncExecutor) ExecuteAsync ¶
func (e *AsyncExecutor) ExecuteAsync(ctx context.Context, task *Task) (<-chan AsyncResult, error)
ExecuteAsync executes a task asynchronously
func (*AsyncExecutor) GetResultChannel ¶
func (e *AsyncExecutor) GetResultChannel() <-chan AsyncResult
GetResultChannel returns the channel for receiving task results
func (*AsyncExecutor) GetTaskChannels ¶
func (e *AsyncExecutor) GetTaskChannels() *TaskChannels
GetTaskChannels returns the task channels manager
type AsyncResult ¶
type AsyncResult struct {
	TaskID     string
	Result     string
	Error      error
	StartTime  time.Time
	EndTime    time.Time
	RetryCount int
}
AsyncResult represents the result of an async task execution
type ContextManager ¶
type ContextManager struct {
// contains filtered or unexported fields
}
ContextManager handles shared state and context between tasks
func NewContextManager ¶
func NewContextManager() *ContextManager
NewContextManager creates a new context manager instance
func (*ContextManager) ClearGlobalContext ¶
func (c *ContextManager) ClearGlobalContext()
ClearGlobalContext removes all global context data
func (*ContextManager) ClearTaskContext ¶
func (c *ContextManager) ClearTaskContext(taskID string)
ClearTaskContext removes all context data for a task
func (*ContextManager) ExportTaskContext ¶
func (c *ContextManager) ExportTaskContext(taskID string) (string, error)
ExportTaskContext exports a task's context as JSON
func (*ContextManager) GetGlobalContext ¶
func (c *ContextManager) GetGlobalContext(key string) (interface{}, bool)
GetGlobalContext gets a value from the global context
func (*ContextManager) GetTaskContext ¶
func (c *ContextManager) GetTaskContext(taskID, key string) (interface{}, bool)
GetTaskContext gets a value from a task's context
func (*ContextManager) ImportTaskContext ¶
func (c *ContextManager) ImportTaskContext(taskID string, jsonData string) error
ImportTaskContext imports JSON data into a task's context
func (*ContextManager) SetGlobalContext ¶
func (c *ContextManager) SetGlobalContext(key string, value interface{})
SetGlobalContext sets a value in the global context
func (*ContextManager) SetTaskContext ¶
func (c *ContextManager) SetTaskContext(taskID, key string, value interface{})
SetTaskContext sets a value in a task's context
func (*ContextManager) ShareTaskContext ¶
func (c *ContextManager) ShareTaskContext(sourceTaskID, targetTaskID string, keys ...string) error
ShareTaskContext shares context data between tasks
type DynamicPriorityScorer ¶
type DynamicPriorityScorer struct {
// contains filtered or unexported fields
}
DynamicPriorityScorer adjusts priority based on queue metrics
func NewDynamicPriorityScorer ¶
func NewDynamicPriorityScorer(baseScorer PriorityScorer) *DynamicPriorityScorer
NewDynamicPriorityScorer creates a new dynamic priority scorer
func (*DynamicPriorityScorer) CalculatePriority ¶
func (d *DynamicPriorityScorer) CalculatePriority(task *Task, context TaskContext) float64
CalculatePriority implements PriorityScorer interface with dynamic adjustments
type ErrorCategory ¶
type ErrorCategory string
ErrorCategory represents the broad category of an error
const (
	// TemporaryError represents transient errors that may resolve on retry
	TemporaryError ErrorCategory = "temporary"
	// ResourceError represents errors due to resource constraints
	ResourceError ErrorCategory = "resource"
	// TimeoutError represents deadline or timeout errors
	TimeoutError ErrorCategory = "timeout"
	// ValidationError represents errors in task input/output validation
	ValidationError ErrorCategory = "validation"
	// PermissionError represents authorization/access errors
	PermissionError ErrorCategory = "permission"
	// SystemError represents internal system errors
	SystemError ErrorCategory = "system"
	// UnknownError represents unclassified errors
	UnknownError ErrorCategory = "unknown"
)
type ErrorClassifier ¶
type ErrorClassifier struct {
// contains filtered or unexported fields
}
ErrorClassifier provides error classification functionality
func NewErrorClassifier ¶
func NewErrorClassifier() *ErrorClassifier
NewErrorClassifier creates a new error classifier
func (*ErrorClassifier) Classify ¶
func (ec *ErrorClassifier) Classify(err error) ErrorCategory
Classify determines the category of an error
func (*ErrorClassifier) RegisterClassifier ¶
func (ec *ErrorClassifier) RegisterClassifier(fn func(error) (ErrorCategory, bool))
RegisterClassifier adds a custom error classifier
type ErrorWithMetadata ¶
type ErrorWithMetadata struct {
	Err      error
	Category ErrorCategory
	Metadata map[string]interface{}
	Time     time.Time
}
ErrorWithMetadata adds context and metadata to an error
func NewErrorWithMetadata ¶
func NewErrorWithMetadata(err error, category ErrorCategory, metadata map[string]interface{}) *ErrorWithMetadata
NewErrorWithMetadata creates a new error with metadata
func (*ErrorWithMetadata) Error ¶
func (e *ErrorWithMetadata) Error() string
Error implements the error interface
func (*ErrorWithMetadata) Unwrap ¶
func (e *ErrorWithMetadata) Unwrap() error
Unwrap returns the underlying error
type ExecutionMetrics ¶
type ExecutionMetrics struct {
	TaskID        string
	StartTime     time.Time
	EndTime       time.Time
	Duration      time.Duration
	CPUUsage      float64
	MemoryUsage   float64
	NetworkUsage  float64
	ErrorCount    uint32
	RetryCount    uint32
	Status        string
	ResourceUsage map[string]float64
}
ExecutionMetrics tracks task execution performance metrics
func NewExecutionMetrics ¶
func NewExecutionMetrics(taskID string) *ExecutionMetrics
NewExecutionMetrics creates a new metrics tracker for a task
func (*ExecutionMetrics) Complete ¶
func (m *ExecutionMetrics) Complete()
Complete marks the task as complete and calculates final metrics
func (*ExecutionMetrics) GetDuration ¶
func (m *ExecutionMetrics) GetDuration() time.Duration
GetDuration returns the task duration
func (*ExecutionMetrics) GetResourceUsage ¶
func (m *ExecutionMetrics) GetResourceUsage() map[string]float64
GetResourceUsage gets the current resource usage
func (*ExecutionMetrics) IncrementErrors ¶
func (m *ExecutionMetrics) IncrementErrors() uint32
IncrementErrors increments the error counter
func (*ExecutionMetrics) IncrementRetries ¶
func (m *ExecutionMetrics) IncrementRetries() uint32
IncrementRetries increments the retry counter
func (*ExecutionMetrics) SetStatus ¶
func (m *ExecutionMetrics) SetStatus(status string)
SetStatus updates the task status
func (*ExecutionMetrics) UpdateResourceUsage ¶
func (m *ExecutionMetrics) UpdateResourceUsage(resource string, usage float64)
UpdateResourceUsage updates resource usage metrics
type MessageType ¶
type MessageType int
MessageType represents different types of task messages
const (
	// MessageTypeResult represents a task result
	MessageTypeResult MessageType = iota
	// MessageTypeProgress represents task progress update
	MessageTypeProgress
	// MessageTypeLog represents a log message
	MessageTypeLog
	// MessageTypeError represents an error message
	MessageTypeError
	// MessageTypeStatus represents a status update
	MessageTypeStatus
)
type OutputStore ¶
type OutputStore struct {
// contains filtered or unexported fields
}
OutputStore manages task output persistence
func NewOutputStore ¶
func NewOutputStore(baseDir string) (*OutputStore, error)
NewOutputStore creates a new output store
func (*OutputStore) DeleteOutput ¶
func (s *OutputStore) DeleteOutput(taskID string) error
DeleteOutput removes a task output
func (*OutputStore) GetOutput ¶
func (s *OutputStore) GetOutput(taskID string) (*TaskOutput, error)
GetOutput retrieves a task output
func (*OutputStore) ListOutputs ¶
func (s *OutputStore) ListOutputs() []*TaskOutput
ListOutputs returns all stored task outputs
func (*OutputStore) QueryOutputs ¶
func (s *OutputStore) QueryOutputs(criteria map[string]interface{}) []*TaskOutput
QueryOutputs searches for outputs matching criteria
func (*OutputStore) SaveOutput ¶
func (s *OutputStore) SaveOutput(output *TaskOutput) error
SaveOutput saves a task output
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline manages the execution flow of tasks
func NewPipeline ¶
NewPipeline creates a new task execution pipeline
func (*Pipeline) AddCallback ¶
func (p *Pipeline) AddCallback(callback PipelineCallback)
AddCallback adds a callback for task status changes
func (*Pipeline) CancelTask ¶
func (p *Pipeline) CancelTask(taskID string) error
CancelTask cancels a running task
func (*Pipeline) DeleteTaskOutput ¶
func (p *Pipeline) DeleteTaskOutput(taskID string) error
DeleteTaskOutput removes a task's output from storage
func (*Pipeline) ExecuteAsync ¶
func (p *Pipeline) ExecuteAsync(ctx context.Context) (<-chan AsyncResult, error)
ExecuteAsync runs all tasks in the pipeline asynchronously
func (*Pipeline) ExecuteTask ¶
func (p *Pipeline) ExecuteTask(ctx context.Context, task *Task) error
ExecuteTask executes a single task with validation and metrics
func (*Pipeline) ExecuteTaskAsync ¶
func (p *Pipeline) ExecuteTaskAsync(ctx context.Context, task *Task) (<-chan AsyncResult, error)
ExecuteTaskAsync executes a single task asynchronously
func (*Pipeline) GetContextManager ¶
func (p *Pipeline) GetContextManager() *ContextManager
GetContextManager returns the pipeline's context manager
func (*Pipeline) GetGlobalContext ¶
func (p *Pipeline) GetGlobalContext(key string) (interface{}, bool)
GetGlobalContext gets a value from the global context
func (*Pipeline) GetTaskContext ¶
func (p *Pipeline) GetTaskContext(taskID, key string) (interface{}, bool)
GetTaskContext gets a value from a task's context
func (*Pipeline) GetTaskOutput ¶
func (p *Pipeline) GetTaskOutput(taskID string) (*TaskOutput, error)
GetTaskOutput retrieves a task's output from storage
func (*Pipeline) GetTaskResult ¶
func (p *Pipeline) GetTaskResult(taskID string) (string, bool)
GetTaskResult returns the result of a completed task
func (*Pipeline) GetTaskStatus ¶
func (p *Pipeline) GetTaskStatus(taskID string) TaskStatus
GetTaskStatus returns the current status of a task
func (*Pipeline) ListTaskOutputs ¶
func (p *Pipeline) ListTaskOutputs() []*TaskOutput
ListTaskOutputs returns all stored task outputs
func (*Pipeline) QueryTaskOutputs ¶
func (p *Pipeline) QueryTaskOutputs(criteria map[string]interface{}) []*TaskOutput
QueryTaskOutputs searches for outputs matching the given criteria
func (*Pipeline) SetGlobalContext ¶
func (p *Pipeline) SetGlobalContext(key string, value interface{})
SetGlobalContext sets a value in the global context
func (*Pipeline) SetTaskContext ¶
func (p *Pipeline) SetTaskContext(taskID, key string, value interface{})
SetTaskContext sets a value in a task's context
type PipelineCallback ¶
type PipelineCallback func(task *Task, status TaskStatus, result string)
PipelineCallback is called when task status changes
type PriorityQueue ¶
type PriorityQueue struct {
// contains filtered or unexported fields
}
PriorityQueue implements a thread-safe priority queue for tasks
func NewPriorityQueue ¶
func NewPriorityQueue(maxCapacity int) *PriorityQueue
NewPriorityQueue creates a new priority queue instance
func (*PriorityQueue) Clear ¶
func (pq *PriorityQueue) Clear()
Clear removes all items from the queue
func (*PriorityQueue) Dequeue ¶
func (pq *PriorityQueue) Dequeue() (*Task, bool)
Dequeue removes and returns the highest priority task
func (*PriorityQueue) DequeueBatch ¶
func (pq *PriorityQueue) DequeueBatch(count int) []*Task
DequeueBatch removes and returns multiple high priority tasks
func (*PriorityQueue) Enqueue ¶
func (pq *PriorityQueue) Enqueue(task *Task) error
Enqueue adds a task to the queue
func (*PriorityQueue) EnqueueBatch ¶
func (pq *PriorityQueue) EnqueueBatch(tasks []*Task) error
EnqueueBatch adds multiple tasks to the queue
func (*PriorityQueue) GetMetrics ¶
func (pq *PriorityQueue) GetMetrics() map[string]interface{}
GetMetrics returns the queue metrics
func (*PriorityQueue) Len ¶
func (pq *PriorityQueue) Len() int
Len returns the number of items in the queue
func (*PriorityQueue) Less ¶
func (pq *PriorityQueue) Less(i, j int) bool
Less compares two items for priority ordering
func (*PriorityQueue) Peek ¶
func (pq *PriorityQueue) Peek() (*Task, bool)
Peek returns the highest priority task without removing it
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() interface{}
Pop removes and returns the highest priority item
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x interface{})
Push adds an item to the queue
func (*PriorityQueue) Remove ¶
func (pq *PriorityQueue) Remove(taskID string) bool
Remove removes a specific task from the queue
func (*PriorityQueue) SetScorer ¶
func (pq *PriorityQueue) SetScorer(scorer PriorityScorer)
SetScorer sets a custom priority scorer
func (*PriorityQueue) Swap ¶
func (pq *PriorityQueue) Swap(i, j int)
Swap swaps two items in the queue
func (*PriorityQueue) UpdatePriority ¶
func (pq *PriorityQueue) UpdatePriority(taskID string) bool
UpdatePriority updates the priority of a task
type PriorityScorer ¶
type PriorityScorer interface {
CalculatePriority(task *Task, context TaskContext) float64
}
PriorityScorer defines how task priority is calculated
type QueueItem ¶
type QueueItem struct {
	Task       *Task
	Priority   float64
	Deadline   time.Time
	Index      int // Used by heap.Interface
	InsertedAt time.Time
}
QueueItem represents an item in the priority queue
type QueueMetrics ¶
type QueueMetrics struct {
// contains filtered or unexported fields
}
QueueMetrics tracks performance and statistics of the priority queue
func NewQueueMetrics ¶
func NewQueueMetrics() *QueueMetrics
NewQueueMetrics creates a new queue metrics instance
func (*QueueMetrics) GetAverageWaitTime ¶
func (m *QueueMetrics) GetAverageWaitTime() time.Duration
GetAverageWaitTime returns the average wait time for tasks
func (*QueueMetrics) GetMetricsSnapshot ¶
func (m *QueueMetrics) GetMetricsSnapshot() map[string]interface{}
GetMetricsSnapshot returns a snapshot of current metrics
func (*QueueMetrics) GetQueueLoad ¶
func (m *QueueMetrics) GetQueueLoad() float64
GetQueueLoad returns the current queue load (0.0 to 1.0)
func (*QueueMetrics) Reset ¶
func (m *QueueMetrics) Reset()
Reset resets all metrics to initial values
func (*QueueMetrics) TaskDequeued ¶
func (m *QueueMetrics) TaskDequeued(waitTime time.Duration)
TaskDequeued records metrics when a task is removed from the queue
func (*QueueMetrics) TaskEnqueued ¶
func (m *QueueMetrics) TaskEnqueued(priority int)
TaskEnqueued records metrics when a task is added to the queue
type RecoveryAttempt ¶
type RecoveryAttempt struct {
	TaskID           string
	Strategy         RecoveryStrategy
	ErrorCategory    ErrorCategory
	StartTime        time.Time
	EndTime          time.Time
	Duration         time.Duration
	Successful       bool
	ErrorDetails     string
	AttemptNumber    int
	BackoffDuration  time.Duration
	MetadataSnapshot map[string]interface{}
}
RecoveryAttempt represents a single recovery attempt
type RecoveryManager ¶
type RecoveryManager struct {
// contains filtered or unexported fields
}
RecoveryManager handles task recovery and error handling
func NewRecoveryManager ¶
func NewRecoveryManager() *RecoveryManager
NewRecoveryManager creates a new recovery manager instance
func (*RecoveryManager) ClearRecoveryState ¶
func (r *RecoveryManager) ClearRecoveryState(taskID string)
ClearRecoveryState removes recovery state for a task
func (*RecoveryManager) GetDefaultOptions ¶
func (r *RecoveryManager) GetDefaultOptions() *RecoveryOptions
GetDefaultOptions returns the current default recovery options
func (*RecoveryManager) GetMetrics ¶
func (r *RecoveryManager) GetMetrics() *RecoveryMetrics
GetMetrics returns the recovery metrics
func (*RecoveryManager) GetRecoveryState ¶
func (r *RecoveryManager) GetRecoveryState(taskID string) (*RecoveryState, bool)
GetRecoveryState returns the current recovery state for a task
func (*RecoveryManager) HandleError ¶
func (r *RecoveryManager) HandleError(ctx context.Context, task *Task, err error, opts *RecoveryOptions) (RecoveryStrategy, error)
HandleError determines and executes the appropriate recovery strategy
func (*RecoveryManager) RegisterErrorClassifier ¶
func (r *RecoveryManager) RegisterErrorClassifier(fn func(error) (ErrorCategory, bool))
RegisterErrorClassifier adds a custom error classifier
func (*RecoveryManager) SetDefaultOptions ¶
func (r *RecoveryManager) SetDefaultOptions(opts *RecoveryOptions)
SetDefaultOptions updates the default recovery options
type RecoveryMetrics ¶
type RecoveryMetrics struct {
	// Total attempts per strategy
	AttemptsByStrategy map[RecoveryStrategy]int
	// Successful recoveries per strategy
	SuccessesByStrategy map[RecoveryStrategy]int
	// Failed recoveries per strategy
	FailuresByStrategy map[RecoveryStrategy]int
	// Attempts by error category
	AttemptsByCategory map[ErrorCategory]int
	// Successes by error category
	SuccessesByCategory map[ErrorCategory]int
	// Failures by error category
	FailuresByCategory map[ErrorCategory]int
	// Average recovery time per strategy
	AvgRecoveryTime map[RecoveryStrategy]time.Duration
	// Total recovery time per strategy
	TotalRecoveryTime map[RecoveryStrategy]time.Duration
	// Number of timing samples per strategy
	TimingSamples map[RecoveryStrategy]int
	// Historical recovery attempts
	History []*RecoveryAttempt
	// contains filtered or unexported fields
}
RecoveryMetrics tracks statistics about error recovery attempts
func NewRecoveryMetrics ¶
func NewRecoveryMetrics() *RecoveryMetrics
NewRecoveryMetrics creates a new recovery metrics tracker
func (*RecoveryMetrics) GetAverageRecoveryTime ¶
func (rm *RecoveryMetrics) GetAverageRecoveryTime(strategy RecoveryStrategy) time.Duration
GetAverageRecoveryTime returns the average recovery time for a strategy
func (*RecoveryMetrics) GetCategorySuccessRate ¶
func (rm *RecoveryMetrics) GetCategorySuccessRate(category ErrorCategory) float64
GetCategorySuccessRate returns the success rate for an error category
func (*RecoveryMetrics) GetMetricsSummary ¶
func (rm *RecoveryMetrics) GetMetricsSummary() map[string]interface{}
GetMetricsSummary returns a summary of all metrics
func (*RecoveryMetrics) GetSuccessRate ¶
func (rm *RecoveryMetrics) GetSuccessRate(strategy RecoveryStrategy) float64
GetSuccessRate returns the success rate for a strategy
func (*RecoveryMetrics) PruneHistory ¶
func (rm *RecoveryMetrics) PruneHistory(maxAge time.Duration)
PruneHistory removes old history entries beyond a certain age
func (*RecoveryMetrics) RecordAttempt ¶
func (rm *RecoveryMetrics) RecordAttempt(attempt *RecoveryAttempt)
RecordAttempt records a recovery attempt
type RecoveryOptions ¶
type RecoveryOptions struct {
	Strategy         RecoveryStrategy
	MaxRetries       int
	InitialBackoff   time.Duration
	MaxBackoff       time.Duration
	TimeoutExtension time.Duration
	FallbackFunc     func(context.Context) (string, error)
}
RecoveryOptions configures how recovery should be handled
type RecoveryState ¶
type RecoveryState struct {
	TaskID       string
	Strategy     RecoveryStrategy
	Attempts     int
	LastError    error
	LastAttempt  time.Time
	NextAttempt  time.Time
	BackoffDelay time.Duration
	FallbackUsed bool
}
RecoveryState tracks the state of a recovery attempt
type RecoveryStrategy ¶
type RecoveryStrategy int
RecoveryStrategy defines how to handle task failures
const (
	// RetryWithBackoff retries the task with exponential backoff
	RetryWithBackoff RecoveryStrategy = iota
	// RetryWithTimeout retries the task with a new timeout
	RetryWithTimeout
	// RetryWithFallback retries the task with fallback options
	RetryWithFallback
	// AbortExecution stops execution and reports failure
	AbortExecution
)
type RetryState ¶
type RetryState struct {
	Attempts     int
	MaxAttempts  int
	LastAttempt  time.Time
	NextAttempt  time.Time
	BackoffDelay time.Duration
}
RetryState tracks retry-related information for a task
type Task ¶
type Task struct {
	// Core fields
	ID             string
	Description    string
	ExpectedOutput string
	Status         TaskStatus
	Result         string
	Error          error
	ExecuteFunc    func(context.Context) (string, error)
	Context        string
	Timeout        time.Duration

	// Assignment and delegation
	AssignedAgent  string
	ParentID       string
	AsyncExecution bool

	// Timing
	CreatedAt   time.Time
	StartedAt   *time.Time
	CompletedAt *time.Time

	// Progress tracking
	Progress float64
	Messages []string

	// Additional data
	Metadata map[string]interface{}

	// Priority handling fields
	Priority      TaskPriority
	Deadline      time.Time
	Dependencies  []string // IDs of tasks that must complete before this one
	PriorityScore float64

	// Task requirements
	Requirements *capabilities.TaskRequirements
}
Task represents a unit of work to be executed by an agent
func (*Task) IsCancelled ¶
func (t *Task) IsCancelled() bool
IsCancelled checks if task was cancelled
func (*Task) IsFinished ¶
func (t *Task) IsFinished() bool
IsFinished checks if task is in a terminal state
func (*Task) UpdateProgress ¶
func (t *Task) UpdateProgress(progress float64) error
UpdateProgress updates task progress
func (*Task) WithAsyncExecution ¶
func (t *Task) WithAsyncExecution(async bool) *Task
WithAsyncExecution sets async execution flag
func (*Task) WithDeadline ¶
func (t *Task) WithDeadline(deadline time.Time) *Task
WithDeadline sets the task deadline
func (*Task) WithDependencies ¶
func (t *Task) WithDependencies(deps []string) *Task
WithDependencies sets task dependencies
func (*Task) WithMetadata ¶
func (t *Task) WithMetadata(metadata map[string]interface{}) *Task
WithMetadata adds metadata to the task
func (*Task) WithPriority ¶
func (t *Task) WithPriority(priority TaskPriority) *Task
WithPriority sets the task priority
func (*Task) WithRequirements ¶
func (t *Task) WithRequirements(reqs *capabilities.TaskRequirements) *Task
WithRequirements sets task requirements
type TaskChannels ¶
type TaskChannels struct {
// contains filtered or unexported fields
}
TaskChannels manages communication channels for tasks
func NewTaskChannels ¶
func NewTaskChannels() *TaskChannels
NewTaskChannels creates a new task channels manager
func (*TaskChannels) CloseChannels ¶
func (tc *TaskChannels) CloseChannels(taskID string)
CloseChannels closes all channels for a task
func (*TaskChannels) CreateChannels ¶
func (tc *TaskChannels) CreateChannels(taskID string)
CreateChannels creates all necessary channels for a task
func (*TaskChannels) GetLogChannel ¶
func (tc *TaskChannels) GetLogChannel(taskID string) (<-chan string, error)
GetLogChannel returns the log channel for a task
func (*TaskChannels) GetProgressChannel ¶
func (tc *TaskChannels) GetProgressChannel(taskID string) (<-chan float64, error)
GetProgressChannel returns the progress channel for a task
func (*TaskChannels) GetResultChannel ¶
func (tc *TaskChannels) GetResultChannel(taskID string) (<-chan AsyncResult, error)
GetResultChannel returns the result channel for a task
func (*TaskChannels) GetStatusChannel ¶
func (tc *TaskChannels) GetStatusChannel(taskID string) (<-chan TaskStatus, error)
GetStatusChannel returns the status channel for a task
func (*TaskChannels) SendLog ¶
func (tc *TaskChannels) SendLog(taskID string, message string) error
SendLog sends a log message
func (*TaskChannels) SendProgress ¶
func (tc *TaskChannels) SendProgress(taskID string, progress float64) error
SendProgress sends a progress update
func (*TaskChannels) SendResult ¶
func (tc *TaskChannels) SendResult(taskID string, result AsyncResult) error
SendResult sends a task result
func (*TaskChannels) SendStatus ¶
func (tc *TaskChannels) SendStatus(taskID string, status TaskStatus) error
SendStatus sends a status update
func (*TaskChannels) Subscribe ¶
func (tc *TaskChannels) Subscribe(taskID string) (<-chan TaskMessage, error)
Subscribe creates a new subscription for task messages
func (*TaskChannels) Unsubscribe ¶
func (tc *TaskChannels) Unsubscribe(taskID string, ch <-chan TaskMessage)
Unsubscribe removes a subscription
type TaskContext ¶
type TaskContext struct {
	WaitingTime  time.Duration
	Dependencies []*Task
	QueueMetrics *QueueMetrics
	CreatedAt    time.Time
}
TaskContext provides context for priority calculation
type TaskLifecycle ¶
type TaskLifecycle struct {
// contains filtered or unexported fields
}
TaskLifecycle manages task lifecycle events and states
func NewTaskLifecycle ¶
func NewTaskLifecycle() *TaskLifecycle
NewTaskLifecycle creates a new task lifecycle manager
func (*TaskLifecycle) CancelTask ¶
func (l *TaskLifecycle) CancelTask(taskID string, reason string) error
CancelTask cancels a task and its context
func (*TaskLifecycle) ExtendTimeout ¶
func (l *TaskLifecycle) ExtendTimeout(taskID string, extension time.Duration) error
ExtendTimeout extends the timeout for a task
func (*TaskLifecycle) GetContext ¶
func (l *TaskLifecycle) GetContext(taskID string) (context.Context, error)
GetContext returns the context for a task
func (*TaskLifecycle) GetRetryState ¶
func (l *TaskLifecycle) GetRetryState(taskID string) (*RetryState, error)
GetRetryState returns the retry state for a task
func (*TaskLifecycle) GetTimeRemaining ¶
func (l *TaskLifecycle) GetTimeRemaining(taskID string) (time.Duration, error)
GetTimeRemaining returns the remaining time for a task
func (*TaskLifecycle) IsExpired ¶
func (l *TaskLifecycle) IsExpired(taskID string) bool
IsExpired checks if a task has expired (exceeded its timeout)
func (*TaskLifecycle) RegisterTask ¶
RegisterTask registers a task for lifecycle management
func (*TaskLifecycle) ShouldRetry ¶
ShouldRetry determines if a task should be retried
func (*TaskLifecycle) UnregisterTask ¶
func (l *TaskLifecycle) UnregisterTask(taskID string)
UnregisterTask removes a task from lifecycle management
type TaskMessage ¶
type TaskMessage struct {
	Type      MessageType
	TaskID    string
	Content   interface{}
	Timestamp time.Time
}
TaskMessage represents a message in task communication
type TaskNode ¶
type TaskNode struct {
	Task          *Task
	Duration      time.Duration
	Resources     map[string]float64
	Dependencies  []*TaskNode
	Dependents    []*TaskNode
	EarliestStart time.Time
	LatestStart   time.Time
	Critical      bool
}
TaskNode represents a node in the task dependency graph
type TaskOption ¶
type TaskOption func(*Task)
TaskOption represents a function that modifies a task
func WithContext ¶
func WithContext(context string) TaskOption
WithContext sets the context for task execution
func WithExpectedOutput ¶
func WithExpectedOutput(output string) TaskOption
WithExpectedOutput sets the expected output for task validation
func WithTimeout ¶
func WithTimeout(timeout time.Duration) TaskOption
WithTimeout sets a timeout for task execution
type TaskOutput ¶
type TaskOutput struct {
	TaskID      string                 `json:"task_id"`
	Description string                 `json:"description"`
	Status      TaskStatus             `json:"status"`
	Result      string                 `json:"result"`
	Context     map[string]interface{} `json:"context"`
	Metadata    map[string]interface{} `json:"metadata"`
	StartedAt   *time.Time             `json:"started_at,omitempty"`
	CompletedAt *time.Time             `json:"completed_at,omitempty"`
	Error       string                 `json:"error,omitempty"`
	Duration    time.Duration          `json:"duration,omitempty"`
}
TaskOutput represents the complete output of a task execution
type TaskPlanner ¶
type TaskPlanner struct {
// contains filtered or unexported fields
}
TaskPlanner handles task scheduling and resource allocation
func NewTaskPlanner ¶
func NewTaskPlanner(resourceLimits map[string]float64) *TaskPlanner
NewTaskPlanner creates a new task planner
func (*TaskPlanner) AddTask ¶
func (p *TaskPlanner) AddTask(task *Task, dependencies []string, resources map[string]float64, duration time.Duration) error
AddTask adds a task to the planner with its dependencies and resource requirements
func (*TaskPlanner) DetectCycles ¶
func (p *TaskPlanner) DetectCycles() [][]string
DetectCycles checks for dependency cycles in the task graph
func (*TaskPlanner) EstimateCosts ¶
func (p *TaskPlanner) EstimateCosts() map[string]float64
EstimateCosts estimates execution costs for tasks
func (*TaskPlanner) GetCriticalPath ¶
func (p *TaskPlanner) GetCriticalPath() []*Task
GetCriticalPath identifies the critical path in the task graph
func (*TaskPlanner) OptimizeResources ¶
func (p *TaskPlanner) OptimizeResources() error
OptimizeResources optimizes resource allocation across tasks
type TaskPriority ¶
type TaskPriority int
TaskPriority represents the priority level of a task
const (
	PriorityLowest  TaskPriority = -2
	PriorityLow     TaskPriority = -1
	PriorityNormal  TaskPriority = 0
	PriorityHigh    TaskPriority = 1
	PriorityHighest TaskPriority = 2
)
type TaskProgress ¶
type TaskProgress struct {
	// TaskID uniquely identifies the task
	TaskID string
	// AgentID identifies the agent executing the task
	AgentID string
	// Status indicates the current state
	Status TaskStatus
	// Progress is a number between 0 and 1
	Progress float64
	// Message provides additional context about the current state
	Message string
	// Error holds any error that occurred
	Error error
	// StartTime is when the task began
	StartTime time.Time
	// EndTime is when the task completed/failed
	EndTime *time.Time
	// Requirements specifies what the task needs
	Requirements *capabilities.TaskRequirements
	// Subtasks tracks progress of component tasks
	Subtasks []*TaskProgress
	// Metadata holds additional task-specific data
	Metadata map[string]interface{}
}
TaskProgress represents the progress of a task
type TaskSchema ¶
type TaskSchema struct {
	RequiredFields    []string
	AllowedPriorities []TaskPriority
	MaxDescriptionLen int
	MaxTimeout        time.Duration
	ResourceLimits    map[string]float64
}
TaskSchema defines validation rules for tasks
type TaskStatus ¶
type TaskStatus string
TaskStatus represents the current state of a task
const (
	TaskStatusPending   TaskStatus = "pending"
	TaskStatusRunning   TaskStatus = "running"
	TaskStatusComplete  TaskStatus = "complete"
	TaskStatusFailed    TaskStatus = "failed"
	TaskStatusCancelled TaskStatus = "cancelled"
	// TaskStatusBlocked indicates task is blocked by dependencies
	TaskStatusBlocked TaskStatus = "blocked"
	// TaskStatusTimeout indicates task exceeded its timeout
	TaskStatusTimeout TaskStatus = "timeout"
	// TaskStatusRetrying indicates task is being retried after failure
	TaskStatusRetrying TaskStatus = "retrying"
)
func GetStatus ¶
func GetStatus(task *Task) TaskStatus
GetStatus returns the current status of the task
type TaskValidator ¶
type TaskValidator struct {
// contains filtered or unexported fields
}
TaskValidator handles task validation
func NewTaskValidator ¶
func NewTaskValidator(schema *TaskSchema) *TaskValidator
NewTaskValidator creates a new task validator
func (*TaskValidator) ValidateIOContract ¶
func (v *TaskValidator) ValidateIOContract(taskID string, context interface{}, result string) []error
ValidateIOContract checks if task input/output meets requirements
func (*TaskValidator) ValidatePerformance ¶
func (v *TaskValidator) ValidatePerformance(metrics *ExecutionMetrics) []error
ValidatePerformance checks if task performance meets requirements
func (*TaskValidator) ValidateResourceUsage ¶
func (v *TaskValidator) ValidateResourceUsage(resources map[string]float64) []error
ValidateResourceUsage checks if resource usage is within limits
func (*TaskValidator) ValidateSchema ¶
func (v *TaskValidator) ValidateSchema(task *Task) []error
ValidateSchema checks if a task meets schema requirements
type WeightedPriorityScorer ¶
type WeightedPriorityScorer struct {
	PriorityWeight   float64
	DeadlineWeight   float64
	WaitTimeWeight   float64
	DependencyWeight float64
}
WeightedPriorityScorer implements weighted priority calculation
func NewWeightedPriorityScorer ¶
func NewWeightedPriorityScorer() *WeightedPriorityScorer
NewWeightedPriorityScorer creates a new weighted priority scorer with default weights
func (*WeightedPriorityScorer) CalculatePriority ¶
func (w *WeightedPriorityScorer) CalculatePriority(task *Task, context TaskContext) float64
CalculatePriority implements the PriorityScorer interface