Documentation ¶
Index ¶
- Constants
- Variables
- func HumanReadableResults(results []reflect.Value) string
- func IsContextType(t reflect.Type) bool
- func ReflectTaskResults(taskResults []*TaskResult) ([]reflect.Value, error)
- func ReflectValue(valueType string, value interface{}) (reflect.Value, error)
- func ValidateTask(task interface{}) error
- type Arg
- type Chain
- type Chord
- type ErrRetryTaskLater
- type ErrUnsupportedType
- type Group
- type GroupMeta
- type Headers
- type Retriable
- type Signature
- type Task
- type TaskResult
- type TaskState
- func NewFailureTaskState(signature *Signature, err string) *TaskState
- func NewPendingTaskState(signature *Signature) *TaskState
- func NewReceivedTaskState(signature *Signature) *TaskState
- func NewRetryTaskState(signature *Signature) *TaskState
- func NewStartedTaskState(signature *Signature) *TaskState
- func NewSuccessTaskState(signature *Signature, results []*TaskResult) *TaskState
Constants ¶
const ( // StatePending - initial state of a task StatePending = "PENDING" // StateReceived - when task is received by a worker StateReceived = "RECEIVED" // StateStarted - when the worker starts processing the task StateStarted = "STARTED" // StateRetry - when failed task has been scheduled for retry StateRetry = "RETRY" // StateSuccess - when the task is processed successfully StateSuccess = "SUCCESS" // StateFailure - when processing of the task fails StateFailure = "FAILURE" )
Variables ¶
var ( // ErrTaskMustBeFunc ... ErrTaskMustBeFunc = errors.New("Task must be a func type") // ErrTaskReturnsNoValue ... ErrTaskReturnsNoValue = errors.New("Task must return at least a single value") // ErrLastReturnValueMustBeError .. ErrLastReturnValueMustBeError = errors.New("Last return value of a task must be error") )
var ErrTaskPanicked = errors.New("Invoking task caused a panic")
ErrTaskPanicked ...
Functions ¶
func HumanReadableResults ¶ added in v1.0.10
HumanReadableResults ...
func IsContextType ¶
IsContextType checks to see if the type is a context.Context
func ReflectTaskResults ¶ added in v1.0.10
func ReflectTaskResults(taskResults []*TaskResult) ([]reflect.Value, error)
ReflectTaskResults ...
func ReflectValue ¶
ReflectValue converts interface{} to reflect.Value based on string type
func ValidateTask ¶
func ValidateTask(task interface{}) error
ValidateTask validates task function using reflection and makes sure it has a proper signature. Functions used as tasks must return at least a single value and the last return type must be error
Types ¶
type Arg ¶
type Arg struct { Name string `bson:"name"` Type string `bson:"type"` Value interface{} `bson:"value"` }
Arg represents a single argument passed to invocation fo a task
type Chain ¶
type Chain struct {
Tasks []*Signature
}
Chain creates a chain of tasks to be executed one after another
type Chord ¶
Chord adds an optional callback to the group to be executed after all tasks in the group finished
type ErrRetryTaskLater ¶ added in v1.2.1
type ErrRetryTaskLater struct {
// contains filtered or unexported fields
}
ErrRetryTaskLater ...
func NewErrRetryTaskLater ¶ added in v1.2.1
func NewErrRetryTaskLater(msg string, retryIn time.Duration) ErrRetryTaskLater
NewErrRetryTaskLater returns new ErrRetryTaskLater instance
func (ErrRetryTaskLater) Error ¶ added in v1.2.1
func (e ErrRetryTaskLater) Error() string
Error implements the error interface
func (ErrRetryTaskLater) RetryIn ¶ added in v1.2.1
func (e ErrRetryTaskLater) RetryIn() time.Duration
RetryIn returns time.Duration from now when task should be retried
type ErrUnsupportedType ¶
type ErrUnsupportedType struct {
// contains filtered or unexported fields
}
ErrUnsupportedType ...
func NewErrUnsupportedType ¶
func NewErrUnsupportedType(valueType string) ErrUnsupportedType
NewErrUnsupportedType returns new ErrUnsupportedType
func (ErrUnsupportedType) Error ¶
func (e ErrUnsupportedType) Error() string
Error method so we implement the error interface
type Group ¶
Group creates a set of tasks to be executed in parallel
type GroupMeta ¶
type GroupMeta struct { GroupUUID string `bson:"_id"` TaskUUIDs []string `bson:"task_uuids"` ChordTriggered bool `bson:"chord_triggered"` Lock bool `bson:"lock"` CreatedAt time.Time `bson:"created_at"` }
GroupMeta stores useful metadata about tasks within the same group E.g. UUIDs of all tasks which are used in order to check if all tasks completed successfully or not and thus whether to trigger chord callback
type Headers ¶
type Headers map[string]interface{}
Headers represents the headers which should be used to direct the task
func (Headers) ForeachKey ¶ added in v1.2.2
ForeachKey on Headers implements opentracing.TextMapReader for trace propagation. It is essentially the same as the opentracing.TextMapReader implementation except for the added casting from interface{} to string.
type Signature ¶
type Signature struct { UUID string Name string RoutingKey string ETA *time.Time GroupUUID string GroupTaskCount int Args []Arg Headers Headers Immutable bool RetryCount int RetryTimeout int OnSuccess []*Signature OnError []*Signature ChordCallback *Signature //MessageGroupId for Broker, e.g. SQS BrokerMessageGroupId string }
Signature represents a single task invocation
type Task ¶
type Task struct { TaskFunc reflect.Value UseContext bool Context context.Context Args []reflect.Value }
Task wraps a signature and methods used to reflect task arguments and return values after invoking the task
func New ¶
New tries to use reflection to convert the function and arguments into a reflect.Value and prepare it for invocation
func (*Task) Call ¶
func (t *Task) Call() (taskResults []*TaskResult, err error)
Call attempts to call the task with the supplied arguments.
`err` is set in the return value in two cases:
- The reflected function invocation panics (e.g. due to a mismatched argument list).
- The task func itself returns a non-nil error.
func (*Task) ReflectArgs ¶
ReflectArgs converts []TaskArg to []reflect.Value
type TaskResult ¶
type TaskResult struct { Type string `bson:"type"` Value interface{} `bson:"value"` }
TaskResult represents an actual return value of a processed task
type TaskState ¶
type TaskState struct { TaskUUID string `bson:"_id"` TaskName string `bson:"task_name"` State string `bson:"state"` Results []*TaskResult `bson:"results"` Error string `bson:"error"` CreatedAt time.Time `bson:"created_at"` }
TaskState represents a state of a task
func NewFailureTaskState ¶
NewFailureTaskState ...
func NewPendingTaskState ¶
NewPendingTaskState ...
func NewReceivedTaskState ¶
NewReceivedTaskState ...
func NewRetryTaskState ¶
NewRetryTaskState ...
func NewStartedTaskState ¶
NewStartedTaskState ...
func NewSuccessTaskState ¶
func NewSuccessTaskState(signature *Signature, results []*TaskResult) *TaskState
NewSuccessTaskState ...
func (*TaskState) IsCompleted ¶
IsCompleted returns true if state is SUCCESS or FAILURE, i.e. the task has finished processing and either succeeded or failed.