task

package
v0.0.0-...-83a686f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 14, 2020 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// MockDomainTask is a package-level Task value with only ID ("mock") and
// Queue ("q") set; every other field is left at its zero value.
// NOTE(review): presumably a fixture for tests and mocks — confirm against callers.
var MockDomainTask = Task{
	ID:    "mock",
	Queue: "q",
}

Functions

This section is empty.

Types

type AlreadyExists

// AlreadyExists is the error returned when the service tries to create a
// Task but one with the same ID already exists.
type AlreadyExists struct {
	// ID is the Task id that collided with an existing Task.
	ID Id
}

AlreadyExists is returned when the service tries to create a Task, but there already exists one with the same ID

func (AlreadyExists) Error

func (e AlreadyExists) Error() string

func (AlreadyExists) Id

func (e AlreadyExists) Id() Id

type Args

type Args JsonObj

Task arguments, corresponds to the arguments of a function

type AttemptedTimes

type AttemptedTimes uint

Number of times that a Task has been retried

type ClaimedAt

type ClaimedAt time.Time

type CompletedAt

type CompletedAt time.Time

type Context

type Context JsonObj

Task context, useful when extra contextual data needs to be passed along with the Task

type EnqueuedAt

type EnqueuedAt time.Time

When something was last put into the queue

type Failure

type Failure JsonObj

type Id

type Id string

Id for a task that has been persisted

func GenerateId

func GenerateId(task *NewTask) Id

Generates a random id if the NewTask passed does not have one

func GenerateNewId

func GenerateNewId() Id

Generates a random id

type InvalidPersistedData

// InvalidPersistedData is an error type that reports invalid persisted data.
type InvalidPersistedData struct {
	// PersistedData carries the offending value. It is untyped (interface{}),
	// so a value of any shape can be attached.
	PersistedData interface{}
}

Invalid data

func (InvalidPersistedData) Error

func (e InvalidPersistedData) Error() string

type InvalidVersion

// InvalidVersion is the error returned when the version is invalid.
// NOTE(review): presumably this refers to the version held in the Task's
// Metadata — confirm against the service implementation.
type InvalidVersion struct {
	// ID identifies the Task concerned.
	ID Id
}

Invalid version returned when the version is invalid

func (InvalidVersion) Error

func (e InvalidVersion) Error() string

func (InvalidVersion) Id

func (e InvalidVersion) Id() Id

type JsonObj

type JsonObj map[string]interface{}

type Kind

type Kind string

Roughly corresponds to a function name

type LastClaimed

// LastClaimed groups the details of the most recent claim on a Task.
// According to the Service.UnClaim documentation it is not unset when a Task
// is unclaimed; it is updated the next time the Task is claimed.
type LastClaimed struct {
	// WorkerId is the id of the worker that made the claim.
	WorkerId   worker.Id
	// ClaimedAt is the time of the claim.
	ClaimedAt  ClaimedAt
	// TimesOutAt is the time at which the claim times out.
	// NOTE(review): presumably derived from ClaimedAt and the Task's
	// ProcessingTimeout — confirm.
	TimesOutAt TimesOutAt
	// LastReport is the latest Report filed for this claim; it is a pointer
	// and may therefore be nil.
	LastReport *Report
	// Result is the outcome of this claim; it is a pointer and may therefore
	// be nil.
	Result     *Result
}

type MockTasksService

// MockTasksService is a mock whose methods mirror the Service interface.
//
// For each Service method there is a pair of fields: an <Op>Called counter
// and an <Op>Override hook whose return types match the corresponding method.
// NOTE(review): presumably each method increments its counter and delegates
// to the override when one is set — confirm against the method bodies, which
// are not visible here.
type MockTasksService struct {
	// Create
	CreateCalled                  uint
	CreateOverride                func() (*Task, error)
	// Get
	GetCalled                     uint
	GetOverride                   func() (*Task, error)
	// Claim
	ClaimCalled                   uint
	ClaimOverride                 func() ([]Task, error)
	// ReportIn
	ReportInCalled                uint
	ReportInOverride              func() (*Task, error)
	// MarkDone
	MarkDoneCalled                uint
	MarkDoneOverride              func() (*Task, error)
	// MarkFailed
	MarkFailedCalled              uint
	MarkFailedOverride            func() (*Task, error)
	// UnClaim
	UnClaimCalled                 uint
	UnClaimOverride               func() (*Task, error)
	// ReapTimedOutTasks (note the field names drop the "Tasks" suffix)
	ReapTimedOutCalled            uint
	ReapTimedOutOverride          func() error
	// ArchiveOldTasks
	ArchiveOldTasksCalled         uint
	ArchiveOldTasksOverride       func() error
	// RefreshAsNeeded
	RefreshAsNeededCalled         uint
	RefreshAsNeededOverride       func() error
	// OutstandingTasksCount
	OutstandingTasksCountCalled   uint
	OutstandingTasksCountOverride func() (uint, error)
}

func (*MockTasksService) ArchiveOldTasks

func (m *MockTasksService) ArchiveOldTasks(ctx context.Context, archiveCompletedBefore CompletedAt, scrollSize uint, scrollTtl time.Duration) error

func (*MockTasksService) Claim

func (m *MockTasksService) Claim(ctx context.Context, workerId worker.Id, queues []queue.Name, number uint, blockFor time.Duration) ([]Task, error)

func (*MockTasksService) Create

func (m *MockTasksService) Create(ctx context.Context, task *NewTask) (*Task, error)

func (*MockTasksService) Get

func (m *MockTasksService) Get(ctx context.Context, queue queue.Name, taskId Id) (*Task, error)

func (*MockTasksService) MarkDone

func (m *MockTasksService) MarkDone(ctx context.Context, workerId worker.Id, queue queue.Name, taskId Id, success *Success) (*Task, error)

func (*MockTasksService) MarkFailed

func (m *MockTasksService) MarkFailed(ctx context.Context, workerId worker.Id, queue queue.Name, taskId Id, failure *Failure) (*Task, error)

func (*MockTasksService) OutstandingTasksCount

func (m *MockTasksService) OutstandingTasksCount(ctx context.Context, name queue.Name, id RecurringTaskId) (uint, error)

func (*MockTasksService) ReapTimedOutTasks

func (m *MockTasksService) ReapTimedOutTasks(ctx context.Context, scrollSize uint, scrollTtl time.Duration) error

func (*MockTasksService) RefreshAsNeeded

func (m *MockTasksService) RefreshAsNeeded(ctx context.Context, name queue.Name) error

func (*MockTasksService) ReportIn

func (m *MockTasksService) ReportIn(ctx context.Context, workerId worker.Id, queue queue.Name, taskId Id, newReport NewReport) (*Task, error)

func (*MockTasksService) UnClaim

func (m *MockTasksService) UnClaim(ctx context.Context, workerId worker.Id, queue queue.Name, taskId Id) (*Task, error)

type NewReport

// NewReport is a report to be filed, as sent in from the owning Worker.
// It carries no timestamp of its own; Task.ReportIn and Report pair the data
// with a ReportedAt time.
type NewReport struct {
	// Data is the reported payload; it is a pointer and may therefore be nil.
	Data *ReportedData
}

A Report to be filed on a Job as sent in from the owning Worker

type NewTask

// NewTask describes a Task that has yet to be created. It is the input to
// Service.Create.
type NewTask struct {
	// Optional id key for idempotency (if not supplied, a generated UUID is used as task Id)
	Id                *Id
	// Queue is the name of the queue the Task belongs to.
	Queue             queue.Name
	// RetryTimes is the number of times the Task can be retried.
	RetryTimes        RetryTimes
	// Kind roughly corresponds to a function name.
	Kind              Kind
	// Priority of the Task; 0 is neutral.
	Priority          Priority
	// RunAt is when the Task should be run.
	RunAt             RunAt
	// ProcessingTimeout is a duration-typed timeout.
	// NOTE(review): presumably how long a claim may last before it times out — confirm.
	ProcessingTimeout ProcessingTimeout
	// Args are the Task arguments, corresponding to the arguments of a function.
	Args              *Args
	// Context carries any extra context that needs to be passed along.
	Context           *Context

	// This is the id of the recurring Task that spawned this Task. Only populated if created by a Recurring Task
	RecurringTaskId *RecurringTaskId
}

A Task that has yet to be created

type NotClaimed

// NotClaimed is an error type carrying a Task id and a State.
// NOTE(review): presumably returned by the Service operations that document
// "Is not claimed (possibly requeued)" as an error condition — confirm.
type NotClaimed struct {
	// ID identifies the Task concerned.
	ID    Id
	// State is a Task state attached to the error.
	// NOTE(review): presumably the state the Task was actually in — confirm.
	State State
}

func (NotClaimed) Error

func (a NotClaimed) Error() string

func (NotClaimed) Id

func (a NotClaimed) Id() Id

type NotFound

// NotFound is the error returned when no Task can be found for a given Id.
type NotFound struct {
	// ID is the Task id that was looked up.
	ID        Id
	// QueueName is a queue name attached to the error.
	// NOTE(review): presumably the queue that was searched — confirm.
	QueueName queue.Name
}

NotFound is returned when the repo cannot find a Task by a given Id

func (NotFound) Error

func (e NotFound) Error() string

func (NotFound) Id

func (e NotFound) Id() Id

type NotOwnedByWorker

// NotOwnedByWorker is an error type carrying a Task id and a worker id.
// NOTE(review): presumably returned by the Service operations that document
// "Is not claimed by the given worker" as an error condition — confirm.
type NotOwnedByWorker struct {
	// ID identifies the Task concerned.
	ID                Id
	// WantedOwnerWorker is a worker id attached to the error.
	// NOTE(review): presumably the worker that attempted the operation — confirm.
	WantedOwnerWorker worker.Id
}

func (NotOwnedByWorker) Error

func (n NotOwnedByWorker) Error() string

func (NotOwnedByWorker) Id

func (n NotOwnedByWorker) Id() Id

type Priority

type Priority int

Priority of a task. 0 is neutral.

type ProcessingTimeout

type ProcessingTimeout time.Duration

type RecurringTaskId

type RecurringTaskId string

A user-specifiable Recurring Task Id

Here because Golang doesn't let me put it in the recurring package due to a circular import...

type RemainingAttempts

type RemainingAttempts uint

Number of times that a Task can still be retried

type Report

// Report is a report as sent in from the owning Worker, together with the
// time it was reported at.
type Report struct {
	// At is the time of the report.
	At   ReportedAt
	// Data is the reported payload; it is a pointer and may therefore be nil.
	Data *ReportedData
}

A Report on a Job as sent in from the owning Worker

type ReportFromThePast

// ReportFromThePast is an error type carrying a Task id and two report times.
// NOTE(review): presumably returned when a report is attempted with a time
// earlier than that of the report already on the Task (see the "Has been
// updated at a later time" condition on Service.ReportIn) — confirm.
type ReportFromThePast struct {
	// ID identifies the Task concerned.
	ID                  Id
	// AttemptedReportTime is the time of the report that was attempted.
	AttemptedReportTime time.Time
	// ExistingReportTime is the time of the report already present.
	ExistingReportTime  time.Time
}

func (ReportFromThePast) Error

func (a ReportFromThePast) Error() string

func (ReportFromThePast) Id

func (a ReportFromThePast) Id() Id

type ReportedAt

type ReportedAt time.Time

type ReportedData

type ReportedData JsonObj

type Result

// Result is the outcome of a Task: a completion time plus either Failure
// or Success data.
type Result struct {
	// At is the completion time.
	At CompletedAt
	// Results. Only one of the following will be filled in at a given time
	Failure *Failure
	Success *Success
}

type RetryTimes

type RetryTimes uint

Number of times that a Task can be retried

type RunAt

type RunAt time.Time

When the Task should be run

type Service

// Service takes care of the persistence of Tasks: creating, retrieving,
// claiming, reporting on, completing, failing, unclaiming, reaping,
// archiving, refreshing and counting them.
type Service interface {
	// Persists the given NewTask.
	Create(ctx context.Context, task *NewTask) (*Task, error)
	// Retrieves a Task by Id, returns an empty task with error if:
	// - No such task exists
	Get(ctx context.Context, queue queue.Name, taskId Id) (*Task, error)

	// Attempts to claim a number of Tasks for a given Worker, by worker.Id, optionally
	// blocking for the given time if the requested amount cannot be found.
	//
	// Returns immediately if the specified amount can be found, otherwise, retries until
	// the time limit, returning what it was able to claim.
	//
	// Also returns immediately if there are any errors.
	Claim(ctx context.Context, workerId worker.Id, queues []queue.Name, number uint, blockFor time.Duration) ([]Task, error)

	// Reports in on a given Task.
	//
	// Errors out if the Task
	//  1. Does not exist in the queue
	//  2. Is not claimed (possibly requeued)
	//  3. Is not claimed by the given worker as identified by id.
	//  4. Has been updated at a later time (concurrency)
	ReportIn(ctx context.Context, workerId worker.Id, queue queue.Name, taskId Id, newReport NewReport) (*Task, error)

	// Marks a Task as successfully completed.
	//
	// Errors out if the Task
	//  1. Does not exist in the queue
	//  2. Is not claimed (possibly requeued)
	//  3. Is not claimed by the given worker as identified by id.
	MarkDone(ctx context.Context, workerId worker.Id, queue queue.Name, taskId Id, success *Success) (*Task, error)

	// Marks a Task as failed.
	//
	// Errors out if the Task
	//  1. Does not exist in the queue
	//  2. Is not claimed (possibly requeued)
	//  3. Is not claimed by the given worker as identified by id.
	MarkFailed(ctx context.Context, workerId worker.Id, queue queue.Name, taskId Id, failure *Failure) (*Task, error)

	// Unclaims a Task so that it can be claimed by someone else.
	//
	// Note that we decrement attempts and reset the State, but do not unset LastClaimed; when the Task is
	// next Claimed, this field will be updated.
	//
	// Note that nothing else about the Task is modified. This allows callers to re-queue
	// a task that they have claimed but no longer want to / can handle.
	//
	// Errors out if the Task
	//  1. Does not exist in the queue
	//  2. Is not claimed (possibly requeued)
	//  3. Is not claimed by the given worker as identified by id.
	UnClaim(ctx context.Context, workerId worker.Id, queue queue.Name, taskId Id) (*Task, error)

	// This sets all Claimed Tasks that have timed out to Failed, adjusting RunAt as needed
	//
	// Note that this method is meant to be idempotent, so errors can be ignored or handled by simply logging
	ReapTimedOutTasks(ctx context.Context, scrollSize uint, scrollTtl time.Duration) error

	// This archives old Tasks, selected by the archiveCompletedBefore cut-off.
	// NOTE(review): the previous comment here was a verbatim copy of the ReapTimedOutTasks
	// description; confirm the exact archival semantics against the implementation.
	//
	// Note that this method is meant to be idempotent, so errors can be ignored or handled by simply logging
	ArchiveOldTasks(ctx context.Context, archiveCompletedBefore CompletedAt, scrollSize uint, scrollTtl time.Duration) error

	// This refreshes a given Queue so that the next time an op is carried out on it, the server is guaranteed
	// to get the latest information on the Tasks that are in that Queue.
	//
	// Internally, an attempt is made to _not_ refresh if the last search or refresh was carried out on the given
	// Queue within a given configurable time frame *by this process*. This is a "best effort" attempt to reduce
	// the strain on the server, but can be improved later if the count is shared somehow.
	RefreshAsNeeded(ctx context.Context, queue queue.Name) error

	// Counts the number of outstanding Tasks in the given Queue that belong to a given RecurringTaskId
	//
	// Outstanding means not DONE or DEAD that are runnable
	OutstandingTasksCount(ctx context.Context, queue queue.Name, recurringTaskId RecurringTaskId) (uint, error)
}

A Service that takes care of the persistence of Tasks.

type ServiceErr

// ServiceErr is the error interface for Service: an error that can also
// report the id of the Task it concerns.
type ServiceErr interface {
	error
	// Id returns the Task id associated with the error.
	Id() Id
}

ServiceErr is an error interface for Service

type State

type State uint8

Task state boilerplate galore. The state of a Task that can be marshalled to and from JSON.

// The possible values of State, numbered from 0 in declaration order via
// iota. Reordering these constants changes their numeric values.
const (
	// QUEUED is the zero value of State.
	// NOTE(review): presumably a Task waiting to be claimed — confirm.
	QUEUED State = iota
	// CLAIMED is the state of a Task that has been claimed.
	CLAIMED
	// FAILED is the state of a Task that has failed but can still be retried.
	FAILED
	// DONE is the state of a Task that has been marked as done.
	DONE
	// DEAD is the state of a Task that has failed and cannot be retried.
	DEAD
)

func (State) MarshalJSON

func (s State) MarshalJSON() ([]byte, error)

MarshalJSON marshals the enum as a quoted json string

func (State) String

func (s State) String() string

func (*State) UnmarshalJSON

func (s *State) UnmarshalJSON(b []byte) error

UnmarshalJSON unmarshals a quoted json string to the enum value

type Success

type Success JsonObj

type Task

// Task is a Task that has already been persisted. It is identified uniquely
// by its ID and Queue, and versioned according to its Metadata version.
type Task struct {
	// ID is the id of the persisted Task.
	ID                Id
	// Queue is the name of the queue the Task belongs to.
	Queue             queue.Name
	// RetryTimes is the number of times the Task can be retried.
	RetryTimes        RetryTimes
	// Attempted is the number of times the Task has been retried.
	Attempted         AttemptedTimes
	// Kind roughly corresponds to a function name.
	Kind              Kind
	// State is the current state of the Task (QUEUED, CLAIMED, FAILED, DONE or DEAD).
	State             State
	// Priority of the Task; 0 is neutral.
	Priority          Priority
	// RunAt is when the Task should be run.
	RunAt             RunAt
	// ProcessingTimeout is a duration-typed timeout.
	// NOTE(review): presumably how long a claim may last before it times out — confirm.
	ProcessingTimeout ProcessingTimeout
	// Args are the Task arguments, corresponding to the arguments of a function.
	Args              *Args
	// Context carries any extra context that needs to be passed along.
	Context           *Context
	// LastClaimed holds the details of the most recent claim; it is a pointer
	// and may therefore be nil.
	LastClaimed       *LastClaimed
	// LastEnqueuedAt is when the Task was last put into the queue.
	LastEnqueuedAt    EnqueuedAt
	// Metadata holds the versioning information for the Task.
	Metadata          metadata.Metadata
	// This is the id of the recurring Task that spawned this Task. Only populated if created by a Recurring Task
	RecurringTaskId *RecurringTaskId
}

A Task that has already been persisted. A Task is identified uniquely by its ID and Queue and versioned according to its Metadata Version.

func (*Task) IntoClaimed

func (t *Task) IntoClaimed(workerId worker.Id, at ClaimedAt) error

IntoClaimed marks a task as claimed.

Note that it does no error checking (e.g. making sure the task is in the right status), because this is an internal method that is called directly after a search for claimable tasks in the ES tasks service implementation.

Returns an error if the Task is not currently in a claimable state

func (*Task) IntoDone

func (t *Task) IntoDone(byWorker worker.Id, at CompletedAt, success *Success) error

IntoDone marks the current task as done, attaching the given Success data

It does some checks to make sure, for instance, that the Task has the right status and currently belongs to the given worker id

func (*Task) IntoFailed

func (t *Task) IntoFailed(byWorker worker.Id, at CompletedAt, failure *Failure) error

IntoFailed registers the current Task as something that has failed.

If the Task can be retried, update its RunAt exponentially and mark it as FAILED. If it cannot be retried, we mark it as DEAD.

It does some checks to make sure, for instance, that the Task has the right status and belongs to the given worker id

func (*Task) IntoUnClaimed

func (t *Task) IntoUnClaimed(byWorkerId worker.Id, at EnqueuedAt) error

Marks the current task as UnClaimed

Decrements the "attempted" field, and requeues the Task.

It does some checks to make sure, for instance, that the Task has the right status and belongs to the given worker id

func (*Task) RemainingAttempts

func (t *Task) RemainingAttempts() RemainingAttempts

RemainingAttempts returns the number of times this task can still run before any further updates will cause it to be marked as dead

func (*Task) ReportIn

func (t *Task) ReportIn(byWorker worker.Id, newReport NewReport, at ReportedAt) error

ReportIn attaches the given NewReport to the current Task, at the given ReportedAt time.

It does some checks to make sure, for instance, that the Task has the right status and currently belongs to the given worker id

type TimesOutAt

type TimesOutAt time.Time

type Unclaimable

// Unclaimable is an error type carrying a Task id and its current State.
// NOTE(review): presumably the error Task.IntoClaimed returns when the Task
// is not in a claimable state — confirm.
type Unclaimable struct {
	// ID identifies the Task concerned.
	ID           Id
	// CurrentState is the state the Task is currently in.
	CurrentState State
}

func (Unclaimable) Error

func (a Unclaimable) Error() string

func (Unclaimable) Id

func (a Unclaimable) Id() Id

type WrappingErr

// WrappingErr is an error that wraps another error and exposes it through
// Unwrap. The Unwrap() error method has the shape that the standard errors
// package (errors.Unwrap, errors.Is, errors.As) uses to walk an error chain.
type WrappingErr interface {
	error
	// Unwrap returns the wrapped error.
	Unwrap() error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL