tasks

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2025 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrQueueFull is returned when trying to add a task to a full queue
	ErrQueueFull = errors.New("priority queue is at maximum capacity")

	// ErrInvalidTask is returned when a task is nil or invalid
	ErrInvalidTask = errors.New("invalid task")

	// ErrTaskNotFound is returned when a task cannot be found in the queue
	ErrTaskNotFound = errors.New("task not found in queue")

	// ErrInvalidPriority is returned when an invalid priority value is provided
	ErrInvalidPriority = errors.New("invalid priority value")
)

Functions

func Execute

func Execute(ctx context.Context, task *Task) error

Execute executes the task using the execute function

func GetDuration

func GetDuration(task *Task) time.Duration

GetDuration returns the duration of task execution

Types

type AsyncExecutor

type AsyncExecutor struct {
	// contains filtered or unexported fields
}

AsyncExecutor handles asynchronous task execution

func NewAsyncExecutor

func NewAsyncExecutor(pipeline *Pipeline, maxRetries int) *AsyncExecutor

NewAsyncExecutor creates a new async executor instance

func (*AsyncExecutor) CancelTask

func (e *AsyncExecutor) CancelTask(taskID string) error

CancelTask cancels a running task

func (*AsyncExecutor) Close

func (e *AsyncExecutor) Close()

Close cleans up resources

func (*AsyncExecutor) ExecuteAsync

func (e *AsyncExecutor) ExecuteAsync(ctx context.Context, task *Task) (<-chan AsyncResult, error)

ExecuteAsync executes a task asynchronously

func (*AsyncExecutor) GetResultChannel

func (e *AsyncExecutor) GetResultChannel() <-chan AsyncResult

GetResultChannel returns the channel for receiving task results

func (*AsyncExecutor) GetTaskChannels

func (e *AsyncExecutor) GetTaskChannels() *TaskChannels

GetTaskChannels returns the task channels manager

type AsyncResult

type AsyncResult struct {
	TaskID     string
	Result     string
	Error      error
	StartTime  time.Time
	EndTime    time.Time
	RetryCount int
}

AsyncResult represents the result of an async task execution

type ContextManager

type ContextManager struct {
	// contains filtered or unexported fields
}

ContextManager handles shared state and context between tasks

func NewContextManager

func NewContextManager() *ContextManager

NewContextManager creates a new context manager instance

func (*ContextManager) ClearGlobalContext

func (c *ContextManager) ClearGlobalContext()

ClearGlobalContext removes all global context data

func (*ContextManager) ClearTaskContext

func (c *ContextManager) ClearTaskContext(taskID string)

ClearTaskContext removes all context data for a task

func (*ContextManager) ExportTaskContext

func (c *ContextManager) ExportTaskContext(taskID string) (string, error)

ExportTaskContext exports a task's context as JSON

func (*ContextManager) GetGlobalContext

func (c *ContextManager) GetGlobalContext(key string) (interface{}, bool)

GetGlobalContext gets a value from the global context

func (*ContextManager) GetTaskContext

func (c *ContextManager) GetTaskContext(taskID, key string) (interface{}, bool)

GetTaskContext gets a value from a task's context

func (*ContextManager) ImportTaskContext

func (c *ContextManager) ImportTaskContext(taskID string, jsonData string) error

ImportTaskContext imports JSON data into a task's context

func (*ContextManager) SetGlobalContext

func (c *ContextManager) SetGlobalContext(key string, value interface{})

SetGlobalContext sets a value in the global context

func (*ContextManager) SetTaskContext

func (c *ContextManager) SetTaskContext(taskID, key string, value interface{})

SetTaskContext sets a value in a task's context

func (*ContextManager) ShareTaskContext

func (c *ContextManager) ShareTaskContext(sourceTaskID, targetTaskID string, keys ...string) error

ShareTaskContext shares context data between tasks

type DynamicPriorityScorer

type DynamicPriorityScorer struct {
	// contains filtered or unexported fields
}

DynamicPriorityScorer adjusts priority based on queue metrics

func NewDynamicPriorityScorer

func NewDynamicPriorityScorer(baseScorer PriorityScorer) *DynamicPriorityScorer

NewDynamicPriorityScorer creates a new dynamic priority scorer

func (*DynamicPriorityScorer) CalculatePriority

func (d *DynamicPriorityScorer) CalculatePriority(task *Task, context TaskContext) float64

CalculatePriority implements PriorityScorer interface with dynamic adjustments

type ErrorCategory

type ErrorCategory string

ErrorCategory represents the broad category of an error

const (
	// TemporaryError represents transient errors that may resolve on retry
	TemporaryError ErrorCategory = "temporary"
	// ResourceError represents errors due to resource constraints
	ResourceError ErrorCategory = "resource"
	// TimeoutError represents deadline or timeout errors
	TimeoutError ErrorCategory = "timeout"
	// ValidationError represents errors in task input/output validation
	ValidationError ErrorCategory = "validation"
	// PermissionError represents authorization/access errors
	PermissionError ErrorCategory = "permission"
	// SystemError represents internal system errors
	SystemError ErrorCategory = "system"
	// UnknownError represents unclassified errors
	UnknownError ErrorCategory = "unknown"
)

type ErrorClassifier

type ErrorClassifier struct {
	// contains filtered or unexported fields
}

ErrorClassifier provides error classification functionality

func NewErrorClassifier

func NewErrorClassifier() *ErrorClassifier

NewErrorClassifier creates a new error classifier

func (*ErrorClassifier) Classify

func (ec *ErrorClassifier) Classify(err error) ErrorCategory

Classify determines the category of an error

func (*ErrorClassifier) RegisterClassifier

func (ec *ErrorClassifier) RegisterClassifier(fn func(error) (ErrorCategory, bool))

RegisterClassifier adds a custom error classifier

type ErrorWithMetadata

type ErrorWithMetadata struct {
	Err      error
	Category ErrorCategory
	Metadata map[string]interface{}
	Time     time.Time
}

ErrorWithMetadata adds context and metadata to an error

func NewErrorWithMetadata

func NewErrorWithMetadata(err error, category ErrorCategory, metadata map[string]interface{}) *ErrorWithMetadata

NewErrorWithMetadata creates a new error with metadata

func (*ErrorWithMetadata) Error

func (e *ErrorWithMetadata) Error() string

Error implements the error interface

func (*ErrorWithMetadata) Unwrap

func (e *ErrorWithMetadata) Unwrap() error

Unwrap returns the underlying error

type ExecutionMetrics

type ExecutionMetrics struct {
	TaskID        string
	StartTime     time.Time
	EndTime       time.Time
	Duration      time.Duration
	CPUUsage      float64
	MemoryUsage   float64
	NetworkUsage  float64
	ErrorCount    uint32
	RetryCount    uint32
	Status        string
	ResourceUsage map[string]float64
}

ExecutionMetrics tracks task execution performance metrics

func NewExecutionMetrics

func NewExecutionMetrics(taskID string) *ExecutionMetrics

NewExecutionMetrics creates a new metrics tracker for a task

func (*ExecutionMetrics) Complete

func (m *ExecutionMetrics) Complete()

Complete marks the task as complete and calculates final metrics

func (*ExecutionMetrics) GetDuration

func (m *ExecutionMetrics) GetDuration() time.Duration

GetDuration returns the task duration

func (*ExecutionMetrics) GetResourceUsage

func (m *ExecutionMetrics) GetResourceUsage() map[string]float64

GetResourceUsage gets the current resource usage

func (*ExecutionMetrics) IncrementErrors

func (m *ExecutionMetrics) IncrementErrors() uint32

IncrementErrors increments the error counter

func (*ExecutionMetrics) IncrementRetries

func (m *ExecutionMetrics) IncrementRetries() uint32

IncrementRetries increments the retry counter

func (*ExecutionMetrics) SetStatus

func (m *ExecutionMetrics) SetStatus(status string)

SetStatus updates the task status

func (*ExecutionMetrics) UpdateResourceUsage

func (m *ExecutionMetrics) UpdateResourceUsage(resource string, usage float64)

UpdateResourceUsage updates resource usage metrics

type MessageType

type MessageType int

MessageType represents different types of task messages

const (
	// MessageTypeResult represents a task result
	MessageTypeResult MessageType = iota
	// MessageTypeProgress represents task progress update
	MessageTypeProgress
	// MessageTypeLog represents a log message
	MessageTypeLog
	// MessageTypeError represents an error message
	MessageTypeError
	// MessageTypeStatus represents a status update
	MessageTypeStatus
)

type OutputStore

type OutputStore struct {
	// contains filtered or unexported fields
}

OutputStore manages task output persistence

func NewOutputStore

func NewOutputStore(baseDir string) (*OutputStore, error)

NewOutputStore creates a new output store

func (*OutputStore) DeleteOutput

func (s *OutputStore) DeleteOutput(taskID string) error

DeleteOutput removes a task output

func (*OutputStore) GetOutput

func (s *OutputStore) GetOutput(taskID string) (*TaskOutput, error)

GetOutput retrieves a task output

func (*OutputStore) ListOutputs

func (s *OutputStore) ListOutputs() []*TaskOutput

ListOutputs returns all stored task outputs

func (*OutputStore) QueryOutputs

func (s *OutputStore) QueryOutputs(criteria map[string]interface{}) []*TaskOutput

QueryOutputs searches for outputs matching criteria

func (*OutputStore) SaveOutput

func (s *OutputStore) SaveOutput(output *TaskOutput) error

SaveOutput saves a task output

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline manages the execution flow of tasks

func NewPipeline

func NewPipeline(outputDir string) (*Pipeline, error)

NewPipeline creates a new task execution pipeline

func (*Pipeline) AddCallback

func (p *Pipeline) AddCallback(callback PipelineCallback)

AddCallback adds a callback for task status changes

func (*Pipeline) AddTask

func (p *Pipeline) AddTask(task *Task) error

AddTask adds a task to the pipeline with validation

func (*Pipeline) CancelTask

func (p *Pipeline) CancelTask(taskID string) error

CancelTask cancels a running task

func (*Pipeline) Close

func (p *Pipeline) Close() error

Close cleans up pipeline resources

func (*Pipeline) DeleteTaskOutput

func (p *Pipeline) DeleteTaskOutput(taskID string) error

DeleteTaskOutput removes a task's output from storage

func (*Pipeline) Execute

func (p *Pipeline) Execute(ctx context.Context) error

Execute runs all tasks in the pipeline with optimization

func (*Pipeline) ExecuteAsync

func (p *Pipeline) ExecuteAsync(ctx context.Context) (<-chan AsyncResult, error)

ExecuteAsync runs all tasks in the pipeline asynchronously

func (*Pipeline) ExecuteTask

func (p *Pipeline) ExecuteTask(ctx context.Context, task *Task) error

ExecuteTask executes a single task with validation and metrics

func (*Pipeline) ExecuteTaskAsync

func (p *Pipeline) ExecuteTaskAsync(ctx context.Context, task *Task) (<-chan AsyncResult, error)

ExecuteTaskAsync executes a single task asynchronously

func (*Pipeline) GetContextManager

func (p *Pipeline) GetContextManager() *ContextManager

GetContextManager returns the pipeline's context manager

func (*Pipeline) GetGlobalContext

func (p *Pipeline) GetGlobalContext(key string) (interface{}, bool)

GetGlobalContext gets a value from the global context

func (*Pipeline) GetTaskContext

func (p *Pipeline) GetTaskContext(taskID, key string) (interface{}, bool)

GetTaskContext gets a value from a task's context

func (*Pipeline) GetTaskOutput

func (p *Pipeline) GetTaskOutput(taskID string) (*TaskOutput, error)

GetTaskOutput retrieves a task's output from storage

func (*Pipeline) GetTaskResult

func (p *Pipeline) GetTaskResult(taskID string) (string, bool)

GetTaskResult returns the result of a completed task

func (*Pipeline) GetTaskStatus

func (p *Pipeline) GetTaskStatus(taskID string) TaskStatus

GetTaskStatus returns the current status of a task

func (*Pipeline) ListTaskOutputs

func (p *Pipeline) ListTaskOutputs() []*TaskOutput

ListTaskOutputs returns all stored task outputs

func (*Pipeline) QueryTaskOutputs

func (p *Pipeline) QueryTaskOutputs(criteria map[string]interface{}) []*TaskOutput

QueryTaskOutputs searches for outputs matching the given criteria

func (*Pipeline) SetGlobalContext

func (p *Pipeline) SetGlobalContext(key string, value interface{})

SetGlobalContext sets a value in the global context

func (*Pipeline) SetTaskContext

func (p *Pipeline) SetTaskContext(taskID, key string, value interface{})

SetTaskContext sets a value in a task's context

func (*Pipeline) ShareContext

func (p *Pipeline) ShareContext(sourceTaskID, targetTaskID string, keys ...string) error

ShareContext shares context between tasks

type PipelineCallback

type PipelineCallback func(task *Task, status TaskStatus, result string)

PipelineCallback is called when task status changes

type PriorityQueue

type PriorityQueue struct {
	// contains filtered or unexported fields
}

PriorityQueue implements a thread-safe priority queue for tasks

func NewPriorityQueue

func NewPriorityQueue(maxCapacity int) *PriorityQueue

NewPriorityQueue creates a new priority queue instance

func (*PriorityQueue) Clear

func (pq *PriorityQueue) Clear()

Clear removes all items from the queue

func (*PriorityQueue) Dequeue

func (pq *PriorityQueue) Dequeue() (*Task, bool)

Dequeue removes and returns the highest priority task

func (*PriorityQueue) DequeueBatch

func (pq *PriorityQueue) DequeueBatch(count int) []*Task

DequeueBatch removes and returns multiple high priority tasks

func (*PriorityQueue) Enqueue

func (pq *PriorityQueue) Enqueue(task *Task) error

Enqueue adds a task to the queue

func (*PriorityQueue) EnqueueBatch

func (pq *PriorityQueue) EnqueueBatch(tasks []*Task) error

EnqueueBatch adds multiple tasks to the queue

func (*PriorityQueue) GetMetrics

func (pq *PriorityQueue) GetMetrics() map[string]interface{}

GetMetrics returns the queue metrics

func (*PriorityQueue) Len

func (pq *PriorityQueue) Len() int

Len returns the number of items in the queue

func (*PriorityQueue) Less

func (pq *PriorityQueue) Less(i, j int) bool

Less compares two items for priority ordering

func (*PriorityQueue) Peek

func (pq *PriorityQueue) Peek() (*Task, bool)

Peek returns the highest priority task without removing it

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

Pop removes and returns the highest priority item

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

Push adds an item to the queue

func (*PriorityQueue) Remove

func (pq *PriorityQueue) Remove(taskID string) bool

Remove removes a specific task from the queue

func (*PriorityQueue) SetScorer

func (pq *PriorityQueue) SetScorer(scorer PriorityScorer)

SetScorer sets a custom priority scorer

func (*PriorityQueue) Swap

func (pq *PriorityQueue) Swap(i, j int)

Swap swaps two items in the queue

func (*PriorityQueue) UpdatePriority

func (pq *PriorityQueue) UpdatePriority(taskID string) bool

UpdatePriority updates the priority of a task

type PriorityScorer

type PriorityScorer interface {
	CalculatePriority(task *Task, context TaskContext) float64
}

PriorityScorer defines how task priority is calculated

type QueueItem

type QueueItem struct {
	Task       *Task
	Priority   float64
	Deadline   time.Time
	Index      int // Used by heap.Interface
	InsertedAt time.Time
}

QueueItem represents an item in the priority queue

type QueueMetrics

type QueueMetrics struct {
	// contains filtered or unexported fields
}

QueueMetrics tracks performance and statistics of the priority queue

func NewQueueMetrics

func NewQueueMetrics() *QueueMetrics

NewQueueMetrics creates a new queue metrics instance

func (*QueueMetrics) GetAverageWaitTime

func (m *QueueMetrics) GetAverageWaitTime() time.Duration

GetAverageWaitTime returns the average wait time for tasks

func (*QueueMetrics) GetMetricsSnapshot

func (m *QueueMetrics) GetMetricsSnapshot() map[string]interface{}

GetMetricsSnapshot returns a snapshot of current metrics

func (*QueueMetrics) GetQueueLoad

func (m *QueueMetrics) GetQueueLoad() float64

GetQueueLoad returns the current queue load (0.0 to 1.0)

func (*QueueMetrics) Reset

func (m *QueueMetrics) Reset()

Reset resets all metrics to initial values

func (*QueueMetrics) TaskDequeued

func (m *QueueMetrics) TaskDequeued(waitTime time.Duration)

TaskDequeued records metrics when a task is removed from the queue

func (*QueueMetrics) TaskEnqueued

func (m *QueueMetrics) TaskEnqueued(priority int)

TaskEnqueued records metrics when a task is added to the queue

type RecoveryAttempt

type RecoveryAttempt struct {
	TaskID           string
	Strategy         RecoveryStrategy
	ErrorCategory    ErrorCategory
	StartTime        time.Time
	EndTime          time.Time
	Duration         time.Duration
	Successful       bool
	ErrorDetails     string
	AttemptNumber    int
	BackoffDuration  time.Duration
	MetadataSnapshot map[string]interface{}
}

RecoveryAttempt represents a single recovery attempt

type RecoveryManager

type RecoveryManager struct {
	// contains filtered or unexported fields
}

RecoveryManager handles task recovery and error handling

func NewRecoveryManager

func NewRecoveryManager() *RecoveryManager

NewRecoveryManager creates a new recovery manager instance

func (*RecoveryManager) ClearRecoveryState

func (r *RecoveryManager) ClearRecoveryState(taskID string)

ClearRecoveryState removes recovery state for a task

func (*RecoveryManager) GetDefaultOptions

func (r *RecoveryManager) GetDefaultOptions() *RecoveryOptions

GetDefaultOptions returns the current default recovery options

func (*RecoveryManager) GetMetrics

func (r *RecoveryManager) GetMetrics() *RecoveryMetrics

GetMetrics returns the recovery metrics

func (*RecoveryManager) GetRecoveryState

func (r *RecoveryManager) GetRecoveryState(taskID string) (*RecoveryState, bool)

GetRecoveryState returns the current recovery state for a task

func (*RecoveryManager) HandleError

func (r *RecoveryManager) HandleError(ctx context.Context, task *Task, err error, opts *RecoveryOptions) (RecoveryStrategy, error)

HandleError determines and executes the appropriate recovery strategy

func (*RecoveryManager) RegisterErrorClassifier

func (r *RecoveryManager) RegisterErrorClassifier(fn func(error) (ErrorCategory, bool))

RegisterErrorClassifier adds a custom error classifier

func (*RecoveryManager) SetDefaultOptions

func (r *RecoveryManager) SetDefaultOptions(opts *RecoveryOptions)

SetDefaultOptions updates the default recovery options

type RecoveryMetrics

type RecoveryMetrics struct {

	// Total attempts per strategy
	AttemptsByStrategy map[RecoveryStrategy]int
	// Successful recoveries per strategy
	SuccessesByStrategy map[RecoveryStrategy]int
	// Failed recoveries per strategy
	FailuresByStrategy map[RecoveryStrategy]int

	// Attempts by error category
	AttemptsByCategory map[ErrorCategory]int
	// Successes by error category
	SuccessesByCategory map[ErrorCategory]int
	// Failures by error category
	FailuresByCategory map[ErrorCategory]int

	// Average recovery time per strategy
	AvgRecoveryTime map[RecoveryStrategy]time.Duration
	// Total recovery time per strategy
	TotalRecoveryTime map[RecoveryStrategy]time.Duration
	// Number of timing samples per strategy
	TimingSamples map[RecoveryStrategy]int

	// Historical recovery attempts
	History []*RecoveryAttempt
	// contains filtered or unexported fields
}

RecoveryMetrics tracks statistics about error recovery attempts

func NewRecoveryMetrics

func NewRecoveryMetrics() *RecoveryMetrics

NewRecoveryMetrics creates a new recovery metrics tracker

func (*RecoveryMetrics) GetAverageRecoveryTime

func (rm *RecoveryMetrics) GetAverageRecoveryTime(strategy RecoveryStrategy) time.Duration

GetAverageRecoveryTime returns the average recovery time for a strategy

func (*RecoveryMetrics) GetCategorySuccessRate

func (rm *RecoveryMetrics) GetCategorySuccessRate(category ErrorCategory) float64

GetCategorySuccessRate returns the success rate for an error category

func (*RecoveryMetrics) GetMetricsSummary

func (rm *RecoveryMetrics) GetMetricsSummary() map[string]interface{}

GetMetricsSummary returns a summary of all metrics

func (*RecoveryMetrics) GetSuccessRate

func (rm *RecoveryMetrics) GetSuccessRate(strategy RecoveryStrategy) float64

GetSuccessRate returns the success rate for a strategy

func (*RecoveryMetrics) PruneHistory

func (rm *RecoveryMetrics) PruneHistory(maxAge time.Duration)

PruneHistory removes old history entries beyond a certain age

func (*RecoveryMetrics) RecordAttempt

func (rm *RecoveryMetrics) RecordAttempt(attempt *RecoveryAttempt)

RecordAttempt records a recovery attempt

type RecoveryOptions

type RecoveryOptions struct {
	Strategy         RecoveryStrategy
	MaxRetries       int
	InitialBackoff   time.Duration
	MaxBackoff       time.Duration
	TimeoutExtension time.Duration
	FallbackFunc     func(context.Context) (string, error)
}

RecoveryOptions configures how recovery should be handled

type RecoveryState

type RecoveryState struct {
	TaskID       string
	Strategy     RecoveryStrategy
	Attempts     int
	LastError    error
	LastAttempt  time.Time
	NextAttempt  time.Time
	BackoffDelay time.Duration
	FallbackUsed bool
}

RecoveryState tracks the state of a recovery attempt

type RecoveryStrategy

type RecoveryStrategy int

RecoveryStrategy defines how to handle task failures

const (
	// RetryWithBackoff retries the task with exponential backoff
	RetryWithBackoff RecoveryStrategy = iota
	// RetryWithTimeout retries the task with a new timeout
	RetryWithTimeout
	// RetryWithFallback retries the task with fallback options
	RetryWithFallback
	// AbortExecution stops execution and reports failure
	AbortExecution
)

type RetryState

type RetryState struct {
	Attempts     int
	MaxAttempts  int
	LastAttempt  time.Time
	NextAttempt  time.Time
	BackoffDelay time.Duration
}

RetryState tracks retry-related information for a task

type Task

type Task struct {
	// Core fields
	ID             string
	Description    string
	ExpectedOutput string
	Status         TaskStatus
	Result         string
	Error          error
	ExecuteFunc    func(context.Context) (string, error)
	Context        string
	Timeout        time.Duration

	// Assignment and delegation
	AssignedAgent  string
	ParentID       string
	AsyncExecution bool

	// Timing
	CreatedAt   time.Time
	StartedAt   *time.Time
	CompletedAt *time.Time

	// Progress tracking
	Progress float64
	Messages []string

	// Additional data
	Metadata map[string]interface{}

	// Priority handling fields
	Priority      TaskPriority
	Deadline      time.Time
	Dependencies  []string // IDs of tasks that must complete before this one
	PriorityScore float64

	// Task requirements
	Requirements *capabilities.TaskRequirements
}

Task represents a unit of work to be executed by an agent

func NewTask

func NewTask(description string, expectedOutput string) (*Task, error)

NewTask creates a new task instance

func (*Task) AddMessage

func (t *Task) AddMessage(msg string)

AddMessage adds a progress message

func (*Task) Cancel

func (t *Task) Cancel() error

Cancel marks the task as cancelled

func (*Task) Complete

func (t *Task) Complete(result string) error

Complete marks the task as complete

func (*Task) Duration

func (t *Task) Duration() time.Duration

Duration returns the task duration

func (*Task) Fail

func (t *Task) Fail(err error) error

Fail marks the task as failed

func (*Task) GetError

func (t *Task) GetError() error

GetError returns task error if failed

func (*Task) GetResult

func (t *Task) GetResult() (string, error)

GetResult returns task result if complete

func (*Task) IsAsync

func (t *Task) IsAsync() bool

IsAsync checks if task can run asynchronously

func (*Task) IsCancelled

func (t *Task) IsCancelled() bool

IsCancelled checks if task was cancelled

func (*Task) IsComplete

func (t *Task) IsComplete() bool

IsComplete checks if task is complete

func (*Task) IsFailed

func (t *Task) IsFailed() bool

IsFailed checks if task failed

func (*Task) IsFinished

func (t *Task) IsFinished() bool

IsFinished checks if task is in a terminal state

func (*Task) Start

func (t *Task) Start(agentID string) error

Start marks the task as started

func (*Task) UpdateProgress

func (t *Task) UpdateProgress(progress float64) error

UpdateProgress updates task progress

func (*Task) WithAsyncExecution

func (t *Task) WithAsyncExecution(async bool) *Task

WithAsyncExecution sets async execution flag

func (*Task) WithDeadline

func (t *Task) WithDeadline(deadline time.Time) *Task

WithDeadline sets the task deadline

func (*Task) WithDependencies

func (t *Task) WithDependencies(deps []string) *Task

WithDependencies sets task dependencies

func (*Task) WithMetadata

func (t *Task) WithMetadata(metadata map[string]interface{}) *Task

WithMetadata adds metadata to the task

func (*Task) WithPriority

func (t *Task) WithPriority(priority TaskPriority) *Task

WithPriority sets the task priority

func (*Task) WithRequirements

func (t *Task) WithRequirements(reqs *capabilities.TaskRequirements) *Task

WithRequirements sets task requirements

func (*Task) WithTimeout

func (t *Task) WithTimeout(timeout time.Duration) *Task

WithTimeout sets a timeout for task execution

type TaskChannels

type TaskChannels struct {
	// contains filtered or unexported fields
}

TaskChannels manages communication channels for tasks

func NewTaskChannels

func NewTaskChannels() *TaskChannels

NewTaskChannels creates a new task channels manager

func (*TaskChannels) CloseChannels

func (tc *TaskChannels) CloseChannels(taskID string)

CloseChannels closes all channels for a task

func (*TaskChannels) CreateChannels

func (tc *TaskChannels) CreateChannels(taskID string)

CreateChannels creates all necessary channels for a task

func (*TaskChannels) GetLogChannel

func (tc *TaskChannels) GetLogChannel(taskID string) (<-chan string, error)

GetLogChannel returns the log channel for a task

func (*TaskChannels) GetProgressChannel

func (tc *TaskChannels) GetProgressChannel(taskID string) (<-chan float64, error)

GetProgressChannel returns the progress channel for a task

func (*TaskChannels) GetResultChannel

func (tc *TaskChannels) GetResultChannel(taskID string) (<-chan AsyncResult, error)

GetResultChannel returns the result channel for a task

func (*TaskChannels) GetStatusChannel

func (tc *TaskChannels) GetStatusChannel(taskID string) (<-chan TaskStatus, error)

GetStatusChannel returns the status channel for a task

func (*TaskChannels) SendLog

func (tc *TaskChannels) SendLog(taskID string, message string) error

SendLog sends a log message

func (*TaskChannels) SendProgress

func (tc *TaskChannels) SendProgress(taskID string, progress float64) error

SendProgress sends a progress update

func (*TaskChannels) SendResult

func (tc *TaskChannels) SendResult(taskID string, result AsyncResult) error

SendResult sends a task result

func (*TaskChannels) SendStatus

func (tc *TaskChannels) SendStatus(taskID string, status TaskStatus) error

SendStatus sends a status update

func (*TaskChannels) Subscribe

func (tc *TaskChannels) Subscribe(taskID string) (<-chan TaskMessage, error)

Subscribe creates a new subscription for task messages

func (*TaskChannels) Unsubscribe

func (tc *TaskChannels) Unsubscribe(taskID string, ch <-chan TaskMessage)

Unsubscribe removes a subscription

type TaskContext

type TaskContext struct {
	WaitingTime  time.Duration
	Dependencies []*Task
	QueueMetrics *QueueMetrics
	CreatedAt    time.Time
}

TaskContext provides context for priority calculation

type TaskLifecycle

type TaskLifecycle struct {
	// contains filtered or unexported fields
}

TaskLifecycle manages task lifecycle events and states

func NewTaskLifecycle

func NewTaskLifecycle() *TaskLifecycle

NewTaskLifecycle creates a new task lifecycle manager

func (*TaskLifecycle) CancelTask

func (l *TaskLifecycle) CancelTask(taskID string, reason string) error

CancelTask cancels a task and its context

func (*TaskLifecycle) Close

func (l *TaskLifecycle) Close()

Close cleans up all resources

func (*TaskLifecycle) ExtendTimeout

func (l *TaskLifecycle) ExtendTimeout(taskID string, extension time.Duration) error

ExtendTimeout extends the timeout for a task

func (*TaskLifecycle) GetContext

func (l *TaskLifecycle) GetContext(taskID string) (context.Context, error)

GetContext returns the context for a task

func (*TaskLifecycle) GetRetryState

func (l *TaskLifecycle) GetRetryState(taskID string) (*RetryState, error)

GetRetryState returns the retry state for a task

func (*TaskLifecycle) GetTimeRemaining

func (l *TaskLifecycle) GetTimeRemaining(taskID string) (time.Duration, error)

GetTimeRemaining returns the remaining time for a task

func (*TaskLifecycle) IsExpired

func (l *TaskLifecycle) IsExpired(taskID string) bool

IsExpired checks if a task has expired (exceeded its timeout)

func (*TaskLifecycle) RegisterTask

func (l *TaskLifecycle) RegisterTask(task *Task, timeout time.Duration, maxRetries int) error

RegisterTask registers a task for lifecycle management

func (*TaskLifecycle) ShouldRetry

func (l *TaskLifecycle) ShouldRetry(taskID string, err error) (bool, time.Duration)

ShouldRetry determines if a task should be retried

func (*TaskLifecycle) UnregisterTask

func (l *TaskLifecycle) UnregisterTask(taskID string)

UnregisterTask removes a task from lifecycle management

type TaskMessage

type TaskMessage struct {
	Type      MessageType
	TaskID    string
	Content   interface{}
	Timestamp time.Time
}

TaskMessage represents a message in task communication

type TaskNode

type TaskNode struct {
	Task          *Task
	Duration      time.Duration
	Resources     map[string]float64
	Dependencies  []*TaskNode
	Dependents    []*TaskNode
	EarliestStart time.Time
	LatestStart   time.Time
	Critical      bool
}

TaskNode represents a node in the task dependency graph

type TaskOption

type TaskOption func(*Task)

TaskOption represents a function that modifies a task

func WithContext

func WithContext(context string) TaskOption

WithContext sets the context for task execution

func WithExpectedOutput

func WithExpectedOutput(output string) TaskOption

WithExpectedOutput sets the expected output for task validation

func WithTimeout

func WithTimeout(timeout time.Duration) TaskOption

WithTimeout sets a timeout for task execution

type TaskOutput

type TaskOutput struct {
	TaskID      string                 `json:"task_id"`
	Description string                 `json:"description"`
	Status      TaskStatus             `json:"status"`
	Result      string                 `json:"result"`
	Context     map[string]interface{} `json:"context"`
	Metadata    map[string]interface{} `json:"metadata"`
	StartedAt   *time.Time             `json:"started_at,omitempty"`
	CompletedAt *time.Time             `json:"completed_at,omitempty"`
	Error       string                 `json:"error,omitempty"`
	Duration    time.Duration          `json:"duration,omitempty"`
}

TaskOutput represents the complete output of a task execution

type TaskPlanner

type TaskPlanner struct {
	// contains filtered or unexported fields
}

TaskPlanner handles task scheduling and resource allocation

func NewTaskPlanner

func NewTaskPlanner(resourceLimits map[string]float64) *TaskPlanner

NewTaskPlanner creates a new task planner

func (*TaskPlanner) AddTask

func (p *TaskPlanner) AddTask(task *Task, dependencies []string, resources map[string]float64, duration time.Duration) error

AddTask adds a task to the planner with its dependencies and resource requirements

func (*TaskPlanner) DetectCycles

func (p *TaskPlanner) DetectCycles() [][]string

DetectCycles checks for dependency cycles in the task graph

func (*TaskPlanner) EstimateCosts

func (p *TaskPlanner) EstimateCosts() map[string]float64

EstimateCosts estimates execution costs for tasks

func (*TaskPlanner) GetCriticalPath

func (p *TaskPlanner) GetCriticalPath() []*Task

GetCriticalPath identifies the critical path in the task graph

func (*TaskPlanner) OptimizeResources

func (p *TaskPlanner) OptimizeResources() error

OptimizeResources optimizes resource allocation across tasks

type TaskPriority

type TaskPriority int

TaskPriority represents the priority level of a task

const (
	PriorityLowest  TaskPriority = -2
	PriorityLow     TaskPriority = -1
	PriorityNormal  TaskPriority = 0
	PriorityHigh    TaskPriority = 1
	PriorityHighest TaskPriority = 2
)

type TaskProgress

type TaskProgress struct {
	// TaskID uniquely identifies the task
	TaskID string

	// AgentID identifies the agent executing the task
	AgentID string

	// Status indicates the current state
	Status TaskStatus

	// Progress is a number between 0 and 1
	Progress float64

	// Message provides additional context about the current state
	Message string

	// Error holds any error that occurred
	Error error

	// StartTime is when the task began
	StartTime time.Time

	// EndTime is when the task completed/failed
	EndTime *time.Time

	// Requirements specifies what the task needs
	Requirements *capabilities.TaskRequirements

	// Subtasks tracks progress of component tasks
	Subtasks []*TaskProgress

	// Metadata holds additional task-specific data
	Metadata map[string]interface{}
}

TaskProgress represents the progress of a task

type TaskSchema

type TaskSchema struct {
	RequiredFields    []string
	AllowedPriorities []TaskPriority
	MaxDescriptionLen int
	MaxTimeout        time.Duration
	ResourceLimits    map[string]float64
}

TaskSchema defines validation rules for tasks

type TaskStatus

type TaskStatus string

TaskStatus represents the current state of a task

const (
	TaskStatusPending   TaskStatus = "pending"
	TaskStatusRunning   TaskStatus = "running"
	TaskStatusComplete  TaskStatus = "complete"
	TaskStatusFailed    TaskStatus = "failed"
	TaskStatusCancelled TaskStatus = "cancelled"
	// TaskStatusBlocked indicates task is blocked by dependencies
	TaskStatusBlocked TaskStatus = "blocked"
	// TaskStatusTimeout indicates task exceeded its timeout
	TaskStatusTimeout TaskStatus = "timeout"
	// TaskStatusRetrying indicates task is being retried after failure
	TaskStatusRetrying TaskStatus = "retrying"
)

func GetStatus

func GetStatus(task *Task) TaskStatus

GetStatus returns the current status of the task

type TaskValidator

type TaskValidator struct {
	// contains filtered or unexported fields
}

TaskValidator handles task validation

func NewTaskValidator

func NewTaskValidator(schema *TaskSchema) *TaskValidator

NewTaskValidator creates a new task validator

func (*TaskValidator) ValidateIOContract

func (v *TaskValidator) ValidateIOContract(taskID string, context interface{}, result string) []error

ValidateIOContract checks if task input/output meets requirements

func (*TaskValidator) ValidatePerformance

func (v *TaskValidator) ValidatePerformance(metrics *ExecutionMetrics) []error

ValidatePerformance checks if task performance meets requirements

func (*TaskValidator) ValidateResourceUsage

func (v *TaskValidator) ValidateResourceUsage(resources map[string]float64) []error

ValidateResourceUsage checks if resource usage is within limits

func (*TaskValidator) ValidateSchema

func (v *TaskValidator) ValidateSchema(task *Task) []error

ValidateSchema checks if a task meets schema requirements

type WeightedPriorityScorer

type WeightedPriorityScorer struct {
	PriorityWeight   float64
	DeadlineWeight   float64
	WaitTimeWeight   float64
	DependencyWeight float64
}

WeightedPriorityScorer implements weighted priority calculation

func NewWeightedPriorityScorer

func NewWeightedPriorityScorer() *WeightedPriorityScorer

NewWeightedPriorityScorer creates a new weighted priority scorer with default weights

func (*WeightedPriorityScorer) CalculatePriority

func (w *WeightedPriorityScorer) CalculatePriority(task *Task, context TaskContext) float64

CalculatePriority implements PriorityScorer interface

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL