Documentation ¶
Index ¶
- Constants
- Variables
- func HumanReadableResults(results []reflect.Value) string
- func IsContextType(t reflect.Type) bool
- func IsUnrecoverable(err error) bool
- func ReflectTaskResults(taskResults []*TaskResult) ([]reflect.Value, error)
- func ReflectValue(valueType string, value interface{}) (reflect.Value, error)
- func UnpackUnrecoverable(err error) (isUnrecoverable bool, werr error)
- func Unrecoverable(err error) error
- func ValidateTask(task interface{}) error
- type Arg
- type Chain
- type Chord
- type ErrRetryTaskLater
- type ErrUnrecoverable
- type ErrUnsupportedType
- type Group
- type GroupMeta
- type Headers
- type Retriable
- type Signature
- type Task
- type TaskResult
- type TaskState
- func NewFailureTaskState(signature *Signature, err string) *TaskState
- func NewPendingTaskState(signature *Signature) *TaskState
- func NewReceivedTaskState(signature *Signature) *TaskState
- func NewRetryTaskState(signature *Signature) *TaskState
- func NewStartedTaskState(signature *Signature) *TaskState
- func NewSuccessTaskState(signature *Signature, results []*TaskResult) *TaskState
Constants ¶
const ( // StatePending - initial state of a task StatePending = "PENDING" // StateReceived - when task is received by a worker StateReceived = "RECEIVED" // StateStarted - when the worker starts processing the task StateStarted = "STARTED" // StateRetry - when failed task has been scheduled for retry StateRetry = "RETRY" // StateSuccess - when the task is processed successfully StateSuccess = "SUCCESS" // StateFailure - when processing of the task fails StateFailure = "FAILURE" )
Variables ¶
var ( // ErrTaskMustBeFunc ... ErrTaskMustBeFunc = errors.New("Task must be a func type") // ErrTaskReturnsNoValue ... ErrTaskReturnsNoValue = errors.New("Task must return at least a single value") // ErrLastReturnValueMustBeError .. ErrLastReturnValueMustBeError = errors.New("Last return value of a task must be error") )
var ErrTaskPanicked = errors.New("Invoking task caused a panic")
ErrTaskPanicked ...
Functions ¶
func HumanReadableResults ¶ added in v1.0.10
HumanReadableResults ...
func IsContextType ¶
IsContextType checks to see if the type is a context.Context
func IsUnrecoverable ¶ added in v1.10.9
IsUnrecoverable checks if error is an instance of `ErrUnrecoverable`
func ReflectTaskResults ¶ added in v1.0.10
func ReflectTaskResults(taskResults []*TaskResult) ([]reflect.Value, error)
ReflectTaskResults ...
func ReflectValue ¶
ReflectValue converts interface{} to reflect.Value based on string type
func UnpackUnrecoverable ¶ added in v1.10.9
UnpackUnrecoverable checks if err is unrecoverable and returns the wrapped error.
func Unrecoverable ¶ added in v1.10.9
Unrecoverable wraps an error in `ErrUnrecoverable` struct
func ValidateTask ¶
func ValidateTask(task interface{}) error
ValidateTask validates task function using reflection and makes sure it has a proper signature. Functions used as tasks must return at least a single value and the last return type must be error
Types ¶
type Arg ¶
type Arg struct { Name string `bson:"name"` Type string `bson:"type"` Value interface{} `bson:"value"` }
Arg represents a single argument passed to invocation fo a task
type Chain ¶
type Chain struct {
Tasks []*Signature
}
Chain creates a chain of tasks to be executed one after another
type Chord ¶
Chord adds an optional callback to the group to be executed after all tasks in the group finished
type ErrRetryTaskLater ¶ added in v1.2.1
type ErrRetryTaskLater struct {
// contains filtered or unexported fields
}
ErrRetryTaskLater ...
func NewErrRetryTaskLater ¶ added in v1.2.1
func NewErrRetryTaskLater(msg string, retryIn time.Duration) ErrRetryTaskLater
NewErrRetryTaskLater returns new ErrRetryTaskLater instance
func (ErrRetryTaskLater) Error ¶ added in v1.2.1
func (e ErrRetryTaskLater) Error() string
Error implements the error interface
func (ErrRetryTaskLater) RetryIn ¶ added in v1.2.1
func (e ErrRetryTaskLater) RetryIn() time.Duration
RetryIn returns time.Duration from now when task should be retried
type ErrUnrecoverable ¶ added in v1.10.9
type ErrUnrecoverable struct {
// contains filtered or unexported fields
}
ErrUnrecoverable is an unrecoverable error, ie. the task will not be retried.
type ErrUnsupportedType ¶
type ErrUnsupportedType struct {
// contains filtered or unexported fields
}
ErrUnsupportedType ...
func NewErrUnsupportedType ¶
func NewErrUnsupportedType(valueType string) ErrUnsupportedType
NewErrUnsupportedType returns new ErrUnsupportedType
func (ErrUnsupportedType) Error ¶
func (e ErrUnsupportedType) Error() string
Error method so we implement the error interface
type Group ¶
Group creates a set of tasks to be executed in parallel
type GroupMeta ¶
type GroupMeta struct { GroupUUID string `bson:"_id"` TaskUUIDs []string `bson:"task_uuids"` ChordTriggered bool `bson:"chord_triggered"` Lock bool `bson:"lock"` CreatedAt time.Time `bson:"created_at"` TTL int64 `bson:"ttl,omitempty"` }
GroupMeta stores useful metadata about tasks within the same group E.g. UUIDs of all tasks which are used in order to check if all tasks completed successfully or not and thus whether to trigger chord callback
type Headers ¶
type Headers map[string]interface{}
Headers represents the headers which should be used to direct the task
func (Headers) ForeachKey ¶ added in v1.2.2
ForeachKey on Headers implements opentracing.TextMapReader for trace propagation. It is essentially the same as the opentracing.TextMapReader implementation except for the added casting from interface{} to string.
type Signature ¶
type Signature struct { UUID string Name string RoutingKey string ETA *time.Time GroupUUID string GroupTaskCount int Args []Arg Headers Headers Priority uint8 Immutable bool RetryCount int RetryTimeout int OnSuccess []*Signature OnError []*Signature ChordCallback *Signature //MessageGroupId for Broker, e.g. SQS BrokerMessageGroupId string //ReceiptHandle of SQS Message SQSReceiptHandle string // StopTaskDeletionOnError used with sqs when we want to send failed messages to dlq, // and don't want machinery to delete from source queue StopTaskDeletionOnError bool // IgnoreWhenTaskNotRegistered auto removes the request when there is no handeler available // When this is true a task with no handler will be ignored and not placed back in the queue IgnoreWhenTaskNotRegistered bool }
Signature represents a single task invocation
func CopySignature ¶ added in v1.10.9
func CopySignatures ¶ added in v1.10.9
func NewSignature ¶
NewSignature creates a new task signature
func SignatureFromContext ¶ added in v1.10.9
SignatureFromContext gets the signature from the context
type Task ¶
type Task struct { TaskFunc reflect.Value UseContext bool Context context.Context Args []reflect.Value }
Task wraps a signature and methods used to reflect task arguments and return values after invoking the task
func New ¶
New tries to use reflection to convert the function and arguments into a reflect.Value and prepare it for invocation
func NewWithSignature ¶ added in v1.10.9
NewWithSignature is the same as New but injects the signature
func (*Task) Call ¶
func (t *Task) Call() (taskResults []*TaskResult, err error)
Call attempts to call the task with the supplied arguments.
`err` is set in the return value in two cases:
- The reflected function invocation panics (e.g. due to a mismatched argument list).
- The task func itself returns a non-nil error.
func (*Task) ReflectArgs ¶
ReflectArgs converts []TaskArg to []reflect.Value
type TaskResult ¶
type TaskResult struct { Type string `bson:"type"` Value interface{} `bson:"value"` }
TaskResult represents an actual return value of a processed task
type TaskState ¶
type TaskState struct { TaskUUID string `bson:"_id"` TaskName string `bson:"task_name"` State string `bson:"state"` Results []*TaskResult `bson:"results"` Error string `bson:"error"` CreatedAt time.Time `bson:"created_at"` TTL int64 `bson:"ttl,omitempty"` }
TaskState represents a state of a task
func NewFailureTaskState ¶
NewFailureTaskState ...
func NewPendingTaskState ¶
NewPendingTaskState ...
func NewReceivedTaskState ¶
NewReceivedTaskState ...
func NewRetryTaskState ¶
NewRetryTaskState ...
func NewStartedTaskState ¶
NewStartedTaskState ...
func NewSuccessTaskState ¶
func NewSuccessTaskState(signature *Signature, results []*TaskResult) *TaskState
NewSuccessTaskState ...
func (*TaskState) IsCompleted ¶
IsCompleted returns true if state is SUCCESS or FAILURE, i.e. the task has finished processing and either succeeded or failed.