Versions in this module Expand all Collapse all v0 v0.1.0 Jul 14, 2019 Changes in this version + const Version + var AfterRequestCtxKey = &contextKey + var ColorBlack = []byte + var ColorBlue = []byte + var ColorBrightBlack = []byte + var ColorBrightBlue = []byte + var ColorBrightCyan = []byte + var ColorBrightGreen = []byte + var ColorBrightMagenta = []byte + var ColorBrightRed = []byte + var ColorBrightWhite = []byte + var ColorBrightYellow = []byte + var ColorCyan = []byte + var ColorGreen = []byte + var ColorMagenta = []byte + var ColorRed = []byte + var ColorReset = []byte + var ColorWhite = []byte + var ColorYellow = []byte + var DefaultTracer = NewLoggerTracer(logging.DefaultLogger) + var ErrAttributeError = fmt.Errorf("Attribute error") + var ErrNoQueueToRun = fmt.Errorf("No queue to run") + var ErrTaskCanceled = fmt.Errorf("Task canceled") + var ErrTaskNotFound = fmt.Errorf("Task not found") + var ErrorCtxKey = &contextKey + var TaskCtxKey = &contextKey + func ColorWrite(w io.Writer, useColor bool, color []byte, s string, args ...interface{}) + func GetContextError(ctx context.Context) error + func WithContextAfterRequestFunc(ctx context.Context, f AfterRequestFunc) context.Context + func WithContextError(ctx context.Context, err error) context.Context + func WithContextTask(ctx context.Context, task *Task) context.Context + type AfterRequestFunc func() + func GetContextAfterRequestFuncs(ctx context.Context) []AfterRequestFunc + type Bokchoy struct + func New(ctx context.Context, cfg Config, options ...Option) (*Bokchoy, error) + func (b *Bokchoy) Empty(ctx context.Context) error + func (b *Bokchoy) Flush() error + func (b *Bokchoy) Handle(queueName string, sub Handler, options ...Option) + func (b *Bokchoy) HandleFunc(queueName string, f HandlerFunc, options ...Option) + func (b *Bokchoy) Publish(ctx context.Context, queueName string, payload interface{}, options ...Option) (*Task, error) + func (b *Bokchoy) Queue(name string) *Queue + func (b *Bokchoy) QueueNames() []string + func (b *Bokchoy) Run(ctx context.Context, options ...Option) error + func (b *Bokchoy) Stop(ctx context.Context) + func (b *Bokchoy) Use(sub ...func(Handler) Handler) *Bokchoy + type Broker interface + Consume func(string, string, time.Time) ([]map[string]interface{}, error) + Count func(string) (int, error) + Empty func(string) error + Flush func() error + Get func(string) (map[string]interface{}, error) + Initialize func(context.Context) error + Ping func() error + Publish func(string, string, string, map[string]interface{}, time.Time) error + Set func(string, map[string]interface{}, time.Duration) error + type BrokerConfig struct + Redis RedisConfig + Type string + type Config struct + Broker BrokerConfig + Queues []QueueConfig + Serializer SerializerConfig + type Handler interface + Handle func(*Request) error + type HandlerFunc func(*Request) error + func (s HandlerFunc) Handle(r *Request) error + type JSONSerializer struct + func (s JSONSerializer) Dumps(v interface{}) ([]byte, error) + func (s JSONSerializer) Loads(data []byte, v interface{}) error + func (s JSONSerializer) String() string + type Option func(opts *Options) + func WithConcurrency(concurrency int) Option + func WithCountdown(countdown time.Duration) Option + func WithDisableOutput(disableOutput bool) Option + func WithInitialize(initialize bool) Option + func WithLogger(logger logging.Logger) Option + func WithMaxRetries(maxRetries int) Option + func WithQueues(queues []string) Option + func WithRetryIntervals(retryIntervals []time.Duration) Option + func WithSerializer(serializer Serializer) Option + func WithTTL(ttl time.Duration) Option + func WithTimeout(timeout time.Duration) Option + func WithTracer(tracer Tracer) Option + type Options struct + Concurrency int + Countdown time.Duration + DisableOutput bool + Initialize bool + Logger logging.Logger + MaxRetries int + Queues []string + RetryIntervals []time.Duration + Serializer Serializer + TTL time.Duration + Timeout time.Duration + Tracer Tracer + func (o Options) RetryIntervalsDisplay() string + type Queue struct + func (q *Queue) Cancel(ctx context.Context, taskID string) (*Task, error) + func (q *Queue) Consume(ctx context.Context) ([]*Task, error) + func (q *Queue) ConsumeDelayed(ctx context.Context) ([]*Task, error) + func (q *Queue) Consumer() *consumer + func (q *Queue) Count(ctx context.Context) (QueueStats, error) + func (q *Queue) Empty(ctx context.Context) error + func (q *Queue) Get(ctx context.Context, taskID string) (*Task, error) + func (q *Queue) Handle(sub Handler, options ...Option) *Queue + func (q *Queue) HandleFunc(f HandlerFunc, options ...Option) *Queue + func (q *Queue) HandleRequest(ctx context.Context, r *Request) error + func (q *Queue) NewTask(payload interface{}, options ...Option) *Task + func (q *Queue) OnComplete(sub Handler) *Queue + func (q *Queue) OnCompleteFunc(f HandlerFunc) *Queue + func (q *Queue) OnFailure(sub Handler) *Queue + func (q *Queue) OnFailureFunc(f HandlerFunc) *Queue + func (q *Queue) OnStart(sub Handler) *Queue + func (q *Queue) OnStartFunc(f HandlerFunc) *Queue + func (q *Queue) OnSuccess(sub Handler) *Queue + func (q *Queue) OnSuccessFunc(f HandlerFunc) *Queue + func (q *Queue) Publish(ctx context.Context, payload interface{}, options ...Option) (*Task, error) + func (q *Queue) PublishTask(ctx context.Context, task *Task) error + func (q *Queue) Save(ctx context.Context, task *Task) error + func (q *Queue) Use(sub ...func(Handler) Handler) *Queue + func (q Queue) DelayName() string + func (q Queue) MarshalLogObject(enc logging.ObjectEncoder) error + func (q Queue) Name() string + func (q Queue) TaskKey(taskID string) string + type QueueConfig struct + Name string + type QueueStats struct + Delayed int + Direct int + Total int + type RedisClientConfig redis.Options + type RedisClusterConfig redis.ClusterOptions + type RedisConfig struct + Client RedisClientConfig + Cluster RedisClusterConfig + Prefix string + Sentinel RedisSentinelConfig + Type string + type RedisSentinelConfig redis.FailoverOptions + type Request struct + Task *Task + func (r *Request) Context() context.Context + func (r *Request) WithContext(ctx context.Context) *Request + func (r Request) String() string + type Serializer interface + Dumps func(interface{}) ([]byte, error) + Loads func([]byte, interface{}) error + type SerializerConfig struct + Type string + type Task struct + ETA time.Time + Error interface{} + ExecTime float64 + ID string + MaxRetries int + Name string + OldStatus int + Payload interface{} + ProcessedAt time.Time + PublishedAt time.Time + Result interface{} + RetryIntervals []time.Duration + StartedAt time.Time + Status int + TTL time.Duration + Timeout time.Duration + func GetContextTask(ctx context.Context) *Task + func NewTask(name string, payload interface{}, options ...Option) *Task + func TaskFromPayload(data map[string]interface{}, serializer Serializer) (*Task, error) + func (t *Task) Finished() bool + func (t *Task) IsStatusCanceled() bool + func (t *Task) IsStatusFailed() bool + func (t *Task) IsStatusProcessing() bool + func (t *Task) IsStatusSucceeded() bool + func (t *Task) IsStatusWaiting() bool + func (t *Task) MarkAsCanceled() + func (t *Task) MarkAsFailed(err error) + func (t *Task) MarkAsProcessing() + func (t *Task) MarkAsSucceeded() + func (t Task) ETADisplay() string + func (t Task) Key() string + func (t Task) MarshalLogObject(enc logging.ObjectEncoder) error + func (t Task) RetryETA() time.Time + func (t Task) RetryIntervalsDisplay() string + func (t Task) Serialize(serializer Serializer) (map[string]interface{}, error) + func (t Task) StatusDisplay() string + func (t Task) String() string + type Tracer interface + Log func(context.Context, string, error) + func NewLoggerTracer(logger logging.Logger) Tracer