Documentation
Overview
job.go contains the core job execution logic for the orchestrator. It manages the job lifecycle from enqueuing to completion and handles task execution, state management, and error handling.
orchestrator.go defines the main orchestrator structure and its core operations. It manages job execution, worker pools, and system state, and acts as the central coordinator for the job processing system.
task.go handles task execution logic within the orchestrator. It provides task registration, execution, and retry mechanisms, and manages the lifecycle of individual tasks within jobs.
Index
- type Orchestrator
- func New(db storage.DB, maxConcurrent int) (*Orchestrator, error)
- func (o *Orchestrator) Close() error
- func (o *Orchestrator) EnqueueJob(definitionID string, data map[string]interface{}) (string, error)
- func (o *Orchestrator) ExecuteJob(ctx context.Context, executionID string) error
- func (o *Orchestrator) GetJobExecutionState(executionID string) (*models.JobExecutionState, error)
- func (o *Orchestrator) GetSystemState() (*models.SystemState, error)
- func (o *Orchestrator) RegisterJobDefinition(jd *models.JobDefinition) error
- func (o *Orchestrator) RegisterTaskFunction(taskID string, fn TaskFunction)
- type TaskFunction
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type Orchestrator
type Orchestrator struct {
// contains filtered or unexported fields
}
Orchestrator manages the complete job execution system. It controls worker pools, maintains job state, and coordinates task execution. It provides thread-safe operation for concurrent job processing.
func New
func New(db storage.DB, maxConcurrent int) (*Orchestrator, error)
New creates and initializes a new Orchestrator instance. It sets up the worker pool, recovers any interrupted jobs, and starts the job queue processing loop.
func (*Orchestrator) Close
func (o *Orchestrator) Close() error
Close gracefully shuts down the orchestrator. It stops queue processing, waits for completion, and ensures a clean shutdown of the database connection.
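The shutdown order described above (stop the loop, wait, then close the database) can be sketched as follows. All names here (`queueLoop`, `startQueueLoop`, `shutdown`, `fakeDB`) are hypothetical stand-ins, and the database is modeled as a plain `io.Closer`; the real Close may differ in its details.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// queueLoop is a hypothetical stand-in for the orchestrator's queue
// processing loop; none of these names come from the package.
type queueLoop struct {
	queue     chan string
	stop      chan struct{}
	wg        sync.WaitGroup
	closeOnce sync.Once
	closeErr  error
	db        io.Closer
	processed int
}

// startQueueLoop launches one goroutine that counts items until stopped.
func startQueueLoop(db io.Closer) *queueLoop {
	l := &queueLoop{queue: make(chan string), stop: make(chan struct{}), db: db}
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		for {
			select {
			case <-l.stop:
				return
			case <-l.queue:
				l.processed++
			}
		}
	}()
	return l
}

// shutdown signals the loop to stop, waits for it to exit, and only then
// closes the database handle. Repeated calls return the first result.
func (l *queueLoop) shutdown() error {
	l.closeOnce.Do(func() {
		close(l.stop)
		l.wg.Wait()
		l.closeErr = l.db.Close()
	})
	return l.closeErr
}

// fakeDB records whether Close was called.
type fakeDB struct{ closed bool }

func (f *fakeDB) Close() error { f.closed = true; return nil }

func main() {
	db := &fakeDB{}
	l := startQueueLoop(db)
	l.queue <- "exec-1"
	l.queue <- "exec-2"
	err := l.shutdown()
	fmt.Println("err:", err, "processed:", l.processed, "db closed:", db.closed)
}
```

Closing the database only after the loop has exited avoids the loop touching a closed handle, and `sync.Once` makes a second call harmless.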
func (*Orchestrator) EnqueueJob
func (o *Orchestrator) EnqueueJob(definitionID string, data map[string]interface{}) (string, error)
EnqueueJob adds a new job to the execution queue. It creates a new job execution instance, stores it in the database, and returns the execution ID for tracking the job.
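As a rough model of the enqueue flow, the sketch below creates an execution record, stores it, and returns its ID. It uses an in-memory map in place of the database, and it makes several assumptions that this page does not confirm: the `execution` fields, the "queued" status string, the random hex ID format, and the check that the definition ID is known.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"sync"
)

// execution is a hypothetical record; the real model type is not shown here.
type execution struct {
	ID           string
	DefinitionID string
	Status       string
	Data         map[string]interface{}
}

// memStore stands in for the database and the queue.
type memStore struct {
	mu    sync.Mutex
	defs  map[string]bool
	execs map[string]*execution
	queue []string
}

func newMemStore() *memStore {
	return &memStore{defs: make(map[string]bool), execs: make(map[string]*execution)}
}

// registerDefinition marks a definition ID as known.
func (s *memStore) registerDefinition(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.defs[id] = true
}

// enqueue stores a new execution for definitionID and returns its ID.
func (s *memStore) enqueue(definitionID string, data map[string]interface{}) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.defs[definitionID] {
		return "", fmt.Errorf("unknown job definition %q", definitionID)
	}
	buf := make([]byte, 8)
	if _, err := rand.Read(buf); err != nil {
		return "", fmt.Errorf("generate execution ID: %w", err)
	}
	id := hex.EncodeToString(buf)
	s.execs[id] = &execution{ID: id, DefinitionID: definitionID, Status: "queued", Data: data}
	s.queue = append(s.queue, id)
	return id, nil
}

func main() {
	s := newMemStore()
	s.registerDefinition("monthly-report")
	id, err := s.enqueue("monthly-report", map[string]interface{}{"month": "2024-01"})
	fmt.Println("queued:", id != "", "err:", err, "queue length:", len(s.queue))
}
```

With the real API, the returned string would presumably be passed to GetJobExecutionState to track progress.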
func (*Orchestrator) ExecuteJob
func (o *Orchestrator) ExecuteJob(ctx context.Context, executionID string) error
ExecuteJob runs a job and all its tasks in sequence. It manages the complete lifecycle of a job execution, handling state transitions, task execution, and error cases.
func (*Orchestrator) GetJobExecutionState
func (o *Orchestrator) GetJobExecutionState(executionID string) (*models.JobExecutionState, error)
GetJobExecutionState retrieves the current state of a job execution. It combines the job execution state with the task states for status reporting and returns a complete snapshot of job and task status.
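The idea of combining a job record with its task states into one snapshot can be sketched like this. The `jobRecord`, `taskState`, and `snapshot` types are hypothetical and much simpler than whatever models.JobExecutionState contains; the point is only that a snapshot should not change when the underlying state is updated later.

```go
package main

import "fmt"

// jobRecord and taskState are hypothetical simplified models.
type jobRecord struct {
	ExecutionID string
	Status      string
}

type taskState struct {
	TaskID string
	Status string
}

// snapshot is a point-in-time view of a job and its tasks.
type snapshot struct {
	ExecutionID string
	Status      string
	Tasks       []taskState
}

// buildSnapshot merges the job record and task states into one value.
func buildSnapshot(job jobRecord, tasks []taskState) snapshot {
	// Copy the slice so later updates to tasks do not alter the snapshot.
	copied := make([]taskState, len(tasks))
	copy(copied, tasks)
	return snapshot{ExecutionID: job.ExecutionID, Status: job.Status, Tasks: copied}
}

func main() {
	tasks := []taskState{{TaskID: "extract", Status: "completed"}, {TaskID: "load", Status: "running"}}
	snap := buildSnapshot(jobRecord{ExecutionID: "exec-1", Status: "running"}, tasks)
	tasks[1].Status = "completed" // does not affect the snapshot
	fmt.Println(snap.Status, snap.Tasks[1].Status)
}
```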
func (*Orchestrator) GetSystemState
func (o *Orchestrator) GetSystemState() (*models.SystemState, error)
GetSystemState retrieves the current state of the entire system. It provides an overview of active and queued jobs and is intended for monitoring and debugging.
func (*Orchestrator) RegisterJobDefinition
func (o *Orchestrator) RegisterJobDefinition(jd *models.JobDefinition) error
RegisterJobDefinition adds a new job definition to the system. It stores the definition for future execution, enabling jobs to be executed using this definition.
func (*Orchestrator) RegisterTaskFunction
func (o *Orchestrator) RegisterTaskFunction(taskID string, fn TaskFunction)
RegisterTaskFunction associates a function with a task ID. This allows the orchestrator to look up and execute task implementations. It must be called before a task can be executed.