Documentation ¶
Index ¶
- func IsUnrecoverable(err error) bool
- func NotifyCoordinatorOfExisting(ctx context.Context, log *zap.Logger, ts TaskService, coord Coordinator) error
- func TaskNotifyCoordinatorOfExisting(ctx context.Context, ts TaskService, tcs TaskControlService, coord Coordinator, ...) error
- type AnalyticalStorage
- func (as *AnalyticalStorage) FindLogs(ctx context.Context, filter influxdb.LogFilter) ([]*influxdb.Log, int, error)
- func (as *AnalyticalStorage) FindRunByID(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
- func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFilter) ([]*influxdb.Run, int, error)
- func (as *AnalyticalStorage) FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
- func (as *AnalyticalStorage) RetryRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
- type Coordinator
- type RunRecorder
- type SchedulableTaskService
- type StoragePointsWriterRecorder
- type TaskControlService
- type TaskResumer
- type TaskService
- type UpdateTaskService
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsUnrecoverable ¶
IsUnrecoverable takes in an error and determines if it is permanent (requiring user intervention to fix)
func NotifyCoordinatorOfExisting ¶
func NotifyCoordinatorOfExisting(ctx context.Context, log *zap.Logger, ts TaskService, coord Coordinator) error
NotifyCoordinatorOfExisting lists all tasks known to the provided task service and, for each task, calls the provided coordinator's task-created method.
func TaskNotifyCoordinatorOfExisting ¶
func TaskNotifyCoordinatorOfExisting(ctx context.Context, ts TaskService, tcs TaskControlService, coord Coordinator, exec TaskResumer, log *zap.Logger) error
TaskNotifyCoordinatorOfExisting lists all tasks known to the provided task service and, for each task, calls the provided coordinator's task-created method. TODO(docmerlin): this is temporary until the executor queue is persistent.
Types ¶
type AnalyticalStorage ¶
type AnalyticalStorage struct { influxdb.TaskService influxdb.BucketService TaskControlService // contains filtered or unexported fields }
func NewAnalyticalRunStorage ¶
func NewAnalyticalRunStorage(log *zap.Logger, ts influxdb.TaskService, bs influxdb.BucketService, tcs TaskControlService, rr RunRecorder, qs query.QueryService) *AnalyticalStorage
NewAnalyticalRunStorage creates a new analytical store with access to the necessary systems for storing data and to act as a middleware
func NewAnalyticalStorage ¶
func NewAnalyticalStorage(log *zap.Logger, ts influxdb.TaskService, bs influxdb.BucketService, tcs TaskControlService, pw storage.PointsWriter, qs query.QueryService) *AnalyticalStorage
NewAnalyticalStorage creates a new analytical store with access to the necessary systems for storing data and to act as a middleware (deprecated)
func (*AnalyticalStorage) FindLogs ¶
func (as *AnalyticalStorage) FindLogs(ctx context.Context, filter influxdb.LogFilter) ([]*influxdb.Log, int, error)
FindLogs returns logs for a run. It first attempts to use the TaskService, then appends any additional logs found in analytical storage to the list.
func (*AnalyticalStorage) FindRunByID ¶
func (as *AnalyticalStorage) FindRunByID(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
FindRunByID returns a single run. It first checks whether the run is in the existing TaskService; if not, it pulls the run from analytical storage.
func (*AnalyticalStorage) FindRuns ¶
func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFilter) ([]*influxdb.Run, int, error)
FindRuns returns a list of runs that match a filter and the total count of returned runs. It first attempts to use the TaskService, then appends any additional runs found in analytical storage to the list.
type Coordinator ¶
Coordinator is a type with a single method which is called when a task has been created
type RunRecorder ¶
type RunRecorder interface {
Record(ctx context.Context, orgID influxdb.ID, org string, bucketID influxdb.ID, bucket string, run *influxdb.Run) error
}
RunRecorder is a type which records runs into an influxdb backed storage mechanism
type SchedulableTaskService ¶
type SchedulableTaskService struct {
UpdateTaskService
}
SchedulableTaskService implements the SchedulableService interface
func NewSchedulableTaskService ¶
func NewSchedulableTaskService(ts UpdateTaskService) SchedulableTaskService
NewSchedulableTaskService initializes a new SchedulableTaskService given an UpdateTaskService
func (SchedulableTaskService) UpdateLastScheduled ¶
func (s SchedulableTaskService) UpdateLastScheduled(ctx context.Context, id scheduler.ID, t time.Time) error
UpdateLastScheduled uses the task service to store the latest time a task was scheduled to run
type StoragePointsWriterRecorder ¶
type StoragePointsWriterRecorder struct {
// contains filtered or unexported fields
}
StoragePointsWriterRecorder is an implementation of RunRecorder which writes runs via an implementation of storage PointsWriter
func NewStoragePointsWriterRecorder ¶
func NewStoragePointsWriterRecorder(log *zap.Logger, pw storage.PointsWriter) *StoragePointsWriterRecorder
NewStoragePointsWriterRecorder configures and returns a new *StoragePointsWriterRecorder
func (*StoragePointsWriterRecorder) Record ¶
func (s *StoragePointsWriterRecorder) Record(ctx context.Context, orgID influxdb.ID, org string, bucketID influxdb.ID, bucket string, run *influxdb.Run) error
Record formats the provided run as a models.Point and writes the resulting point to an underlying storage.PointsWriter
type TaskControlService ¶
type TaskControlService interface { // CreateRun creates a run with a scheduled for time. CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) ManualRuns(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) // StartManualRun pulls a manual run from the list and moves it to currently running. StartManualRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) // FinishRun removes runID from the list of running tasks and if its `ScheduledFor` is later then last completed update it. FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) // UpdateRunState sets the run state at the respective time. UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state influxdb.RunStatus) error // AddRunLog adds a log line to the run. AddRunLog(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error }
TaskControlService is a low-level controller interface, intended to be passed to task executors and schedulers, which allows creation, completion, and status updates of runs.
type TaskResumer ¶
type TaskService ¶
type TaskService interface { FindTasks(context.Context, influxdb.TaskFilter) ([]*influxdb.Task, int, error) UpdateTask(context.Context, influxdb.ID, influxdb.TaskUpdate) (*influxdb.Task, error) }
TaskService is a type on which tasks can be listed and updated
type UpdateTaskService ¶
type UpdateTaskService interface {
UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error)
}
UpdateTaskService provides an API to update the LatestScheduled time of a task