Documentation ¶
Index ¶
- Constants
- Variables
- func ColorWrite(w io.Writer, useColor bool, color []byte, s string, args ...interface{})
- func GetContextError(ctx context.Context) error
- func WithContextAfterRequestFunc(ctx context.Context, f AfterRequestFunc) context.Context
- func WithContextError(ctx context.Context, err error) context.Context
- func WithContextTask(ctx context.Context, task *Task) context.Context
- type AfterRequestFunc
- type Bokchoy
- func (b *Bokchoy) Empty(ctx context.Context) error
- func (b *Bokchoy) Flush() error
- func (b *Bokchoy) Handle(queueName string, sub Handler, options ...Option)
- func (b *Bokchoy) HandleFunc(queueName string, f HandlerFunc, options ...Option)
- func (b *Bokchoy) Publish(ctx context.Context, queueName string, payload interface{}, options ...Option) (*Task, error)
- func (b *Bokchoy) Queue(name string) *Queue
- func (b *Bokchoy) QueueNames() []string
- func (b *Bokchoy) Run(ctx context.Context, options ...Option) error
- func (b *Bokchoy) Stop(ctx context.Context)
- func (b *Bokchoy) Use(sub ...func(Handler) Handler) *Bokchoy
- type Broker
- type BrokerConfig
- type Config
- type Handler
- type HandlerFunc
- type JSONSerializer
- type Option
- func WithConcurrency(concurrency int) Option
- func WithCountdown(countdown time.Duration) Option
- func WithDisableOutput(disableOutput bool) Option
- func WithInitialize(initialize bool) Option
- func WithLogger(logger logging.Logger) Option
- func WithMaxRetries(maxRetries int) Option
- func WithQueues(queues []string) Option
- func WithRetryIntervals(retryIntervals []time.Duration) Option
- func WithSerializer(serializer Serializer) Option
- func WithTTL(ttl time.Duration) Option
- func WithTimeout(timeout time.Duration) Option
- func WithTracer(tracer Tracer) Option
- type Options
- type Queue
- func (q *Queue) Cancel(ctx context.Context, taskID string) (*Task, error)
- func (q *Queue) Consume(ctx context.Context) ([]*Task, error)
- func (q *Queue) ConsumeDelayed(ctx context.Context) ([]*Task, error)
- func (q *Queue) Consumer() *consumer
- func (q *Queue) Count(ctx context.Context) (QueueStats, error)
- func (q Queue) DelayName() string
- func (q *Queue) Empty(ctx context.Context) error
- func (q *Queue) Get(ctx context.Context, taskID string) (*Task, error)
- func (q *Queue) Handle(sub Handler, options ...Option) *Queue
- func (q *Queue) HandleFunc(f HandlerFunc, options ...Option) *Queue
- func (q *Queue) HandleRequest(ctx context.Context, r *Request) error
- func (q Queue) MarshalLogObject(enc logging.ObjectEncoder) error
- func (q Queue) Name() string
- func (q *Queue) NewTask(payload interface{}, options ...Option) *Task
- func (q *Queue) OnComplete(sub Handler) *Queue
- func (q *Queue) OnCompleteFunc(f HandlerFunc) *Queue
- func (q *Queue) OnFailure(sub Handler) *Queue
- func (q *Queue) OnFailureFunc(f HandlerFunc) *Queue
- func (q *Queue) OnStart(sub Handler) *Queue
- func (q *Queue) OnStartFunc(f HandlerFunc) *Queue
- func (q *Queue) OnSuccess(sub Handler) *Queue
- func (q *Queue) OnSuccessFunc(f HandlerFunc) *Queue
- func (q *Queue) Publish(ctx context.Context, payload interface{}, options ...Option) (*Task, error)
- func (q *Queue) PublishTask(ctx context.Context, task *Task) error
- func (q *Queue) Save(ctx context.Context, task *Task) error
- func (q Queue) TaskKey(taskID string) string
- func (q *Queue) Use(sub ...func(Handler) Handler) *Queue
- type QueueConfig
- type QueueStats
- type RedisClientConfig
- type RedisClusterConfig
- type RedisConfig
- type RedisSentinelConfig
- type Request
- type Serializer
- type SerializerConfig
- type Task
- func (t Task) ETADisplay() string
- func (t *Task) Finished() bool
- func (t *Task) IsStatusCanceled() bool
- func (t *Task) IsStatusFailed() bool
- func (t *Task) IsStatusProcessing() bool
- func (t *Task) IsStatusSucceeded() bool
- func (t *Task) IsStatusWaiting() bool
- func (t Task) Key() string
- func (t *Task) MarkAsCanceled()
- func (t *Task) MarkAsFailed(err error)
- func (t *Task) MarkAsProcessing()
- func (t *Task) MarkAsSucceeded()
- func (t Task) MarshalLogObject(enc logging.ObjectEncoder) error
- func (t Task) RetryETA() time.Time
- func (t Task) RetryIntervalsDisplay() string
- func (t Task) Serialize(serializer Serializer) (map[string]interface{}, error)
- func (t Task) StatusDisplay() string
- func (t Task) String() string
- type Tracer
Constants ¶
const (
Version = "v0.1.0"
)
Variables ¶
var ( // ErrorCtxKey is the context.Context key to store // the recovered error from the middleware ErrorCtxKey = &contextKey{"Error"} // TaskCtxKey is the context.Context key to store // the task from the middleware TaskCtxKey = &contextKey{"Task"} // AfterRequestCtxKey is the context.Context to store // functions to execute after the request AfterRequestCtxKey = &contextKey{"AfterRequest"} )
var ( // ErrAttributeError is returned when an attribute is not found. ErrAttributeError = fmt.Errorf("Attribute error") // ErrTaskCanceled is returned when a task is canceled. ErrTaskCanceled = fmt.Errorf("Task canceled") // ErrTaskNotFound is returned when a task is not found. ErrTaskNotFound = fmt.Errorf("Task not found") // ErrNoQueueToRun is returned when no queue has been found to run. ErrNoQueueToRun = fmt.Errorf("No queue to run") )
var ( // Normal colors ColorBlack = []byte{'\033', '[', '3', '0', 'm'} ColorRed = []byte{'\033', '[', '3', '1', 'm'} ColorGreen = []byte{'\033', '[', '3', '2', 'm'} ColorYellow = []byte{'\033', '[', '3', '3', 'm'} ColorBlue = []byte{'\033', '[', '3', '4', 'm'} ColorMagenta = []byte{'\033', '[', '3', '5', 'm'} ColorCyan = []byte{'\033', '[', '3', '6', 'm'} ColorWhite = []byte{'\033', '[', '3', '7', 'm'} // Bright colors ColorBrightBlack = []byte{'\033', '[', '3', '0', ';', '1', 'm'} ColorBrightRed = []byte{'\033', '[', '3', '1', ';', '1', 'm'} ColorBrightGreen = []byte{'\033', '[', '3', '2', ';', '1', 'm'} ColorBrightYellow = []byte{'\033', '[', '3', '3', ';', '1', 'm'} ColorBrightBlue = []byte{'\033', '[', '3', '4', ';', '1', 'm'} ColorBrightMagenta = []byte{'\033', '[', '3', '5', ';', '1', 'm'} ColorBrightCyan = []byte{'\033', '[', '3', '6', ';', '1', 'm'} ColorBrightWhite = []byte{'\033', '[', '3', '7', ';', '1', 'm'} ColorReset = []byte{'\033', '[', '0', 'm'} )
var DefaultTracer = NewLoggerTracer(logging.DefaultLogger)
DefaultTracer is the default tracer.
Functions ¶
func ColorWrite ¶
colorWrite
func GetContextError ¶
GetContextError returns the in-context recovered error for a request.
func WithContextAfterRequestFunc ¶
func WithContextAfterRequestFunc(ctx context.Context, f AfterRequestFunc) context.Context
WithContextAfterRequestFunc registers a new function to be executed after the request
func WithContextError ¶
WithContextError sets the in-context error for a request.
Types ¶
type AfterRequestFunc ¶
type AfterRequestFunc func()
AfterRequestFunc is a function which will execute after the request
func GetContextAfterRequestFuncs ¶
func GetContextAfterRequestFuncs(ctx context.Context) []AfterRequestFunc
GetContextAfterRequestFuncs returns the registered functions which will execute after the request
type Bokchoy ¶
type Bokchoy struct {
// contains filtered or unexported fields
}
Bokchoy is the main object which stores all configuration, queues and broker.
func (*Bokchoy) HandleFunc ¶
func (b *Bokchoy) HandleFunc(queueName string, f HandlerFunc, options ...Option)
HandleFunc registers a new handler function to consume tasks for a queue.
func (*Bokchoy) Publish ¶
func (b *Bokchoy) Publish(ctx context.Context, queueName string, payload interface{}, options ...Option) (*Task, error)
Publish publishes a new payload to a queue.
func (*Bokchoy) QueueNames ¶
QueueNames returns the managed queue names.
type Broker ¶
type Broker interface { // Initialize initializes the broker. Initialize(context.Context) error // Ping pings the broker to ensure it's well connected. Ping() error // Get returns raw data stored in broker. Get(string) (map[string]interface{}, error) // Empty empties a queue. Empty(string) error // Flush flushes the entire broker. Flush() error // Count returns number of items from a queue name. Count(string) (int, error) // Save synchronizes the stored item. Set(string, map[string]interface{}, time.Duration) error // Publish publishes raw data. Publish(string, string, string, map[string]interface{}, time.Time) error // Consume returns an array of raw data. Consume(string, string, time.Time) ([]map[string]interface{}, error) }
Broker is the common interface to define a Broker.
type BrokerConfig ¶
type BrokerConfig struct { Type string Redis RedisConfig }
BrokerConfig contains the broker configuration.
type Config ¶
type Config struct { Queues []QueueConfig Broker BrokerConfig Serializer SerializerConfig }
Config contains the main configuration to initialize Bokchoy.
type HandlerFunc ¶
HandlerFunc is a handler to handle incoming tasks.
func (HandlerFunc) Handle ¶
func (s HandlerFunc) Handle(r *Request) error
Handle consumes the request.
type JSONSerializer ¶
type JSONSerializer struct { }
func (JSONSerializer) Dumps ¶
func (s JSONSerializer) Dumps(v interface{}) ([]byte, error)
func (JSONSerializer) Loads ¶
func (s JSONSerializer) Loads(data []byte, v interface{}) error
func (JSONSerializer) String ¶
func (s JSONSerializer) String() string
type Option ¶
type Option func(opts *Options)
Option is an option unit.
func WithConcurrency ¶
WithConcurrency defines the number of concurrent consumers.
func WithCountdown ¶
WithCountdown defines the countdown to launch a delayed task.
func WithDisableOutput ¶
WithDisableOutput defines if the output (logo, queues information) should be disabled.
func WithInitialize ¶
WithInitialize defines if the broker needs to be initialized.
func WithMaxRetries ¶
WithMaxRetries defines the number of maximum retries for a failed task.
func WithQueues ¶
WithQueues allows to override queues to run.
func WithRetryIntervals ¶
WithRetryIntervals defines the retry intervals for a failed task.
func WithSerializer ¶
func WithSerializer(serializer Serializer) Option
WithSerializer defines the Serializer.
func WithTimeout ¶
WithTimeout defines the timeout used to execute a task.
type Options ¶
type Options struct { Tracer Tracer Logger logging.Logger Concurrency int MaxRetries int TTL time.Duration Countdown time.Duration Timeout time.Duration RetryIntervals []time.Duration Serializer Serializer Initialize bool Queues []string DisableOutput bool }
Options is the bokchoy options.
func (Options) RetryIntervalsDisplay ¶
RetryIntervalsDisplay returns a string representation of the retry intervals.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue contains consumers to enqueue.
func (*Queue) ConsumeDelayed ¶
ConsumeDelayed returns an array of delayed tasks.
func (*Queue) Count ¶
func (q *Queue) Count(ctx context.Context) (QueueStats, error)
Count returns statistics from queue: * direct: number of waiting tasks * delayed: number of waiting delayed tasks * total: number of total tasks
func (*Queue) HandleFunc ¶
func (q *Queue) HandleFunc(f HandlerFunc, options ...Option) *Queue
HandleFunc registers a new handler function to consume tasks.
func (*Queue) HandleRequest ¶
HandleRequest handles a request synchronously with a consumer.
func (Queue) MarshalLogObject ¶
func (q Queue) MarshalLogObject(enc logging.ObjectEncoder) error
MarshalLogObject returns the log representation for the queue.
func (*Queue) OnComplete ¶
OnComplete registers a new handler to be executed when a task is completed.
func (*Queue) OnCompleteFunc ¶
func (q *Queue) OnCompleteFunc(f HandlerFunc) *Queue
OnCompleteFunc registers a new handler function to be executed when a task is completed.
func (*Queue) OnFailureFunc ¶
func (q *Queue) OnFailureFunc(f HandlerFunc) *Queue
OnFailureFunc registers a new handler function to be executed when a task is failed.
func (*Queue) OnStartFunc ¶
func (q *Queue) OnStartFunc(f HandlerFunc) *Queue
OnStartFunc registers a new handler function to be executed when a task is started.
func (*Queue) OnSuccess ¶
OnSuccess registers a new handler to be executed when a task is succeeded.
func (*Queue) OnSuccessFunc ¶
func (q *Queue) OnSuccessFunc(f HandlerFunc) *Queue
OnSuccessFunc registers a new handler function to be executed when a task is succeeded.
func (*Queue) PublishTask ¶
PublishTask publishes a new task to the queue.
type QueueConfig ¶
type QueueConfig struct {
Name string
}
QueueConfig contains queue information that should be initialized.
type QueueStats ¶
QueueStats is the statistics returned by a Queue.
type RedisClientConfig ¶
RedisClientConfig contains the redis client configuration.
type RedisClusterConfig ¶
type RedisClusterConfig redis.ClusterOptions
RedisClusterConfig contains the redis cluster configuration.
type RedisConfig ¶
type RedisConfig struct { Type string Prefix string Client RedisClientConfig Cluster RedisClusterConfig Sentinel RedisSentinelConfig }
RedisConfig contains all redis configuration: client, sentinel (failover), cluster.
type RedisSentinelConfig ¶
type RedisSentinelConfig redis.FailoverOptions
RedisSentinelConfig contains the redis sentinel configuration.
type Request ¶
type Request struct { Task *Task // contains filtered or unexported fields }
Request is the bokchoy Request which will be handled by a subscriber handler.
type Serializer ¶
Serializer defines an interface to implement a serializer.
type SerializerConfig ¶
type SerializerConfig struct {
Type string
}
SerializerConfig contains a serializer configuration to store tasks.
type Task ¶
type Task struct { ID string Name string PublishedAt time.Time StartedAt time.Time ProcessedAt time.Time Status int OldStatus int MaxRetries int Payload interface{} Result interface{} Error interface{} ExecTime float64 TTL time.Duration Timeout time.Duration ETA time.Time RetryIntervals []time.Duration }
Task is the model stored in a Queue.
func GetContextTask ¶
GetContextTask returns the in-context task for a request.
func TaskFromPayload ¶
func TaskFromPayload(data map[string]interface{}, serializer Serializer) (*Task, error)
TaskFromPayload returns a Task instance from raw data.
func (Task) ETADisplay ¶
ETADisplay returns the string representation of the ETA.
func (*Task) IsStatusCanceled ¶
IsStatusCanceled returns if the task status is canceled.
func (*Task) IsStatusFailed ¶
IsStatusFailed returns if the task status is failed.
func (*Task) IsStatusProcessing ¶
IsStatusProcessing returns if the task status is processing.
func (*Task) IsStatusSucceeded ¶
IsStatusSucceeded returns if the task status is succeeded.
func (*Task) IsStatusWaiting ¶
IsStatusWaiting returns if the task status is waiting.
func (*Task) MarkAsCanceled ¶
func (t *Task) MarkAsCanceled()
MarkAsCanceled marks a task as canceled.
func (*Task) MarkAsFailed ¶
MarkAsFailed marks a task as failed.
func (*Task) MarkAsProcessing ¶
func (t *Task) MarkAsProcessing()
MarkAsProcessing marks a task as processing.
func (*Task) MarkAsSucceeded ¶
func (t *Task) MarkAsSucceeded()
MarkAsSucceeded marks a task as succeeded.
func (Task) MarshalLogObject ¶
func (t Task) MarshalLogObject(enc logging.ObjectEncoder) error
MarshalLogObject returns the log representation for the task.
func (Task) RetryIntervalsDisplay ¶
RetryIntervalsDisplay returns the string representation of the retry intervals.
func (Task) Serialize ¶
func (t Task) Serialize(serializer Serializer) (map[string]interface{}, error)
Serialize serializes a Task to raw data.
func (Task) StatusDisplay ¶
StatusDisplay returns the status in human representation.