Documentation ¶
Index ¶
- Constants
- Variables
- func ErrFluxParseError(err error) *errors.Error
- func ErrInternalTaskServiceError(err error) *errors.Error
- func ErrQueryError(err error) *errors.Error
- func ErrResultIteratorError(err error) *errors.Error
- func ErrRunExecutionError(err error) *errors.Error
- func ErrTaskConcurrencyLimitReached(runsInFront int) *errors.Error
- func ErrTaskOptionParse(err error) *errors.Error
- func ErrTaskTimeParse(err error) *errors.Error
- func ErrUnexpectedTaskBucketErr(err error) *errors.Error
- type Log
- type LogFilter
- type RequestStillQueuedError
- type Run
- type RunFilter
- type RunStatus
- type Task
- type TaskCreate
- type TaskFilter
- type TaskService
- type TaskStatus
- type TaskUpdate
Constants ¶
const ( TaskDefaultPageSize = 100 TaskMaxPageSize = 500 TaskStatusActive = "active" TaskStatusInactive = "inactive" )
Variables ¶
var ( // ErrRunCanceled is returned from the RunResult when a Run is Canceled. It is used mostly internally. ErrRunCanceled = &errors.Error{ Code: errors.EInternal, Msg: "run canceled", } // ErrTaskNotClaimed is returned when attempting to operate against a task that must be claimed but is not. ErrTaskNotClaimed = &errors.Error{ Code: errors.EConflict, Msg: "task not claimed", } // ErrTaskAlreadyClaimed is returned when attempting to operate against a task that must not be claimed but is. ErrTaskAlreadyClaimed = &errors.Error{ Code: errors.EConflict, Msg: "task already claimed", } // ErrNoRunsFound is returned when searching for a range of runs, but none are found. ErrNoRunsFound = &errors.Error{ Code: errors.ENotFound, Msg: "no matching runs found", } // ErrInvalidTaskID error object for bad id's ErrInvalidTaskID = &errors.Error{ Code: errors.EInvalid, Msg: "invalid id", } // ErrTaskNotFound indicates no task could be found for given parameters. ErrTaskNotFound = &errors.Error{ Code: errors.ENotFound, Msg: "task not found", } // ErrRunNotFound is returned when searching for a single run that doesn't exist. ErrRunNotFound = &errors.Error{ Code: errors.ENotFound, Msg: "run not found", } ErrRunKeyNotFound = &errors.Error{ Code: errors.ENotFound, Msg: "run key not found", } ErrPageSizeTooSmall = &errors.Error{ Msg: "cannot have negative page limit", Code: errors.EInvalid, } // ErrPageSizeTooLarge indicates the page size is too large. This error is only // used in the kv task service implementation. The name of this error may lead it // to be used in a place that is not useful. The TaskMaxPageSize is the only one // at 500, the rest at 100. This would likely benefit from a more specific name // since those limits aren't shared globally. 
ErrPageSizeTooLarge = &errors.Error{ Msg: fmt.Sprintf("cannot use page size larger then %d", TaskMaxPageSize), Code: errors.EInvalid, } ErrOrgNotFound = &errors.Error{ Msg: "organization not found", Code: errors.ENotFound, } ErrTaskRunAlreadyQueued = &errors.Error{ Msg: "run already queued", Code: errors.EConflict, } // ErrOutOfBoundsLimit is returned when FindRuns is called with an invalid filter limit. ErrOutOfBoundsLimit = &errors.Error{ Code: errors.EUnprocessableEntity, Msg: "run limit is out of bounds, must be between 1 and 500", } // ErrInvalidOwnerID is returned when trying to create a task without a valid ownerID ErrInvalidOwnerID = &errors.Error{ Code: errors.EInvalid, Msg: "cannot create task with invalid ownerID", } )
var (
// TaskSystemType is the type set in tasks for all CRUD requests
TaskSystemType = "system"
)
Functions ¶
func ErrFluxParseError ¶
ErrFluxParseError is returned when an error is thrown by Flux.Parse in the task executor
func ErrQueryError ¶
ErrQueryError is returned when an error is thrown by Query service in the task executor
func ErrResultIteratorError ¶
ErrResultIteratorError is returned when an error is thrown by exhaustResultIterators in the executor
func ErrRunExecutionError ¶
func ErrTaskOptionParse ¶
func ErrTaskTimeParse ¶
ErrTaskTimeParse is an error for time parsing errors.
func ErrUnexpectedTaskBucketErr ¶
ErrUnexpectedTaskBucketErr is a generic error we can use when we fail to retrieve a bucket.
Types ¶
type Log ¶
type Log struct { RunID platform.ID `json:"runID,omitempty"` Time string `json:"time"` Message string `json:"message"` }
Log represents a link to a log resource
type LogFilter ¶
type LogFilter struct { // Task ID is required. Task platform.ID // The optional Run ID limits logs to a single run. Run *platform.ID }
LogFilter represents a set of filters that restrict the returned log results.
type RequestStillQueuedError ¶
type RequestStillQueuedError struct {
// Unix timestamps matching existing request's start and end.
Start, End int64
}
RequestStillQueuedError is returned when attempting to retry a run which has not yet completed.
func ParseRequestStillQueuedError ¶
func ParseRequestStillQueuedError(msg string) *RequestStillQueuedError
ParseRequestStillQueuedError attempts to parse a RequestStillQueuedError from msg. If msg is formatted correctly, the resultant error is returned; otherwise it returns nil.
func (RequestStillQueuedError) Error ¶
func (e RequestStillQueuedError) Error() string
type Run ¶
type Run struct { ID platform.ID `json:"id,omitempty"` TaskID platform.ID `json:"taskID"` Status string `json:"status"` ScheduledFor time.Time `json:"scheduledFor"` // ScheduledFor is the Now time used in the task's query RunAt time.Time `json:"runAt"` // RunAt is the time the task is scheduled to be run, which is ScheduledFor + Offset StartedAt time.Time `json:"startedAt,omitempty"` // StartedAt is the time the executor begins running the task FinishedAt time.Time `json:"finishedAt,omitempty"` // FinishedAt is the time the executor finishes running the task RequestedAt time.Time `json:"requestedAt,omitempty"` // RequestedAt is the time the coordinator told the scheduler to schedule the task Log []Log `json:"log,omitempty"` }
Run is a record created when a run of a task is scheduled.
type RunFilter ¶
type RunFilter struct { // Task ID is required for listing runs. Task platform.ID After *platform.ID Limit int AfterTime string BeforeTime string }
RunFilter represents a set of filters that restrict the returned results
type Task ¶
type Task struct { ID platform.ID `json:"id"` Type string `json:"type,omitempty"` OrganizationID platform.ID `json:"orgID"` Organization string `json:"org"` OwnerID platform.ID `json:"ownerID"` Name string `json:"name"` Description string `json:"description,omitempty"` Status string `json:"status"` Flux string `json:"flux"` Every string `json:"every,omitempty"` Cron string `json:"cron,omitempty"` Offset time.Duration `json:"offset,omitempty"` LatestCompleted time.Time `json:"latestCompleted,omitempty"` LatestScheduled time.Time `json:"latestScheduled,omitempty"` LatestSuccess time.Time `json:"latestSuccess,omitempty"` LatestFailure time.Time `json:"latestFailure,omitempty"` LastRunStatus string `json:"lastRunStatus,omitempty"` LastRunError string `json:"lastRunError,omitempty"` CreatedAt time.Time `json:"createdAt,omitempty"` UpdatedAt time.Time `json:"updatedAt,omitempty"` Metadata map[string]interface{} `json:"metadata,omitempty"` }
Task is a task. 🎊
func (*Task) EffectiveCron ¶
EffectiveCron returns the effective cron string of the options. If the cron option was specified, it is returned. If the every option was specified, it is converted into a cron string using "@every". Otherwise, the empty string is returned. The value of the offset option is not considered.
type TaskCreate ¶
type TaskCreate struct { Type string `json:"type,omitempty"` Flux string `json:"flux"` Description string `json:"description,omitempty"` Status string `json:"status,omitempty"` OrganizationID platform.ID `json:"orgID,omitempty"` Organization string `json:"org,omitempty"` OwnerID platform.ID `json:"-"` Metadata map[string]interface{} `json:"-"` // not to be set through a web request but rather used by a http service using tasks backend. }
TaskCreate is the set of values to create a task.
func (TaskCreate) Validate ¶
func (t TaskCreate) Validate() error
type TaskFilter ¶
type TaskFilter struct { Type *string Name *string After *platform.ID OrganizationID *platform.ID Organization string User *platform.ID Limit int Status *string }
TaskFilter represents a set of filters that restrict the returned results
func (TaskFilter) QueryParams ¶
func (f TaskFilter) QueryParams() map[string][]string
QueryParams converts TaskFilter fields to URL query params.
type TaskService ¶
type TaskService interface { // FindTaskByID returns a single task FindTaskByID(ctx context.Context, id platform.ID) (*Task, error) // FindTasks returns a list of tasks that match a filter (limit 100) and the total count // of matching tasks. FindTasks(ctx context.Context, filter TaskFilter) ([]*Task, int, error) // CreateTask creates a new task. // The owner of the task is inferred from the authorizer associated with ctx. CreateTask(ctx context.Context, t TaskCreate) (*Task, error) // UpdateTask updates a single task with changeset. UpdateTask(ctx context.Context, id platform.ID, upd TaskUpdate) (*Task, error) // DeleteTask removes a task by ID and purges all associated data and scheduled runs. DeleteTask(ctx context.Context, id platform.ID) error // FindLogs returns logs for a run. FindLogs(ctx context.Context, filter LogFilter) ([]*Log, int, error) // FindRuns returns a list of runs that match a filter and the total count of returned runs. FindRuns(ctx context.Context, filter RunFilter) ([]*Run, int, error) // FindRunByID returns a single run. FindRunByID(ctx context.Context, taskID, runID platform.ID) (*Run, error) // CancelRun cancels a currently running run. CancelRun(ctx context.Context, taskID, runID platform.ID) error // RetryRun creates and returns a new run (which is a retry of another run). RetryRun(ctx context.Context, taskID, runID platform.ID) (*Run, error) // ForceRun forces a run to occur with unix timestamp scheduledFor, to be executed as soon as possible. // The value of scheduledFor may or may not align with the task's schedule. ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*Run, error) }
TaskService represents a service for managing one-off and recurring tasks.
type TaskStatus ¶
type TaskStatus string
const ( TaskActive TaskStatus = "active" TaskInactive TaskStatus = "inactive" DefaultTaskStatus TaskStatus = TaskActive )
type TaskUpdate ¶
type TaskUpdate struct { Flux *string `json:"flux,omitempty"` Status *string `json:"status,omitempty"` Description *string `json:"description,omitempty"` // LatestCompleted is used to set latest completed on startup to skip task catchup LatestCompleted *time.Time `json:"-"` LatestScheduled *time.Time `json:"-"` LatestSuccess *time.Time `json:"-"` LatestFailure *time.Time `json:"-"` LastRunStatus *string `json:"-"` LastRunError *string `json:"-"` Metadata map[string]interface{} `json:"-"` // not to be set through a web request but rather used by a http service using tasks backend. // Options gets unmarshalled from json as if it was flat, with the same level as Flux and Status. Options options.Options // when we unmarshal this gets unmarshalled from flat key-values }
TaskUpdate represents updates to a task. Options updates override any options set in the Flux field.
func (*TaskUpdate) MarshalJSON ¶
func (t *TaskUpdate) MarshalJSON() ([]byte, error)
func (*TaskUpdate) UnmarshalJSON ¶
func (t *TaskUpdate) UnmarshalJSON(data []byte) error
func (*TaskUpdate) UpdateFlux ¶
func (t *TaskUpdate) UpdateFlux(parser fluxlang.FluxLanguageService, oldFlux string) error
UpdateFlux updates the TaskUpdate to go from updating options to updating a flux string, that now has those updated options in it. It zeros the options in the TaskUpdate.
func (*TaskUpdate) Validate ¶
func (t *TaskUpdate) Validate() error