Documentation ¶
Index ¶
- func IsUnrecoverable(err error) bool
- func NotifyCoordinatorOfExisting(ctx context.Context, log *zap.Logger, ts TaskService, coord Coordinator) error
- func TaskNotifyCoordinatorOfExisting(ctx context.Context, ts TaskService, tcs TaskControlService, coord Coordinator, ...) error
- type AnalyticalStorage
- func (as *AnalyticalStorage) FindLogs(ctx context.Context, filter taskmodel.LogFilter) ([]*taskmodel.Log, int, error)
- func (as *AnalyticalStorage) FindRunByID(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error)
- func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter taskmodel.RunFilter) ([]*taskmodel.Run, int, error)
- func (as *AnalyticalStorage) FinishRun(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error)
- func (as *AnalyticalStorage) RetryRun(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error)
- type Coordinator
- type DataDestination
- type NoOpPointsWriter
- type PointsWriter
- type RunRecorder
- type SchedulableTaskService
- type StoragePointsWriterRecorder
- type TaskControlService
- type TaskResumer
- type TaskService
- type UpdateTaskService
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsUnrecoverable ¶
IsUnrecoverable takes in an error and determines if it is permanent (requiring user intervention to fix)
func NotifyCoordinatorOfExisting ¶
func NotifyCoordinatorOfExisting(ctx context.Context, log *zap.Logger, ts TaskService, coord Coordinator) error
NotifyCoordinatorOfExisting lists all tasks by the provided task service and for each task it calls the provided coordinators task created method
func TaskNotifyCoordinatorOfExisting ¶
func TaskNotifyCoordinatorOfExisting(ctx context.Context, ts TaskService, tcs TaskControlService, coord Coordinator, exec TaskResumer, log *zap.Logger) error
TaskNotifyCoordinatorOfExisting lists all tasks by the provided task service and for each task it calls the provided coordinators task created method TODO(docmerlin): this is temporary untill the executor queue is persistent
Types ¶
type AnalyticalStorage ¶
type AnalyticalStorage struct { taskmodel.TaskService TaskControlService // contains filtered or unexported fields }
func NewAnalyticalStorage ¶
func NewAnalyticalStorage(log *zap.Logger, ts taskmodel.TaskService, tcs TaskControlService, cli influxdb.Client, dest DataDestination) *AnalyticalStorage
NewAnalyticalStorage creates a new analytical store with access to the necessary systems for storing data and to act as a middleware (deprecated)
func (*AnalyticalStorage) FindLogs ¶
func (as *AnalyticalStorage) FindLogs(ctx context.Context, filter taskmodel.LogFilter) ([]*taskmodel.Log, int, error)
FindLogs returns logs for a run. First attempt to use the TaskService, then append additional analytical's logs to the list
func (*AnalyticalStorage) FindRunByID ¶
func (as *AnalyticalStorage) FindRunByID(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error)
FindRunByID returns a single run. First see if it is in the existing TaskService. If not pull it from analytical storage.
func (*AnalyticalStorage) FindRuns ¶
func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter taskmodel.RunFilter) ([]*taskmodel.Run, int, error)
FindRuns returns a list of runs that match a filter and the total count of returned runs. First attempt to use the TaskService, then append additional analytical's runs to the list
type Coordinator ¶
Coordinator is a type with a single method which is called when a task has been created
type DataDestination ¶
type PointsWriter ¶
type RunRecorder ¶
RunRecorder is a type which records runs into an influxdb backed storage mechanism
type SchedulableTaskService ¶
type SchedulableTaskService struct {
UpdateTaskService
}
SchedulableTaskService implements the SchedulableService interface
func NewSchedulableTaskService ¶
func NewSchedulableTaskService(ts UpdateTaskService) SchedulableTaskService
NewSchedulableTaskService initializes a new SchedulableTaskService given an UpdateTaskService
func (SchedulableTaskService) UpdateLastScheduled ¶
func (s SchedulableTaskService) UpdateLastScheduled(ctx context.Context, id scheduler.ID, t time.Time) error
UpdateLastScheduled uses the task service to store the latest time a task was scheduled to run
type StoragePointsWriterRecorder ¶
type StoragePointsWriterRecorder struct {
// contains filtered or unexported fields
}
StoragePointsWriterRecorder is an implementation of RunRecorder which writes runs via an implementation of storage PointsWriter
func NewStoragePointsWriterRecorder ¶
func NewStoragePointsWriterRecorder(log *zap.Logger, cli influxdb.Client, dest DataDestination) *StoragePointsWriterRecorder
NewStoragePointsWriterRecorder configures and returns a new *StoragePointsWriterRecorder
type TaskControlService ¶
type TaskControlService interface { // CreateRun creates a run with a scheduled for time. CreateRun(ctx context.Context, taskID platform.ID, scheduledFor time.Time, runAt time.Time) (*taskmodel.Run, error) CurrentlyRunning(ctx context.Context, taskID platform.ID) ([]*taskmodel.Run, error) ManualRuns(ctx context.Context, taskID platform.ID) ([]*taskmodel.Run, error) // StartManualRun pulls a manual run from the list and moves it to currently running. StartManualRun(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error) // FinishRun removes runID from the list of running tasks and if its `ScheduledFor` is later then last completed update it. FinishRun(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error) // UpdateRunState sets the run state at the respective time. UpdateRunState(ctx context.Context, taskID, runID platform.ID, when time.Time, state taskmodel.RunStatus) error // AddRunLog adds a log line to the run. AddRunLog(ctx context.Context, taskID, runID platform.ID, when time.Time, log string) error }
TaskControlService is a low-level controller interface, intended to be passed to task executors and schedulers, which allows creation, completion, and status updates of runs.