Documentation
¶
Index ¶
- Constants
- Variables
- func GetContextError(ctx context.Context) error
- func ID() string
- func WithContextAfterRequestFunc(ctx context.Context, f AfterRequestFunc) context.Context
- func WithContextError(ctx context.Context, err error) context.Context
- func WithContextTask(ctx context.Context, task *Task) context.Context
- type AfterRequestFunc
- type Bokchoy
- func (b *Bokchoy) Empty(ctx context.Context) error
- func (b *Bokchoy) Flush() error
- func (b *Bokchoy) Handle(queueName string, sub Handler, options ...Option)
- func (b *Bokchoy) HandleFunc(queueName string, f HandlerFunc, options ...Option)
- func (b *Bokchoy) Publish(ctx context.Context, queueName string, payload interface{}, options ...Option) (*Task, error)
- func (b *Bokchoy) Queue(name string) *Queue
- func (b *Bokchoy) QueueNames() []string
- func (b *Bokchoy) Run(ctx context.Context, options ...Option) error
- func (b *Bokchoy) ServerNames() []string
- func (b *Bokchoy) Stop(ctx context.Context)
- func (b *Bokchoy) Use(sub ...func(Handler) Handler) *Bokchoy
- type Broker
- type BrokerConfig
- type BrokerStats
- type Color
- type ColorWriter
- type Config
- type Handler
- type HandlerFunc
- type JSONSerializer
- type Option
- func WithBroker(broker Broker) Option
- func WithConcurrency(concurrency int) Option
- func WithCountdown(countdown time.Duration) Option
- func WithDisableOutput(disableOutput bool) Option
- func WithInitialize(initialize bool) Option
- func WithLogger(logger logging.Logger) Option
- func WithMaxRetries(maxRetries int) Option
- func WithQueues(queues []string) Option
- func WithRetryIntervals(retryIntervals []time.Duration) Option
- func WithSerializer(serializer Serializer) Option
- func WithServers(servers []Server) Option
- func WithTTL(ttl time.Duration) Option
- func WithTimeout(timeout time.Duration) Option
- func WithTracer(tracer Tracer) Option
- type Options
- type Queue
- func (q *Queue) Cancel(ctx context.Context, taskID string) (*Task, error)
- func (q *Queue) Consume(ctx context.Context) ([]*Task, error)
- func (q *Queue) Consumer() *consumer
- func (q *Queue) Count(ctx context.Context) (BrokerStats, error)
- func (q *Queue) Empty(ctx context.Context) error
- func (q *Queue) Get(ctx context.Context, taskID string) (*Task, error)
- func (q *Queue) Handle(sub Handler, options ...Option) *Queue
- func (q *Queue) HandleFunc(f HandlerFunc, options ...Option) *Queue
- func (q *Queue) HandleRequest(ctx context.Context, r *Request) error
- func (q *Queue) List(ctx context.Context) ([]*Task, error)
- func (q Queue) MarshalLogObject(enc logging.ObjectEncoder) error
- func (q Queue) Name() string
- func (q *Queue) NewTask(payload interface{}, options ...Option) *Task
- func (q *Queue) OnComplete(sub Handler) *Queue
- func (q *Queue) OnCompleteFunc(f HandlerFunc) *Queue
- func (q *Queue) OnFailure(sub Handler) *Queue
- func (q *Queue) OnFailureFunc(f HandlerFunc) *Queue
- func (q *Queue) OnStart(sub Handler) *Queue
- func (q *Queue) OnStartFunc(f HandlerFunc) *Queue
- func (q *Queue) OnSuccess(sub Handler) *Queue
- func (q *Queue) OnSuccessFunc(f HandlerFunc) *Queue
- func (q *Queue) Publish(ctx context.Context, payload interface{}, options ...Option) (*Task, error)
- func (q *Queue) PublishTask(ctx context.Context, task *Task) error
- func (q *Queue) Save(ctx context.Context, task *Task) error
- func (q *Queue) Use(sub ...func(Handler) Handler) *Queue
- type QueueConfig
- type RedisBroker
- func (p *RedisBroker) Consume(ctx context.Context, name string, eta time.Time) ([]map[string]interface{}, error)
- func (p *RedisBroker) Count(queueName string) (BrokerStats, error)
- func (p *RedisBroker) Delete(name string, taskID string) error
- func (p *RedisBroker) Empty(name string) error
- func (p *RedisBroker) Flush() error
- func (p *RedisBroker) Get(taskKey string) (map[string]interface{}, error)
- func (p *RedisBroker) Initialize(ctx context.Context) error
- func (p *RedisBroker) List(name string) ([]map[string]interface{}, error)
- func (p RedisBroker) Ping() error
- func (p *RedisBroker) Publish(queueName string, taskID string, data map[string]interface{}, eta time.Time) error
- func (p *RedisBroker) Set(taskKey string, data map[string]interface{}, expiration time.Duration) error
- func (p RedisBroker) String() string
- type RedisClientConfig
- type RedisClusterConfig
- type RedisConfig
- type RedisSentinelConfig
- type Request
- type Serializer
- type SerializerConfig
- type Server
- type Task
- func (t Task) ETADisplay() string
- func (t *Task) Finished() bool
- func (t *Task) IsStatusCanceled() bool
- func (t *Task) IsStatusFailed() bool
- func (t *Task) IsStatusProcessing() bool
- func (t *Task) IsStatusSucceeded() bool
- func (t *Task) IsStatusWaiting() bool
- func (t Task) Key() string
- func (t *Task) MarkAsCanceled()
- func (t *Task) MarkAsFailed(err error)
- func (t *Task) MarkAsProcessing()
- func (t *Task) MarkAsSucceeded()
- func (t Task) MarshalLogObject(enc logging.ObjectEncoder) error
- func (t Task) RetryETA() time.Time
- func (t Task) RetryIntervalsDisplay() string
- func (t Task) Serialize(serializer Serializer) (map[string]interface{}, error)
- func (t Task) StatusDisplay() string
- func (t Task) String() string
- type Tracer
Constants ¶
const (
Version = "v0.2.0"
)
Variables ¶
var (
	// ErrorCtxKey is the context.Context key to store
	// the recovered error from the middleware.
	ErrorCtxKey = &contextKey{"Error"}

	// TaskCtxKey is the context.Context key to store
	// the task from the middleware.
	TaskCtxKey = &contextKey{"Task"}

	// AfterRequestCtxKey is the context.Context key to store
	// functions to execute after the request.
	AfterRequestCtxKey = &contextKey{"AfterRequest"}
)
var ( // ErrAttributeError is returned when an attribute is not found. ErrAttributeError = fmt.Errorf("Attribute error") // ErrTaskCanceled is returned when a task is canceled. ErrTaskCanceled = fmt.Errorf("Task canceled") // ErrTaskNotFound is returned when a task is not found. ErrTaskNotFound = fmt.Errorf("Task not found") // ErrNoQueueToRun is returned when no queue has been found to run. ErrNoQueueToRun = fmt.Errorf("No queue to run") )
var ( // Normal colors ColorBlack = Color{'\033', '[', '3', '0', 'm'} ColorRed = Color{'\033', '[', '3', '1', 'm'} ColorGreen = Color{'\033', '[', '3', '2', 'm'} ColorYellow = Color{'\033', '[', '3', '3', 'm'} ColorBlue = Color{'\033', '[', '3', '4', 'm'} ColorMagenta = Color{'\033', '[', '3', '5', 'm'} ColorCyan = Color{'\033', '[', '3', '6', 'm'} ColorWhite = Color{'\033', '[', '3', '7', 'm'} // Bright colors ColorBrightBlack = Color{'\033', '[', '3', '0', ';', '1', 'm'} ColorBrightRed = Color{'\033', '[', '3', '1', ';', '1', 'm'} ColorBrightGreen = Color{'\033', '[', '3', '2', ';', '1', 'm'} ColorBrightYellow = Color{'\033', '[', '3', '3', ';', '1', 'm'} ColorBrightBlue = Color{'\033', '[', '3', '4', ';', '1', 'm'} ColorBrightMagenta = Color{'\033', '[', '3', '5', ';', '1', 'm'} ColorBrightCyan = Color{'\033', '[', '3', '6', ';', '1', 'm'} ColorBrightWhite = Color{'\033', '[', '3', '7', ';', '1', 'm'} ColorReset = Color{'\033', '[', '0', 'm'} )
var DefaultTracer = NewLoggerTracer(logging.DefaultLogger)
DefaultTracer is the default tracer.
Functions ¶
func GetContextError ¶
GetContextError returns the in-context recovered error for a request.
func WithContextAfterRequestFunc ¶
func WithContextAfterRequestFunc(ctx context.Context, f AfterRequestFunc) context.Context
WithContextAfterRequestFunc registers a new function to be executed after the request
func WithContextError ¶
WithContextError sets the in-context error for a request.
Types ¶
type AfterRequestFunc ¶
type AfterRequestFunc func()
AfterRequestFunc is a function which will execute after the request
func GetContextAfterRequestFuncs ¶
func GetContextAfterRequestFuncs(ctx context.Context) []AfterRequestFunc
GetContextAfterRequestFuncs returns the registered functions which will execute after the request
type Bokchoy ¶
type Bokchoy struct { Serializer Serializer Logger logging.Logger Tracer Tracer // contains filtered or unexported fields }
Bokchoy is the main object which stores all configuration, queues and broker.
func (*Bokchoy) HandleFunc ¶
func (b *Bokchoy) HandleFunc(queueName string, f HandlerFunc, options ...Option)
HandleFunc registers a new handler function to consume tasks for a queue.
func (*Bokchoy) Publish ¶
func (b *Bokchoy) Publish(ctx context.Context, queueName string, payload interface{}, options ...Option) (*Task, error)
Publish publishes a new payload to a queue.
func (*Bokchoy) QueueNames ¶
QueueNames returns the managed queue names.
func (*Bokchoy) ServerNames ¶
ServerNames returns the managed server names.
type Broker ¶
type Broker interface {
	// Initialize initializes the broker.
	Initialize(context.Context) error

	// Ping pings the broker to ensure it's well connected.
	Ping() error

	// Get returns raw data stored in broker.
	Get(string) (map[string]interface{}, error)

	// Delete deletes raw data in broker based on key.
	Delete(string, string) error

	// List returns raw data stored in broker.
	List(string) ([]map[string]interface{}, error)

	// Empty empties a queue.
	Empty(string) error

	// Flush flushes the entire broker.
	Flush() error

	// Count returns number of items from a queue name.
	Count(string) (BrokerStats, error)

	// Set synchronizes the stored item.
	Set(string, map[string]interface{}, time.Duration) error

	// Publish publishes raw data.
	Publish(string, string, map[string]interface{}, time.Time) error

	// Consume returns an array of raw data.
	Consume(context.Context, string, time.Time) ([]map[string]interface{}, error)
}
Broker is the common interface to define a Broker.
type BrokerConfig ¶
type BrokerConfig struct { Type string Redis RedisConfig }
BrokerConfig contains the broker configuration.
type BrokerStats ¶
BrokerStats is the statistics returned by a Queue.
type ColorWriter ¶
ColorWriter is a bytes buffer with color.
func NewColorWriter ¶
func NewColorWriter(color Color) *ColorWriter
NewColorWriter initializes a new ColorWriter.
func (ColorWriter) WithColor ¶
func (c ColorWriter) WithColor(color Color) *ColorWriter
WithColor returns a new ColorWriter with a new color.
func (*ColorWriter) Write ¶
func (c *ColorWriter) Write(s string, args ...interface{})
Write writes an output to stdout.
type Config ¶
type Config struct { Queues []QueueConfig Broker BrokerConfig Serializer SerializerConfig }
Config contains the main configuration to initialize Bokchoy.
type HandlerFunc ¶
HandlerFunc is a handler to handle incoming tasks.
func (HandlerFunc) Handle ¶
func (s HandlerFunc) Handle(r *Request) error
Handle consumes the request.
type JSONSerializer ¶
type JSONSerializer struct { }
func (JSONSerializer) Dumps ¶
func (s JSONSerializer) Dumps(v interface{}) ([]byte, error)
func (JSONSerializer) Loads ¶
func (s JSONSerializer) Loads(data []byte, v interface{}) error
func (JSONSerializer) String ¶
func (s JSONSerializer) String() string
type Option ¶
type Option func(opts *Options)
Option is an option unit.
func WithConcurrency ¶
WithConcurrency defines the number of concurrent consumers.
func WithCountdown ¶
WithCountdown defines the countdown to launch a delayed task.
func WithDisableOutput ¶
WithDisableOutput defines if the output (logo, queues information) should be disabled.
func WithInitialize ¶
WithInitialize defines if the broker needs to be initialized.
func WithMaxRetries ¶
WithMaxRetries defines the number of maximum retries for a failed task.
func WithQueues ¶
WithQueues allows overriding the queues to run.
func WithRetryIntervals ¶
WithRetryIntervals defines the retry intervals for a failed task.
func WithSerializer ¶
func WithSerializer(serializer Serializer) Option
WithSerializer defines the Serializer.
func WithServers ¶
WithServers registers new servers to be run.
func WithTimeout ¶
WithTimeout defines the timeout used to execute a task.
type Options ¶
type Options struct { Tracer Tracer Logger logging.Logger Concurrency int MaxRetries int TTL time.Duration Countdown *time.Duration Timeout time.Duration RetryIntervals []time.Duration Serializer Serializer Initialize bool Queues []string DisableOutput bool Servers []Server Broker Broker }
Options is the bokchoy options.
func (Options) RetryIntervalsDisplay ¶
RetryIntervalsDisplay returns a string representation of the retry intervals.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue contains consumers to enqueue.
func (*Queue) Count ¶
func (q *Queue) Count(ctx context.Context) (BrokerStats, error)
Count returns statistics from the queue:
  - direct: number of waiting tasks
  - delayed: number of waiting delayed tasks
  - total: number of total tasks
func (*Queue) HandleFunc ¶
func (q *Queue) HandleFunc(f HandlerFunc, options ...Option) *Queue
HandleFunc registers a new handler function to consume tasks.
func (*Queue) HandleRequest ¶
HandleRequest handles a request synchronously with a consumer.
func (Queue) MarshalLogObject ¶
func (q Queue) MarshalLogObject(enc logging.ObjectEncoder) error
MarshalLogObject returns the log representation for the queue.
func (*Queue) OnComplete ¶
OnComplete registers a new handler to be executed when a task is completed.
func (*Queue) OnCompleteFunc ¶
func (q *Queue) OnCompleteFunc(f HandlerFunc) *Queue
OnCompleteFunc registers a new handler function to be executed when a task is completed.
func (*Queue) OnFailureFunc ¶
func (q *Queue) OnFailureFunc(f HandlerFunc) *Queue
OnFailureFunc registers a new handler function to be executed when a task has failed.
func (*Queue) OnStartFunc ¶
func (q *Queue) OnStartFunc(f HandlerFunc) *Queue
OnStartFunc registers a new handler function to be executed when a task is started.
func (*Queue) OnSuccess ¶
OnSuccess registers a new handler to be executed when a task has succeeded.
func (*Queue) OnSuccessFunc ¶
func (q *Queue) OnSuccessFunc(f HandlerFunc) *Queue
OnSuccessFunc registers a new handler function to be executed when a task has succeeded.
func (*Queue) PublishTask ¶
PublishTask publishes a new task to the queue.
type QueueConfig ¶
type QueueConfig struct {
Name string
}
QueueConfig contains queue information that should be initialized.
type RedisBroker ¶
type RedisBroker struct { ClientType string Client redisClient Prefix string Logger logging.Logger // contains filtered or unexported fields }
RedisBroker is the redis broker.
func NewRedisBroker ¶
func NewRedisBroker(clt redisClient, clientType string, prefix string, logger logging.Logger) *RedisBroker
NewRedisBroker initializes a new redis broker instance.
func (*RedisBroker) Consume ¶
func (p *RedisBroker) Consume(ctx context.Context, name string, eta time.Time) ([]map[string]interface{}, error)
Consume returns an array of raw data.
func (*RedisBroker) Count ¶
func (p *RedisBroker) Count(queueName string) (BrokerStats, error)
Count returns number of items from a queue name.
func (*RedisBroker) Delete ¶
func (p *RedisBroker) Delete(name string, taskID string) error
Delete deletes raw data in broker based on key.
func (*RedisBroker) Empty ¶
func (p *RedisBroker) Empty(name string) error
Empty removes the redis key for a queue.
func (*RedisBroker) Flush ¶
func (p *RedisBroker) Flush() error
Flush flushes the entire redis database.
func (*RedisBroker) Get ¶
func (p *RedisBroker) Get(taskKey string) (map[string]interface{}, error)
Get returns stored raw data from task key.
func (*RedisBroker) Initialize ¶
func (p *RedisBroker) Initialize(ctx context.Context) error
Initialize initializes the redis broker.
func (*RedisBroker) List ¶
func (p *RedisBroker) List(name string) ([]map[string]interface{}, error)
func (RedisBroker) Ping ¶
func (p RedisBroker) Ping() error
Ping pings the redis broker to ensure it's well connected.
func (*RedisBroker) Publish ¶
func (p *RedisBroker) Publish(queueName string, taskID string, data map[string]interface{}, eta time.Time) error
Publish publishes raw data. It uses a hash to store the task itself, then pushes the task ID to the list, or to a zset if the task is delayed.
func (*RedisBroker) Set ¶
func (p *RedisBroker) Set(taskKey string, data map[string]interface{}, expiration time.Duration) error
Set synchronizes the stored item in redis.
func (RedisBroker) String ¶
func (p RedisBroker) String() string
type RedisClientConfig ¶
RedisClientConfig contains the redis client configuration.
type RedisClusterConfig ¶
type RedisClusterConfig redis.ClusterOptions
RedisClusterConfig contains the redis cluster configuration.
type RedisConfig ¶
type RedisConfig struct { Type string Prefix string Client RedisClientConfig Cluster RedisClusterConfig Sentinel RedisSentinelConfig }
RedisConfig contains all redis configuration: client, sentinel (failover), cluster.
type RedisSentinelConfig ¶
type RedisSentinelConfig redis.FailoverOptions
RedisSentinelConfig contains the redis sentinel configuration.
type Request ¶
type Request struct { Task *Task // contains filtered or unexported fields }
Request is the bokchoy Request which will be handled by a subscriber handler.
type Serializer ¶
Serializer defines an interface to implement a serializer.
type SerializerConfig ¶
type SerializerConfig struct {
Type string
}
SerializerConfig contains a serializer configuration to store tasks.
type Task ¶
type Task struct { ID string Name string PublishedAt time.Time StartedAt time.Time ProcessedAt time.Time Status int OldStatus int MaxRetries int Payload interface{} Result interface{} Error interface{} ExecTime float64 TTL time.Duration Timeout time.Duration ETA time.Time RetryIntervals []time.Duration }
Task is the model stored in a Queue.
func GetContextTask ¶
GetContextTask returns the in-context task for a request.
func TaskFromPayload ¶
func TaskFromPayload(data map[string]interface{}, serializer Serializer) (*Task, error)
TaskFromPayload returns a Task instance from raw data.
func (Task) ETADisplay ¶
ETADisplay returns the string representation of the ETA.
func (*Task) IsStatusCanceled ¶
IsStatusCanceled returns if the task status is canceled.
func (*Task) IsStatusFailed ¶
IsStatusFailed returns if the task status is failed.
func (*Task) IsStatusProcessing ¶
IsStatusProcessing returns if the task status is processing.
func (*Task) IsStatusSucceeded ¶
IsStatusSucceeded returns if the task status is succeeded.
func (*Task) IsStatusWaiting ¶
IsStatusWaiting returns if the task status is waiting.
func (*Task) MarkAsCanceled ¶
func (t *Task) MarkAsCanceled()
MarkAsCanceled marks a task as canceled.
func (*Task) MarkAsFailed ¶
MarkAsFailed marks a task as failed.
func (*Task) MarkAsProcessing ¶
func (t *Task) MarkAsProcessing()
MarkAsProcessing marks a task as processing.
func (*Task) MarkAsSucceeded ¶
func (t *Task) MarkAsSucceeded()
MarkAsSucceeded marks a task as succeeded.
func (Task) MarshalLogObject ¶
func (t Task) MarshalLogObject(enc logging.ObjectEncoder) error
MarshalLogObject returns the log representation for the task.
func (Task) RetryIntervalsDisplay ¶
RetryIntervalsDisplay returns the string representation of the retry intervals.
func (Task) Serialize ¶
func (t Task) Serialize(serializer Serializer) (map[string]interface{}, error)
Serialize serializes a Task to raw data.
func (Task) StatusDisplay ¶
StatusDisplay returns the status in human representation.