taskmodel

package
v1.6.1-rc2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 20, 2021 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TaskDefaultPageSize = 100
	TaskMaxPageSize     = 500

	TaskStatusActive   = "active"
	TaskStatusInactive = "inactive"
)

Variables

View Source
var (
	// ErrRunCanceled is returned from the RunResult when a Run is Canceled.  It is used mostly internally.
	ErrRunCanceled = &errors.Error{
		Code: errors.EInternal,
		Msg:  "run canceled",
	}

	// ErrTaskNotClaimed is returned when attempting to operate against a task that must be claimed but is not.
	ErrTaskNotClaimed = &errors.Error{
		Code: errors.EConflict,
		Msg:  "task not claimed",
	}

	// ErrTaskAlreadyClaimed is returned when attempting to operate against a task that must not be claimed but is.
	ErrTaskAlreadyClaimed = &errors.Error{
		Code: errors.EConflict,
		Msg:  "task already claimed",
	}

	// ErrNoRunsFound is returned when searching for a range of runs, but none are found.
	ErrNoRunsFound = &errors.Error{
		Code: errors.ENotFound,
		Msg:  "no matching runs found",
	}

	// ErrInvalidTaskID error object for bad id's
	ErrInvalidTaskID = &errors.Error{
		Code: errors.EInvalid,
		Msg:  "invalid id",
	}

	// ErrTaskNotFound indicates no task could be found for given parameters.
	ErrTaskNotFound = &errors.Error{
		Code: errors.ENotFound,
		Msg:  "task not found",
	}

	// ErrRunNotFound is returned when searching for a single run that doesn't exist.
	ErrRunNotFound = &errors.Error{
		Code: errors.ENotFound,
		Msg:  "run not found",
	}

	ErrRunKeyNotFound = &errors.Error{
		Code: errors.ENotFound,
		Msg:  "run key not found",
	}

	ErrPageSizeTooSmall = &errors.Error{
		Msg:  "cannot have negative page limit",
		Code: errors.EInvalid,
	}

	// ErrPageSizeTooLarge indicates the page size is too large. This error is only
	// used in the kv task service implementation. The name of this error may lead it
	// to be used in a place that is not useful. The TaskMaxPageSize is the only one
	// at 500, the rest at 100. This would likely benefit from a more specific name
	// since those limits aren't shared globally.
	ErrPageSizeTooLarge = &errors.Error{
		Msg:  fmt.Sprintf("cannot use page size larger then %d", TaskMaxPageSize),
		Code: errors.EInvalid,
	}

	ErrOrgNotFound = &errors.Error{
		Msg:  "organization not found",
		Code: errors.ENotFound,
	}

	ErrTaskRunAlreadyQueued = &errors.Error{
		Msg:  "run already queued",
		Code: errors.EConflict,
	}

	// ErrOutOfBoundsLimit is returned with FindRuns is called with an invalid filter limit.
	ErrOutOfBoundsLimit = &errors.Error{
		Code: errors.EUnprocessableEntity,
		Msg:  "run limit is out of bounds, must be between 1 and 500",
	}

	// ErrInvalidOwnerID is called when trying to create a task with out a valid ownerID
	ErrInvalidOwnerID = &errors.Error{
		Code: errors.EInvalid,
		Msg:  "cannot create task with invalid ownerID",
	}
)
View Source
var (
	// TaskSystemType is the type set in tasks' for all crud requests
	TaskSystemType = "system"
)

Functions

func ErrFluxParseError

func ErrFluxParseError(err error) *errors.Error

ErrFluxParseError is returned when an error is thrown by Flux.Parse in the task executor

func ErrInternalTaskServiceError

func ErrInternalTaskServiceError(err error) *errors.Error

func ErrQueryError

func ErrQueryError(err error) *errors.Error

ErrQueryError is returned when an error is thrown by Query service in the task executor

func ErrResultIteratorError

func ErrResultIteratorError(err error) *errors.Error

ErrResultIteratorError is returned when an error is thrown by exhaustResultIterators in the executor

func ErrRunExecutionError

func ErrRunExecutionError(err error) *errors.Error

func ErrTaskConcurrencyLimitReached

func ErrTaskConcurrencyLimitReached(runsInFront int) *errors.Error

func ErrTaskOptionParse

func ErrTaskOptionParse(err error) *errors.Error

func ErrTaskTimeParse

func ErrTaskTimeParse(err error) *errors.Error

ErrTaskTimeParse an error for time parsing errors

func ErrUnexpectedTaskBucketErr

func ErrUnexpectedTaskBucketErr(err error) *errors.Error

ErrUnexpectedTaskBucketErr a generic error we can use when we rail to retrieve a bucket

Types

type Log

type Log struct {
	RunID   platform.ID `json:"runID,omitempty"`
	Time    string      `json:"time"`
	Message string      `json:"message"`
}

Log represents a link to a log resource

func (Log) String

func (l Log) String() string

type LogFilter

type LogFilter struct {
	// Task ID is required.
	Task platform.ID

	// The optional Run ID limits logs to a single run.
	Run *platform.ID
}

LogFilter represents a set of filters that restrict the returned log results.

type QueryService

type QueryService interface {
	Query(ctx context.Context, compiler flux.Compiler) (flux.ResultIterator, error)
}

type RequestStillQueuedError

type RequestStillQueuedError struct {
	// Unix timestamps matching existing request's start and end.
	Start, End int64
}

RequestStillQueuedError is returned when attempting to retry a run which has not yet completed.

func ParseRequestStillQueuedError

func ParseRequestStillQueuedError(msg string) *RequestStillQueuedError

ParseRequestStillQueuedError attempts to parse a RequestStillQueuedError from msg. If msg is formatted correctly, the resultant error is returned; otherwise it returns nil.

func (RequestStillQueuedError) Error

func (e RequestStillQueuedError) Error() string

type Run

type Run struct {
	ID           platform.ID `json:"id,omitempty"`
	TaskID       platform.ID `json:"taskID"`
	Status       string      `json:"status"`
	ScheduledFor time.Time   `json:"scheduledFor"`          // ScheduledFor is the Now time used in the task's query
	RunAt        time.Time   `json:"runAt"`                 // RunAt is the time the task is scheduled to be run, which is ScheduledFor + Offset
	StartedAt    time.Time   `json:"startedAt,omitempty"`   // StartedAt is the time the executor begins running the task
	FinishedAt   time.Time   `json:"finishedAt,omitempty"`  // FinishedAt is the time the executor finishes running the task
	RequestedAt  time.Time   `json:"requestedAt,omitempty"` // RequestedAt is the time the coordinator told the scheduler to schedule the task
	Log          []Log       `json:"log,omitempty"`
}

Run is a record createId when a run of a task is scheduled.

type RunFilter

type RunFilter struct {
	// Task ID is required for listing runs.
	Task platform.ID

	After      *platform.ID
	Limit      int
	AfterTime  string
	BeforeTime string
}

RunFilter represents a set of filters that restrict the returned results

type RunStatus

type RunStatus int
const (
	RunStarted RunStatus = iota
	RunSuccess
	RunFail
	RunCanceled
	RunScheduled
)

func (RunStatus) String

func (r RunStatus) String() string

type Task

type Task struct {
	ID                   platform.ID            `json:"id"`
	Type                 string                 `json:"type,omitempty"`
	UnusedOrganizationID platform.ID            `json:"orgID,omitempty"`
	UnusedOrganization   string                 `json:"org,omitempty"`
	OwnerUsername        string                 `json:"ownerID"`
	Name                 string                 `json:"name"`
	Description          string                 `json:"description,omitempty"`
	Status               string                 `json:"status"`
	Flux                 string                 `json:"flux"`
	Every                string                 `json:"every,omitempty"`
	Cron                 string                 `json:"cron,omitempty"`
	Offset               time.Duration          `json:"offset,omitempty"`
	LatestCompleted      time.Time              `json:"latestCompleted,omitempty"`
	LatestScheduled      time.Time              `json:"latestScheduled,omitempty"`
	LatestSuccess        time.Time              `json:"latestSuccess,omitempty"`
	LatestFailure        time.Time              `json:"latestFailure,omitempty"`
	LastRunStatus        string                 `json:"lastRunStatus,omitempty"`
	LastRunError         string                 `json:"lastRunError,omitempty"`
	CreatedAt            time.Time              `json:"createdAt,omitempty"`
	UpdatedAt            time.Time              `json:"updatedAt,omitempty"`
	Metadata             map[string]interface{} `json:"metadata,omitempty"`
}

Task is a task. 🎊

func (*Task) EffectiveCron

func (t *Task) EffectiveCron() string

EffectiveCron returns the effective cron string of the options. If the cron option was specified, it is returned. If the every option was specified, it is converted into a cron string using "@every". Otherwise, the empty string is returned. The value of the offset option is not considered.

type TaskCreate

type TaskCreate struct {
	Type                 string                 `json:"type,omitempty"`
	Flux                 string                 `json:"flux"`
	Description          string                 `json:"description,omitempty"`
	Status               string                 `json:"status,omitempty"`
	UnusedOrganizationID platform.ID            `json:"orgID,omitempty"`
	UnusedOrganization   string                 `json:"org,omitempty"`
	OwnerUsername        string                 `json:"-"`
	Metadata             map[string]interface{} `json:"-"` // not to be set through a web request but rather used by a http service using tasks backend.
}

TaskCreate is the set of values to create a task.

func (TaskCreate) Validate

func (t TaskCreate) Validate() error

type TaskFilter

type TaskFilter struct {
	Type     *string
	Name     *string
	After    *platform.ID
	Username *string
	Limit    int
	Status   *string
}

TaskFilter represents a set of filters that restrict the returned results

func (TaskFilter) QueryParams

func (f TaskFilter) QueryParams() map[string][]string

QueryParams Converts TaskFilter fields to url query params.

type TaskService

type TaskService interface {
	// FindTaskByID returns a single task
	FindTaskByID(ctx context.Context, id platform.ID) (*Task, error)

	// FindTasks returns a list of tasks that match a filter (limit 100) and the total count
	// of matching tasks.
	FindTasks(ctx context.Context, filter TaskFilter) ([]*Task, int, error)

	// CreateTask creates a new task.
	// The owner of the task is inferred from the authorizer associated with ctx.
	CreateTask(ctx context.Context, t TaskCreate) (*Task, error)

	// UpdateTask updates a single task with changeset.
	UpdateTask(ctx context.Context, id platform.ID, upd TaskUpdate) (*Task, error)

	// DeleteTask removes a task by ID and purges all associated data and scheduled runs.
	DeleteTask(ctx context.Context, id platform.ID) error

	// FindLogs returns logs for a run.
	FindLogs(ctx context.Context, filter LogFilter) ([]*Log, int, error)

	// FindRuns returns a list of runs that match a filter and the total count of returned runs.
	FindRuns(ctx context.Context, filter RunFilter) ([]*Run, int, error)

	// FindRunByID returns a single run.
	FindRunByID(ctx context.Context, taskID, runID platform.ID) (*Run, error)

	// CancelRun cancels a currently running run.
	CancelRun(ctx context.Context, taskID, runID platform.ID) error

	// RetryRun creates and returns a new run (which is a retry of another run).
	RetryRun(ctx context.Context, taskID, runID platform.ID) (*Run, error)

	// ForceRun forces a run to occur with unix timestamp scheduledFor, to be executed as soon as possible.
	// The value of scheduledFor may or may not align with the task's schedule.
	ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*Run, error)
}

TaskService represents a service for managing one-off and recurring tasks.

type TaskStatus

type TaskStatus string
const (
	TaskActive   TaskStatus = "active"
	TaskInactive TaskStatus = "inactive"

	DefaultTaskStatus TaskStatus = TaskActive
)

type TaskUpdate

type TaskUpdate struct {
	Flux        *string `json:"flux,omitempty"`
	Status      *string `json:"status,omitempty"`
	Description *string `json:"description,omitempty"`

	// LatestCompleted us to set latest completed on startup to skip task catchup
	LatestCompleted *time.Time             `json:"-"`
	LatestScheduled *time.Time             `json:"-"`
	LatestSuccess   *time.Time             `json:"-"`
	LatestFailure   *time.Time             `json:"-"`
	LastRunStatus   *string                `json:"-"`
	LastRunError    *string                `json:"-"`
	Metadata        map[string]interface{} `json:"-"` // not to be set through a web request but rather used by a http service using tasks backend.

	// Options gets unmarshalled from json as if it was flat, with the same level as Flux and Status.
	Options options.Options // when we unmarshal this gets unmarshalled from flat key-values
}

TaskUpdate represents updates to a task. Options updates override any options set in the Flux field.

func (*TaskUpdate) MarshalJSON

func (t *TaskUpdate) MarshalJSON() ([]byte, error)

func (*TaskUpdate) UnmarshalJSON

func (t *TaskUpdate) UnmarshalJSON(data []byte) error

func (*TaskUpdate) UpdateFlux

func (t *TaskUpdate) UpdateFlux(oldFlux string) error

UpdateFlux updates the TaskUpdate to go from updating options to updating a flux string, that now has those updated options in it. It zeros the options in the TaskUpdate.

func (*TaskUpdate) Validate

func (t *TaskUpdate) Validate() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL