Documentation
¶
Index ¶
- Variables
- type AlreadyExists
- type Args
- type AttemptedTimes
- type ClaimedAt
- type CompletedAt
- type Context
- type EnqueuedAt
- type Failure
- type Id
- type InvalidPersistedData
- type InvalidVersion
- type JsonObj
- type Kind
- type LastClaimed
- type MockTasksService
- func (m *MockTasksService) ArchiveOldTasks(ctx context.Context, archiveCompletedBefore CompletedAt, scrollSize uint, ...) error
- func (m *MockTasksService) Claim(ctx context.Context, workerId worker.Id, queues []queue.Name, number uint, ...) ([]Task, error)
- func (m *MockTasksService) Create(ctx context.Context, task *NewTask) (*Task, error)
- func (m *MockTasksService) Get(ctx context.Context, queue queue.Name, taskId Id) (*Task, error)
- func (m *MockTasksService) MarkDone(ctx context.Context, workerId worker.Id, queue queue.Name, taskId Id, ...) (*Task, error)
- func (m *MockTasksService) MarkFailed(ctx context.Context, workerId worker.Id, queue queue.Name, taskId Id, ...) (*Task, error)
- func (m *MockTasksService) OutstandingTasksCount(ctx context.Context, name queue.Name, id RecurringTaskId) (uint, error)
- func (m *MockTasksService) ReapTimedOutTasks(ctx context.Context, scrollSize uint, scrollTtl time.Duration) error
- func (m *MockTasksService) RefreshAsNeeded(ctx context.Context, name queue.Name) error
- func (m *MockTasksService) ReportIn(ctx context.Context, workerId worker.Id, queue queue.Name, taskId Id, ...) (*Task, error)
- func (m *MockTasksService) UnClaim(ctx context.Context, workerId worker.Id, queue queue.Name, taskId Id) (*Task, error)
- type NewReport
- type NewTask
- type NotClaimed
- type NotFound
- type NotOwnedByWorker
- type Priority
- type ProcessingTimeout
- type RecurringTaskId
- type RemainingAttempts
- type Report
- type ReportFromThePast
- type ReportedAt
- type ReportedData
- type Result
- type RetryTimes
- type RunAt
- type Service
- type ServiceErr
- type State
- type Success
- type Task
- func (t *Task) IntoClaimed(workerId worker.Id, at ClaimedAt) error
- func (t *Task) IntoDone(byWorker worker.Id, at CompletedAt, success *Success) error
- func (t *Task) IntoFailed(byWorker worker.Id, at CompletedAt, failure *Failure) error
- func (t *Task) IntoUnClaimed(byWorkerId worker.Id, at EnqueuedAt) error
- func (t *Task) RemainingAttempts() RemainingAttempts
- func (t *Task) ReportIn(byWorker worker.Id, newReport NewReport, at ReportedAt) error
- type TimesOutAt
- type Unclaimable
- type WrappingErr
Constants ¶
This section is empty.
Variables ¶
var MockDomainTask = Task{
ID: "mock",
Queue: "q",
}
Functions ¶
This section is empty.
Types ¶
type AlreadyExists ¶
type AlreadyExists struct {
ID Id
}
AlreadyExists is returned when the service tries to create a Task, but there already exists one with the same ID
func (AlreadyExists) Error ¶
func (e AlreadyExists) Error() string
func (AlreadyExists) Id ¶
func (e AlreadyExists) Id() Id
type CompletedAt ¶
type Context ¶
type Context JsonObj
Task context, useful when extra contextual information needs to be passed along with a Task
type Id ¶
type Id string
Id for a task that has been persisted
func GenerateId ¶
Generates a random id if the NewTask passed does not have one
type InvalidPersistedData ¶
type InvalidPersistedData struct {
PersistedData interface{}
}
Invalid data
func (InvalidPersistedData) Error ¶
func (e InvalidPersistedData) Error() string
type InvalidVersion ¶
type InvalidVersion struct {
ID Id
}
InvalidVersion is returned when the version is invalid
func (InvalidVersion) Error ¶
func (e InvalidVersion) Error() string
func (InvalidVersion) Id ¶
func (e InvalidVersion) Id() Id
type LastClaimed ¶
type MockTasksService ¶
type MockTasksService struct { CreateCalled uint CreateOverride func() (*Task, error) GetCalled uint GetOverride func() (*Task, error) ClaimCalled uint ClaimOverride func() ([]Task, error) ReportInCalled uint ReportInOverride func() (*Task, error) MarkDoneCalled uint MarkDoneOverride func() (*Task, error) MarkFailedCalled uint MarkFailedOverride func() (*Task, error) UnClaimCalled uint UnClaimOverride func() (*Task, error) ReapTimedOutCalled uint ReapTimedOutOverride func() error ArchiveOldTasksCalled uint ArchiveOldTasksOverride func() error RefreshAsNeededCalled uint RefreshAsNeededOverride func() error OutstandingTasksCountCalled uint OutstandingTasksCountOverride func() (uint, error) }
func (*MockTasksService) ArchiveOldTasks ¶
func (m *MockTasksService) ArchiveOldTasks(ctx context.Context, archiveCompletedBefore CompletedAt, scrollSize uint, scrollTtl time.Duration) error
func (*MockTasksService) MarkFailed ¶
func (*MockTasksService) OutstandingTasksCount ¶
func (m *MockTasksService) OutstandingTasksCount(ctx context.Context, name queue.Name, id RecurringTaskId) (uint, error)
func (*MockTasksService) ReapTimedOutTasks ¶
func (*MockTasksService) RefreshAsNeeded ¶
type NewReport ¶
type NewReport struct {
Data *ReportedData
}
A Report to be filed on a Job as sent in from the owning Worker
type NewTask ¶
type NewTask struct { // Optional id key for idempotency (if not supplied, a generated UUID is used as task Id) Id *Id Queue queue.Name RetryTimes RetryTimes Kind Kind Priority Priority RunAt RunAt ProcessingTimeout ProcessingTimeout Args *Args Context *Context // This is the id of the recurring Task that spawned this Task. Only populated if created by a Recurring Task RecurringTaskId *RecurringTaskId }
A Task that has yet to be created
type NotClaimed ¶
func (NotClaimed) Error ¶
func (a NotClaimed) Error() string
func (NotClaimed) Id ¶
func (a NotClaimed) Id() Id
type NotOwnedByWorker ¶
func (NotOwnedByWorker) Error ¶
func (n NotOwnedByWorker) Error() string
func (NotOwnedByWorker) Id ¶
func (n NotOwnedByWorker) Id() Id
type ProcessingTimeout ¶
type RecurringTaskId ¶
type RecurringTaskId string
A user-specifiable Recurring Task Id
It is defined here because Go does not allow it to be put in the recurring package without causing a circular import.
type RemainingAttempts ¶
type RemainingAttempts uint
Number of times that a Task can still be retried
type Report ¶
type Report struct { At ReportedAt Data *ReportedData }
A Report on a Job as sent in from the owning Worker
type ReportFromThePast ¶
func (ReportFromThePast) Error ¶
func (a ReportFromThePast) Error() string
func (ReportFromThePast) Id ¶
func (a ReportFromThePast) Id() Id
type ReportedAt ¶
type ReportedData ¶
type ReportedData JsonObj
type Result ¶
type Result struct { At CompletedAt // Results. Only one of the following will be filled in at a given time Failure *Failure Success *Success }
type Service ¶
type Service interface { // Persists the given NewTask. Create(ctx context.Context, task *NewTask) (*Task, error) // Retrieves a Task by Id, returns an empty task with error if: // - No such task exists Get(ctx context.Context, queue queue.Name, taskId Id) (*Task, error) // Attempts to claim a number of Tasks for a given Worker, by worker.Id, optionally // blocking for the given time if the requested amount cannot be found. // // Returns immediately if the specified amount can be found, otherwise, retries until // the time limit, returning what it was able to claim. // // Also returns immediately if there are any errors. Claim(ctx context.Context, workerId worker.Id, queues []queue.Name, number uint, blockFor time.Duration) ([]Task, error) // Reports in on a given Task. // // Errors out if the Task // 1. Does not exist in the queue // 2. Is not claimed (possibly requeued) // 3. Is not claimed by the given worker as identified by id. // 4. Has been updated at a later time (concurrency) ReportIn(ctx context.Context, workerId worker.Id, queue queue.Name, taskId Id, newReport NewReport) (*Task, error) // Marks a Task as successfully completed. // // Errors out if the Task // 1. Does not exist in the queue // 2. Is not claimed (possibly requeued) // 3. Is not claimed by the given worker as identified by id. MarkDone(ctx context.Context, workerId worker.Id, queue queue.Name, taskId Id, success *Success) (*Task, error) // Marks a Task as failed. // // Errors out if the Task // 1. Does not exist in the queue // 2. Is not claimed (possibly requeued) // 3. Is not claimed by the given worker as identified by id. MarkFailed(ctx context.Context, workerId worker.Id, queue queue.Name, taskId Id, failure *Failure) (*Task, error) // Unclaims a Task so that it can be claimed by someone else. // // Note that we decrement attempts and reset the State, but do not unset LastClaimed; when the Task is // next Claimed, this field will be updated. 
// // Note that nothing else about the Task is modified. This allows callers to re-queue // a task that they have claimed but no longer want to / can handle. // // Errors out if the Task // 1. Does not exist in the queue // 2. Is not claimed (possibly requeued) // 3. Is not claimed by the given worker as identified by id. UnClaim(ctx context.Context, workerId worker.Id, queue queue.Name, taskId Id) (*Task, error) // This sets all Claimed Tasks that have timed out to Failed, adjusting RunAt as needed // // Note that this method is meant to be idempotent, so errors can be ignored or handled by simply logging ReapTimedOutTasks(ctx context.Context, scrollSize uint, scrollTtl time.Duration) error // This archives Tasks that were completed before the given archiveCompletedBefore time // // Note that this method is meant to be idempotent, so errors can be ignored or handled by simply logging ArchiveOldTasks(ctx context.Context, archiveCompletedBefore CompletedAt, scrollSize uint, scrollTtl time.Duration) error // This refreshes a given Queue so that the next time an op is carried out on it, the server is guaranteed // to get the latest information on the Tasks that are in that Queue. // // Internally, an attempt is made to _not_ refresh if the last search or refresh was carried out on the given // Queue within a given configurable time frame *by this process*. This is a "best effort" attempt to reduce // the strain on the server, but can be improved later if the count is shared somehow. RefreshAsNeeded(ctx context.Context, queue queue.Name) error // Counts the number of outstanding Tasks in the given Queue that belong to a given RecurringTaskId // // Outstanding means not DONE or DEAD that are runnable OutstandingTasksCount(ctx context.Context, queue queue.Name, recurringTaskId RecurringTaskId) (uint, error) }
A Service that takes care of the persistence of Tasks.
type ServiceErr ¶
ServiceErr is an error interface for Service
type State ¶
type State uint8
Task state boilerplate galore. The state of a Task that can be marshalled to and from JSON.
func (State) MarshalJSON ¶
MarshalJSON marshals the enum as a quoted json string
func (*State) UnmarshalJSON ¶
UnmarshalJSON unmarshals a quoted json string to the enum value
type Task ¶
type Task struct { ID Id Queue queue.Name RetryTimes RetryTimes Attempted AttemptedTimes Kind Kind State State Priority Priority RunAt RunAt ProcessingTimeout ProcessingTimeout Args *Args Context *Context LastClaimed *LastClaimed LastEnqueuedAt EnqueuedAt Metadata metadata.Metadata // This is the id of the recurring Task that spawned this Task. Only populated if created by a Recurring Task RecurringTaskId *RecurringTaskId }
A Task that has already been persisted. A Task is identified uniquely by its ID and Queue, and is versioned according to its Metadata Version.
func (*Task) IntoClaimed ¶
IntoClaimed marks a task as claimed.
Note that it does no error checking (e.g. making sure the task is in the right status), because this is an internal method that is called directly after a search for claimable tasks in the ES tasks service implementation.
Returns an error if the Task is not currently in a claimable state
func (*Task) IntoDone ¶
IntoDone marks the current task as done, attaching the given Success data
It does some checks to make sure, for instance, that the Task has the right status and currently belongs to the given worker id
func (*Task) IntoFailed ¶
IntoFailed registers the current Task as something that has failed.
If the Task can be retried, update its RunAt exponentially and mark it as FAILED. If it cannot be retried, we mark it as DEAD.
It does some checks to make sure, for instance, that the Task has the right status and belongs to the given worker id
func (*Task) IntoUnClaimed ¶
func (t *Task) IntoUnClaimed(byWorkerId worker.Id, at EnqueuedAt) error
Marks the current task as UnClaimed
Decrements the "attempted" field, and requeues the Task.
It does some checks to make sure, for instance, that the Task has the right status and belongs to the given worker id
func (*Task) RemainingAttempts ¶
func (t *Task) RemainingAttempts() RemainingAttempts
RemainingAttempts returns the number of times this task can still run before any further updates will cause it to be marked as dead
type TimesOutAt ¶
type Unclaimable ¶
func (Unclaimable) Error ¶
func (a Unclaimable) Error() string
func (Unclaimable) Id ¶
func (a Unclaimable) Id() Id