Documentation ¶
Overview ¶
Package queue provides a deferrable, persistent queue implementation that supports job retries and deferred dispatch.
It is recommended to read the documentation of the core package before getting started on the queue package.
Introduction ¶
Queues are not as prominent in Go as in some other languages, since Go excels at handling concurrency. However, a persistent queue can still offer some benefits missing from the native mechanisms such as channels. A queued job won't be lost even if the system shuts down; in other words, jobs can be retried until they succeed. It is also possible to defer the execution of a particular job for a lengthy period of time, which is useful when you need to implement a "send email after 30 days" type of job handler.
Simple Usage ¶
First and foremost we should create a job for the queue to dispatch. A job can be any struct that implements the Job interface:
type Job interface {
    Type() string
    Data() interface{}
}
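For instance, a minimal implementation might look like the sketch below (OrderCreated is a hypothetical type, not part of this package):

type OrderCreated struct {
    OrderID int
}

// Type returns the job's type name, used as the Key when the job is persisted.
func (o OrderCreated) Type() string { return "orderCreated" }

// Data returns the payload carried by the job.
func (o OrderCreated) Data() interface{} { return o }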
Although an object that implements the Job interface can be dispatched immediately, it only minimally describes the job's properties. We can tune them with the Adjust helper. For example, to run the job after 3 minutes with a maximum of 5 retries:
newJob := queue.Adjust(job, queue.Defer(3*time.Minute), queue.MaxAttempts(5))
You don't have to use this helper, though. A queueable job can also be created manually by implementing this interface on top of the normal Job interface:
type deferrableDecorator interface {
    Defer() time.Duration
    Decorate(s *PersistedJob)
}
The PersistedJob passed to the Decorate method contains the tunable configuration, such as the maximum number of retries.
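As a sketch of this manual route (DailyReportJob is hypothetical; only the Defer and Decorate signatures come from the interface above):

type DailyReportJob struct {
    Date string
}

func (d DailyReportJob) Type() string      { return "dailyReport" }
func (d DailyReportJob) Data() interface{} { return d }

// Defer postpones execution for 24 hours.
func (d DailyReportJob) Defer() time.Duration { return 24 * time.Hour }

// Decorate tunes the persisted form of the job, e.g. the retry budget.
func (d DailyReportJob) Decorate(s *queue.PersistedJob) {
    s.MaxAttempts = 5
}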
No matter how you create a persisted job, to fire it, send it through a dispatcher. A plain dispatcher won't do, as a queue implementation is required. Luckily, it is dead simple to convert a standard dispatcher to a queue.JobDispatcher:
queueableDispatcher := queue.NewQueue(&queue.RedisDriver{})
queueableDispatcher.Dispatch(context.Background(), newJob)
As you can see, how the queue persists jobs is up to the underlying driver. The default driver bundled in this package is the redis driver.
Once the persisted jobs are stored in the external storage, a goroutine should consume them and pipe the reconstructed jobs to the listeners. This is done by calling the Consume method of the queue:
go queueableDispatcher.Consume(context.Background())
Note that if a job is retryable, it is your responsibility to ensure idempotency. Also be aware that if a persisted job has many listeners, the job is subject to retry when any of the listeners fails.
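For illustration, one way to keep a listener idempotent is to de-duplicate on a business key before performing the side effect. Everything named below (EmailData, dedupStore, the send function) is hypothetical:

type EmailData struct {
    To, Body string
}

// dedupStore records each key at most once, e.g. backed by redis SETNX.
type dedupStore interface {
    SetIfAbsent(ctx context.Context, key string) (bool, error)
}

type EmailListener struct {
    store dedupStore
    send  func(ctx context.Context, data EmailData) error
}

func (e EmailListener) Listen() queue.Job { return queue.JobFrom(EmailData{}) }

func (e EmailListener) Process(ctx context.Context, job queue.Job) error {
    data := job.Data().(EmailData)
    first, err := e.store.SetIfAbsent(ctx, "email:"+data.To)
    if err != nil {
        return err
    }
    if !first {
        // A previous attempt already sent this email; make the retry a no-op.
        return nil
    }
    return e.send(ctx, data)
}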
Integrate ¶
The queue package exports configuration in this format:
queue:
  default:
    redisName: default
    parallelism: 3
    checkQueueLengthIntervalSecond: 15
While manually constructing the queue.JobDispatcher is absolutely feasible, users can rely on the bundled dependency provider without breaking a sweat. With this approach, the lifecycle of the consumer goroutine is managed automatically by the core.
var c *core.C
c.Provide(otredis.Providers()) // to provide the redis driver
c.Provide(queue.Providers())
A module is also bundled, providing the queue command (for reloading and flushing).
c.AddModuleFunc(queue.New)
Sometimes there are valid reasons to use more than one queue. Each dispatcher, however, is bound to exactly one queue, so to use multiple queues, multiple dispatchers are required. Inject queue.DispatcherMaker to construct a dispatcher with a specific name:
c.Invoke(func(maker queue.DispatcherMaker) {
    dispatcher, err := maker.Make("default")
    // see examples for details
})
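With several queues in play, the exported configuration presumably grows one entry per queue, mirroring the Factory example further down (queue names are up to you):

queue:
  default:
    redisName: default
    parallelism: 3
  myQueue:
    redisName: default
    parallelism: 1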
Events ¶
When an attempt to execute the job handler fails, one of two special events will be fired. If the failed job can still be retried, the BeforeRetry event is fired (with a BeforeRetryPayload). If not, the BeforeAbort event is fired (with a BeforeAbortPayload).
Metrics ¶
To gain visibility into the queue length, inject a gauge into the core and alias it to queue.Gauge. The length of all internal queues will be periodically reported to the metrics collector (presumably Prometheus).
c.Provide(di.Deps{func(appName contract.AppName, env contract.Env) queue.Gauge {
    return prometheus.NewGaugeFrom(
        stdprometheus.GaugeOpts{
            Namespace: appName.String(),
            Subsystem: env.String(),
            Name:      "queue_length",
            Help:      "The gauge of queue length",
        },
        []string{"name", "channel"},
    )
}})
Example ¶
package main

import (
    "context"
    "fmt"

    queue "github.com/DoNewsCode/core-queue"
)

func main() {
    dispatcher := &queue.SyncDispatcher{}
    // Subscribe to int Job.
    dispatcher.Subscribe(queue.Listen(queue.JobFrom(0), func(ctx context.Context, Job queue.Job) error {
        fmt.Println(Job.Data())
        return nil
    }))
    // Subscribe to string Job.
    dispatcher.Subscribe(queue.Listen(queue.JobFrom(""), func(ctx context.Context, Job queue.Job) error {
        fmt.Println(Job.Data())
        return nil
    }))
    dispatcher.Dispatch(context.Background(), queue.JobFrom(100))
    dispatcher.Dispatch(context.Background(), queue.JobFrom("Job"))
}
Output:

100
Job
Example (Defer) ¶
package main

import (
    "context"
    "fmt"
    "time"

    queue "github.com/DoNewsCode/core-queue"

    "github.com/DoNewsCode/core"
    "github.com/DoNewsCode/core/otredis"
    "github.com/knadh/koanf/parsers/json"
    "github.com/knadh/koanf/providers/rawbytes"
    "github.com/oklog/run"
)

type DeferMockData struct {
    Value string
}

type DeferMockListener struct{}

func (m DeferMockListener) Listen() queue.Job {
    return queue.JobFrom(DeferMockData{})
}

func (m DeferMockListener) Process(_ context.Context, Job queue.Job) error {
    fmt.Println(Job.Data().(DeferMockData).Value)
    return nil
}

// bootstrapDefer is normally done when bootstrapping the framework. We mimic it here for demonstration.
func bootstrapDefer() *core.C {
    const sampleConfig = "{\"log\":{\"level\":\"error\"},\"queue\":{\"default\":{\"parallelism\":1}}}"

    // Make sure redis is running at localhost:6379
    c := core.New(
        core.WithConfigStack(rawbytes.Provider([]byte(sampleConfig)), json.Parser()),
    )

    // Add ConfProvider
    c.ProvideEssentials()
    c.Provide(otredis.Providers())
    c.Provide(queue.Providers())
    return c
}

// serveDefer normally lives at the serve command. We mimic it here for demonstration.
func serveDefer(c *core.C, duration time.Duration) {
    var g run.Group
    c.ApplyRunGroup(&g)

    // cancel the run group after some time, so that the program ends. In a real project, this is not necessary.
    ctx, cancel := context.WithTimeout(context.Background(), duration)
    defer cancel()
    g.Add(func() error {
        <-ctx.Done()
        return nil
    }, func(err error) {
        cancel()
    })
    err := g.Run()
    if err != nil {
        panic(err)
    }
}

func main() {
    c := bootstrapDefer()

    c.Invoke(func(dispatcher *queue.Queue) {
        // Subscribe
        dispatcher.Subscribe(DeferMockListener{})

        // Trigger a Job
        evt := queue.JobFrom(DeferMockData{Value: "hello world"})
        _ = dispatcher.Dispatch(context.Background(), queue.Adjust(evt, queue.Defer(time.Second)))
        _ = dispatcher.Dispatch(context.Background(), queue.Adjust(evt, queue.Defer(time.Hour)))
    })

    serveDefer(c, 2*time.Second)
}
Output:

hello world
Example (Factory) ¶
package main

import (
    "context"
    "fmt"
    "time"

    queue "github.com/DoNewsCode/core-queue"

    "github.com/DoNewsCode/core"
    "github.com/DoNewsCode/core/otredis"
    "github.com/knadh/koanf/parsers/json"
    "github.com/knadh/koanf/providers/rawbytes"
    "github.com/oklog/run"
)

type MockFactoryData struct {
    Value string
}

type MockFactoryListener struct{}

func (m MockFactoryListener) Listen() queue.Job {
    return queue.JobFrom(MockFactoryData{})
}

func (m MockFactoryListener) Process(_ context.Context, Job queue.Job) error {
    fmt.Println(Job.Data().(MockFactoryData).Value)
    return nil
}

// bootstrapFactories is normally done when bootstrapping the framework. We mimic it here for demonstration.
func bootstrapFactories() *core.C {
    const sampleConfig = "{\"log\":{\"level\":\"error\"},\"queue\":{\"default\":{\"parallelism\":2},\"myQueue\":{\"parallelism\":1}}}"

    // Make sure redis is running at localhost:6379
    c := core.New(
        core.WithConfigStack(rawbytes.Provider([]byte(sampleConfig)), json.Parser()),
    )

    // Add ConfProvider
    c.ProvideEssentials()
    c.Provide(otredis.Providers())
    c.Provide(queue.Providers())
    return c
}

// serveFactories normally lives at the serve command. We mimic it here for demonstration.
func serveFactories(c *core.C, duration time.Duration) {
    var g run.Group
    c.ApplyRunGroup(&g)

    // cancel the run group after some time, so that the program ends. In a real project, this is not necessary.
    ctx, cancel := context.WithTimeout(context.Background(), duration)
    defer cancel()
    g.Add(func() error {
        <-ctx.Done()
        return nil
    }, func(err error) {
        cancel()
    })
    err := g.Run()
    if err != nil {
        panic(err)
    }
}

func main() {
    c := bootstrapFactories()

    c.Invoke(func(maker queue.DispatcherMaker) {
        dispatcher, err := maker.Make("myQueue")
        if err != nil {
            panic(err)
        }

        // Subscribe
        dispatcher.Subscribe(MockFactoryListener{})

        // Trigger a Job
        evt := queue.JobFrom(MockFactoryData{Value: "hello world"})
        _ = dispatcher.Dispatch(context.Background(), queue.Adjust(evt))
    })

    serveFactories(c, 1*time.Second)
}
Output:

hello world
Example (Faulty) ¶
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    queue "github.com/DoNewsCode/core-queue"

    "github.com/DoNewsCode/core"
    "github.com/DoNewsCode/core/otredis"
    "github.com/knadh/koanf/parsers/json"
    "github.com/knadh/koanf/providers/rawbytes"
    "github.com/oklog/run"
)

type FaultyMockData struct {
    Value string
}

type FaultyMockListener struct {
    count int
}

func (m *FaultyMockListener) Listen() queue.Job {
    return queue.JobFrom(FaultyMockData{})
}

func (m *FaultyMockListener) Process(_ context.Context, Job queue.Job) error {
    if m.count < 2 {
        fmt.Println("faulty")
        m.count++
        return errors.New("faulty")
    }
    fmt.Println(Job.Data().(FaultyMockData).Value)
    return nil
}

// bootstrapRetry is normally done when bootstrapping the framework. We mimic it here for demonstration.
func bootstrapRetry() *core.C {
    const sampleConfig = `{"log":{"level":"error"},"queue":{"default":{"parallelism":1}}}`

    // Make sure redis is running at localhost:6379
    c := core.New(
        core.WithConfigStack(rawbytes.Provider([]byte(sampleConfig)), json.Parser()),
    )

    // Add ConfProvider
    c.ProvideEssentials()
    c.Provide(otredis.Providers())
    c.Provide(queue.Providers())
    return c
}

// serveRetry normally lives at the serve command. We mimic it here for demonstration.
func serveRetry(c *core.C, duration time.Duration) {
    var g run.Group
    c.ApplyRunGroup(&g)

    // cancel the run group after some time, so that the program ends. In a real project, this is not necessary.
    ctx, cancel := context.WithTimeout(context.Background(), duration)
    defer cancel()
    g.Add(func() error {
        <-ctx.Done()
        return nil
    }, func(err error) {
        cancel()
    })
    err := g.Run()
    if err != nil {
        panic(err)
    }
}

func main() {
    c := bootstrapRetry()

    c.Invoke(func(dispatcher *queue.Queue) {
        // Subscribe
        dispatcher.Subscribe(&FaultyMockListener{})

        // Trigger a Job
        evt := queue.JobFrom(FaultyMockData{Value: "hello world"})
        _ = dispatcher.Dispatch(context.Background(), queue.Adjust(evt, queue.MaxAttempts(3)))
    })

    serveRetry(c, 10*time.Second) // retries are made after a random backoff. It may take longer.
}
Output:

faulty
faulty
hello world
Example (Metrics) ¶
package main

import (
    "context"
    "fmt"
    "time"

    queue "github.com/DoNewsCode/core-queue"

    "github.com/DoNewsCode/core"
    "github.com/DoNewsCode/core/contract"
    "github.com/DoNewsCode/core/di"
    "github.com/DoNewsCode/core/otredis"
    "github.com/go-kit/kit/metrics/prometheus"
    "github.com/knadh/koanf/parsers/json"
    "github.com/knadh/koanf/providers/rawbytes"
    "github.com/oklog/run"
    stdprometheus "github.com/prometheus/client_golang/prometheus"
)

type MockMetricsData struct {
    Value string
}

type MockMetricsListener struct{}

func (m MockMetricsListener) Listen() queue.Job {
    return queue.JobFrom(MockMetricsData{})
}

func (m MockMetricsListener) Process(_ context.Context, Job queue.Job) error {
    fmt.Println(Job.Data().(MockMetricsData).Value)
    return nil
}

// bootstrapMetrics is normally done when bootstrapping the framework. We mimic it here for demonstration.
func bootstrapMetrics() *core.C {
    const sampleConfig = "{\"log\":{\"level\":\"error\"},\"queue\":{\"default\":{\"parallelism\":1}}}"

    // Make sure redis is running at localhost:6379
    c := core.New(
        core.WithConfigStack(rawbytes.Provider([]byte(sampleConfig)), json.Parser()),
    )

    // Add ConfProvider
    c.ProvideEssentials()
    c.Provide(otredis.Providers())
    c.Provide(queue.Providers())
    c.Provide(di.Deps{func(appName contract.AppName, env contract.Env) queue.Gauge {
        return prometheus.NewGaugeFrom(
            stdprometheus.GaugeOpts{
                Namespace: appName.String(),
                Subsystem: env.String(),
                Name:      "queue_length",
                Help:      "The gauge of queue length",
            },
            []string{"name", "channel"},
        )
    }})
    return c
}

// serveMetrics normally lives at the serve command. We mimic it here for demonstration.
func serveMetrics(c *core.C, duration time.Duration) {
    var g run.Group
    c.ApplyRunGroup(&g)

    // cancel the run group after some time, so that the program ends. In a real project, this is not necessary.
    ctx, cancel := context.WithTimeout(context.Background(), duration)
    defer cancel()
    g.Add(func() error {
        <-ctx.Done()
        return nil
    }, func(err error) {
        cancel()
    })
    err := g.Run()
    if err != nil {
        panic(err)
    }
}

func main() {
    c := bootstrapMetrics()

    c.Invoke(func(dispatcher *queue.Queue) {
        // Subscribe
        dispatcher.Subscribe(MockMetricsListener{})

        // Trigger a Job
        evt := queue.JobFrom(MockMetricsData{Value: "hello world"})
        _ = dispatcher.Dispatch(context.Background(), queue.Adjust(evt))
    })

    serveMetrics(c, time.Second)
}
Output:

hello world
Example (Minimum) ¶
package main

import (
    "context"
    "fmt"
    "time"

    queue "github.com/DoNewsCode/core-queue"
)

type ExampleJob string

func (e ExampleJob) Type() string {
    return "example"
}

func (e ExampleJob) Data() interface{} {
    return e
}

type ExampleListener struct {
    ch chan struct{}
}

func (e *ExampleListener) Listen() queue.Job {
    return ExampleJob("")
}

func (e *ExampleListener) Process(ctx context.Context, job queue.Job) error {
    fmt.Println(job.Data())
    e.ch <- struct{}{}
    return nil
}

func main() {
    queueDispatcher := queue.NewQueue(queue.NewInProcessDriver())

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    var ch = make(chan struct{})
    go queueDispatcher.Consume(ctx)

    queueDispatcher.Subscribe(&ExampleListener{ch: ch})
    queueDispatcher.Dispatch(ctx, queue.Adjust(ExampleJob("foo"), queue.Defer(time.Second)))
    queueDispatcher.Dispatch(ctx, queue.Adjust(ExampleJob("bar"), queue.Defer(time.Hour)))

    <-ch
}
Output:

foo
Index ¶
- Constants
- Variables
- func Providers(optionFunc ...ProvidersOptionFunc) di.Deps
- func UseCodec(codec contract.Codec) func(*Queue)
- func UseEventDispatcher(dispatcher contract.Dispatcher) func(*Queue)
- func UseGauge(gauge metrics.Gauge, interval time.Duration) func(*Queue)
- func UseJobDispatcher(dispatcher JobDispatcher) func(*Queue)
- func UseLogger(logger log.Logger) func(*Queue)
- func UseParallelism(parallelism int) func(*Queue)
- type BeforeAbortPayload
- type BeforeRetryPayload
- type ChannelConfig
- type ConsumableDispatcher
- type DeferrablePersistentJob
- type DispatcherFactory
- type DispatcherMaker
- type Driver
- type DriverArgs
- type Gauge
- type Handler
- type InProcessDriver
- func (i *InProcessDriver) Ack(ctx context.Context, message *PersistedJob) error
- func (i *InProcessDriver) Fail(ctx context.Context, message *PersistedJob) error
- func (i *InProcessDriver) Flush(ctx context.Context, channel string) error
- func (i *InProcessDriver) Info(ctx context.Context) (QueueInfo, error)
- func (i *InProcessDriver) Pop(ctx context.Context) (*PersistedJob, error)
- func (i *InProcessDriver) Push(ctx context.Context, message *PersistedJob, delay time.Duration) error
- func (i *InProcessDriver) Reload(ctx context.Context, channel string) (int64, error)
- func (i *InProcessDriver) Retry(ctx context.Context, message *PersistedJob) error
- type Job
- type JobDispatcher
- type ListenFunc
- type Module
- type PersistOption
- type PersistedJob
- type ProvidersOptionFunc
- type Queue
- type QueueInfo
- type RedisDriver
- func (r *RedisDriver) Ack(ctx context.Context, message *PersistedJob) error
- func (r *RedisDriver) Fail(ctx context.Context, message *PersistedJob) error
- func (r *RedisDriver) Flush(ctx context.Context, channel string) error
- func (r *RedisDriver) Info(ctx context.Context) (QueueInfo, error)
- func (r *RedisDriver) Pop(ctx context.Context) (*PersistedJob, error)
- func (r *RedisDriver) Push(ctx context.Context, message *PersistedJob, delay time.Duration) error
- func (r *RedisDriver) Reload(ctx context.Context, channel string) (int64, error)
- func (r *RedisDriver) Retry(ctx context.Context, message *PersistedJob) error
- type SyncDispatcher
Examples ¶
Constants ¶
const (
    // BeforeRetry is an event that triggers when a previously failed job is going to be retried.
    // Note: if retry attempts are exhausted, this event won't be triggered.
    BeforeRetry event = "beforeRetry"

    // BeforeAbort is an event that triggers when a previously failed job is going to be aborted.
    // If the Job still has retry attempts remaining, this event won't be triggered.
    BeforeAbort event = "beforeAbort"
)
Variables ¶
var ErrEmpty = errors.New("no message available")
ErrEmpty means the queue is empty.
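ErrEmpty is presumably surfaced by Driver implementations when Pop finds nothing to deliver; the caller-side sketch below is illustrative, not part of the package:

func popOnce(ctx context.Context, driver queue.Driver) (*queue.PersistedJob, error) {
    msg, err := driver.Pop(ctx)
    if errors.Is(err, queue.ErrEmpty) {
        // Nothing to consume right now; the caller may back off and try again.
        return nil, nil
    }
    if err != nil {
        return nil, err
    }
    return msg, nil
}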
Functions ¶
func Providers ¶
func Providers(optionFunc ...ProvidersOptionFunc) di.Deps
Providers returns a set of dependencies related to the queue. It includes the DispatcherMaker, the JobDispatcher and the exported configs.
Depends On:
    contract.ConfigAccessor
    contract.Dispatcher
    Driver        `optional:"true"`
    otredis.Maker `optional:"true"`
    log.Logger
    contract.AppName
    contract.Env
    Gauge `optional:"true"`
Provides:
    DispatcherMaker
    DispatcherFactory
    JobDispatcher
    *Queue
func UseCodec ¶
func UseCodec(codec contract.Codec) func(*Queue)
UseCodec allows the consumer to replace the default Packer with a custom one. UseCodec is an option for NewQueue.
func UseEventDispatcher ¶ added in v0.1.1
func UseEventDispatcher(dispatcher contract.Dispatcher) func(*Queue)
UseEventDispatcher is an option for NewQueue to receive events such as fail and retry.
func UseJobDispatcher ¶ added in v0.1.1
func UseJobDispatcher(dispatcher JobDispatcher) func(*Queue)
UseJobDispatcher is an option for NewQueue to swap the underlying JobDispatcher implementation.
func UseLogger ¶
func UseLogger(logger log.Logger) func(*Queue)
UseLogger is an option for NewQueue that feeds the queue with a Logger of choice.
func UseParallelism ¶
func UseParallelism(parallelism int) func(*Queue)
UseParallelism is an option for NewQueue that sets the parallelism for queue consumption.
Types ¶
type BeforeAbortPayload ¶ added in v0.1.1
type BeforeAbortPayload struct {
    Err error
    Job *PersistedJob
}
type BeforeRetryPayload ¶ added in v0.1.1
type BeforeRetryPayload struct {
    Err error
    Job *PersistedJob
}
type ChannelConfig ¶
type ChannelConfig struct {
    Delayed  string
    Failed   string
    Reserved string
    Waiting  string
    Timeout  string
}
ChannelConfig describes the key name of each queue, also known as a channel.
type ConsumableDispatcher ¶
type ConsumableDispatcher interface {
    JobDispatcher
    Consume(ctx context.Context) error
}
ConsumableDispatcher is the key of *Queue in the dependencies graph. Used as a type hint for injection.
type DeferrablePersistentJob ¶
type DeferrablePersistentJob struct {
    Job
    // contains filtered or unexported fields
}
DeferrablePersistentJob is a persisted Job.
func Adjust ¶
func Adjust(job Job, opts ...PersistOption) DeferrablePersistentJob
Adjust converts any Job into a DeferrablePersistentJob, i.e. one that can be stored in external storage.
func (DeferrablePersistentJob) Decorate ¶
func (d DeferrablePersistentJob) Decorate(s *PersistedJob)
Decorate decorates the PersistedJob of this Job by adding some meta info. It is called in the Queue, after the Packer compresses the Job.
func (DeferrablePersistentJob) Defer ¶
func (d DeferrablePersistentJob) Defer() time.Duration
Defer defers the execution of the job for the period of time returned.
type DispatcherFactory ¶
DispatcherFactory is a factory for *Queue. Note that DispatcherFactory doesn't contain the factory method itself, i.e. how to construct a dispatcher is left for users to define. Users can then use this type to create their own dispatcher implementation.
Here is an example of how to create a custom DispatcherFactory with an InProcessDriver:
factory := di.NewFactory(func(name string) (di.Pair, error) {
    queuedDispatcher := queue.NewQueue(
        queue.NewInProcessDriver(),
        queue.UseJobDispatcher(&queue.SyncDispatcher{}),
    )
    return di.Pair{Conn: queuedDispatcher}, nil
})
dispatcherFactory := DispatcherFactory{Factory: factory}
type DispatcherMaker ¶
DispatcherMaker is the key of *DispatcherFactory in the dependencies graph. Used as a type hint for injection.
type Driver ¶
type Driver interface {
    // Push pushes the message onto the queue. It is possible to specify a time delay. If so the message
    // will be read after the delay. Use zero value if a delay is not needed.
    Push(ctx context.Context, message *PersistedJob, delay time.Duration) error
    // Pop pops the message out of the queue. It blocks until a message is available or a timeout is reached.
    Pop(ctx context.Context) (*PersistedJob, error)
    // Ack acknowledges a message has been processed.
    Ack(ctx context.Context, message *PersistedJob) error
    // Fail marks a message as failed.
    Fail(ctx context.Context, message *PersistedJob) error
    // Reload puts failed/timeout messages back onto the Waiting queue. If a temporary outage has been cleared,
    // messages can be tried again via Reload. Reload is not a normal retry.
    // It similarly gives otherwise dead messages one more chance,
    // but this chance is not subject to the MaxAttempts limit, nor does it reset the number of times attempted.
    Reload(ctx context.Context, channel string) (int64, error)
    // Flush empties the queue under channel.
    Flush(ctx context.Context, channel string) error
    // Info lists QueueInfo by inspecting queues one by one. Useful for metrics and monitoring.
    Info(ctx context.Context) (QueueInfo, error)
    // Retry puts the message back onto the delayed queue.
    Retry(ctx context.Context, message *PersistedJob) error
}
Driver is the interface for queue engines. See RedisDriver for usage.
type DriverArgs ¶ added in v0.1.2
type DriverArgs struct {
    Name      string
    Populator contract.DIPopulator
}
DriverArgs are arguments to construct the driver. See WithDriverConstructor.
type Handler ¶
type Handler interface {
    // Listen should return a Job instance with zero value. It tells the dispatcher what type of job this handler is expecting.
    Listen() Job
    // Process will be called when a job is ready from the queue.
    Process(ctx context.Context, Job Job) error
}
Handler is the handler for Job.
type InProcessDriver ¶
type InProcessDriver struct {
// contains filtered or unexported fields
}
InProcessDriver is a test replacement for the redis driver. It doesn't persist your jobs in any way, so it is not suitable for production use.
func NewInProcessDriver ¶
func NewInProcessDriver() *InProcessDriver
NewInProcessDriver creates an *InProcessDriver for testing.
func NewInProcessDriverWithPopInterval ¶
func NewInProcessDriverWithPopInterval(duration time.Duration) *InProcessDriver
NewInProcessDriverWithPopInterval creates an *InProcessDriver with a custom pop interval.
func (*InProcessDriver) Ack ¶
func (i *InProcessDriver) Ack(ctx context.Context, message *PersistedJob) error
func (*InProcessDriver) Fail ¶
func (i *InProcessDriver) Fail(ctx context.Context, message *PersistedJob) error
func (*InProcessDriver) Flush ¶
func (i *InProcessDriver) Flush(ctx context.Context, channel string) error
func (*InProcessDriver) Info ¶
func (i *InProcessDriver) Info(ctx context.Context) (QueueInfo, error)
func (*InProcessDriver) Pop ¶
func (i *InProcessDriver) Pop(ctx context.Context) (*PersistedJob, error)
func (*InProcessDriver) Push ¶
func (i *InProcessDriver) Push(ctx context.Context, message *PersistedJob, delay time.Duration) error
func (*InProcessDriver) Retry ¶
func (i *InProcessDriver) Retry(ctx context.Context, message *PersistedJob) error
type JobDispatcher ¶ added in v0.1.1
type JobDispatcher interface {
    Dispatch(ctx context.Context, Job Job) error
    Subscribe(listener Handler)
}
JobDispatcher is the Job registry that is able to send jobs to each Handler.
type ListenFunc ¶
type ListenFunc struct {
    Job Job
    // contains filtered or unexported fields
}
ListenFunc is a listener implemented with a callback.
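As the first package example above shows, a ListenFunc is typically built with queue.Listen rather than constructed by hand; a brief sketch:

dispatcher := &queue.SyncDispatcher{}
dispatcher.Subscribe(queue.Listen(queue.JobFrom(""), func(ctx context.Context, job queue.Job) error {
    // The callback receives every job whose type matches the zero-value job passed to Listen.
    fmt.Println(job.Data())
    return nil
}))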
type Module ¶
type Module struct {
Factory *DispatcherFactory
}
Module exports queue commands, for example queue flush and queue reload.
func (Module) ProvideCommand ¶
ProvideCommand implements CommandProvider for the Module. It registers the flush and reload commands under the parent command.
type PersistOption ¶
type PersistOption func(Job *DeferrablePersistentJob)
PersistOption defines some options for Adjust
func Defer ¶
func Defer(duration time.Duration) PersistOption
Defer is a PersistOption that defers the execution of the DeferrablePersistentJob for the period of time given.
func MaxAttempts ¶
func MaxAttempts(attempts int) PersistOption
MaxAttempts is a PersistOption that defines how many times the Job handler can be retried.
func ScheduleAt ¶
func ScheduleAt(t time.Time) PersistOption
ScheduleAt is a PersistOption that defers the execution of the DeferrablePersistentJob until the time given.
func Timeout ¶
func Timeout(timeout time.Duration) PersistOption
Timeout is a PersistOption that defines the maximum duration the Job can be processed before timing out. Note: this timeout is shared among all listeners.
func UniqueId ¶
func UniqueId(id string) PersistOption
UniqueId is a PersistOption that outsources the generation of the uniqueId to the caller.
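These options are combined through Adjust; a brief sketch (ReportJob is a hypothetical payload type, not part of this package):

type ReportJob struct{ ID int } // hypothetical payload

job := queue.Adjust(
    queue.JobFrom(ReportJob{ID: 42}),
    queue.ScheduleAt(time.Now().Add(30*time.Minute)),
    queue.MaxAttempts(3),
    queue.Timeout(10*time.Second),
    queue.UniqueId("report-42"),
)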
type PersistedJob ¶
type PersistedJob struct {
    // UniqueId identifies each individual message. Sometimes messages can have the exact same content and even
    // the exact same Key. UniqueId is used to differentiate them.
    UniqueId string
    // Key is the message type. Usually it is the string name of the Job type before serialization.
    Key string
    // Value is the serialized bytes of the Job.
    Value []byte
    // HandleTimeout sets the upper time limit for each run of the handler. If HandleTimeout is exceeded, the Job will
    // be put onto the timeout queue. Note: the timeout is shared among all listeners.
    HandleTimeout time.Duration
    // Backoff sets the duration before the next retry.
    Backoff time.Duration
    // Attempts denotes how many retries have been attempted. It starts from 1.
    Attempts int
    // MaxAttempts denotes the maximum number of times the handler can be retried before the Job is put onto
    // the failed queue.
    // By default, MaxAttempts is 1.
    MaxAttempts int
}
PersistedJob represents a persisted Job.
func (*PersistedJob) Data ¶
func (s *PersistedJob) Data() interface{}
Data implements Job. It returns the Value.
func (*PersistedJob) Type ¶
func (s *PersistedJob) Type() string
Type implements Job. It returns the Key.
type ProvidersOptionFunc ¶ added in v0.1.1
type ProvidersOptionFunc func(options *providersOption)
ProvidersOptionFunc is the type of the functional options for Providers. Use this type to change how Providers works.
func WithDriver ¶ added in v0.1.1
func WithDriver(driver Driver) ProvidersOptionFunc
WithDriver instructs the Providers to accept a queue driver different from the default one. This option supersedes the WithDriverConstructor option.
func WithDriverConstructor ¶ added in v0.1.1
func WithDriverConstructor(f func(args DriverArgs) (Driver, error)) ProvidersOptionFunc
WithDriverConstructor instructs the Providers to accept an alternative constructor for the queue driver. If the WithDriver option is set, this option becomes a no-op.
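For instance, swapping in the in-process driver for tests could look like the sketch below, built only from the options documented above:

c.Provide(queue.Providers(queue.WithDriver(queue.NewInProcessDriver())))

// Or, build the driver lazily with access to the DI container:
c.Provide(queue.Providers(queue.WithDriverConstructor(func(args queue.DriverArgs) (queue.Driver, error) {
    return queue.NewInProcessDriver(), nil
})))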
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is an extension of the embedded dispatcher. It adds the deferrable, persistent Job feature.
func NewQueue ¶
NewQueue wraps a dispatcher and returns a decorated Queue. The resulting Queue can send and listen to "persisted" Jobs. Those persisted Jobs guarantee at-least-once execution, as they are stored in external storage and won't be released until the Queue acknowledges the end of execution.
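A hand-wired setup might look like the sketch below, assuming NewQueue accepts a Driver followed by the functional options listed earlier (as the Example (Minimum) suggests); log here refers to the go-kit logger:

q := queue.NewQueue(
    queue.NewInProcessDriver(),
    queue.UseParallelism(2),
    queue.UseLogger(log.NewNopLogger()),
)
go q.Consume(context.Background())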
func (*Queue) Consume ¶
Consume starts the runner and blocks until the context is canceled or an error occurs.
type QueueInfo ¶
type QueueInfo struct {
    // Waiting is the length of the Waiting queue.
    Waiting int64
    // Delayed is the length of the Delayed queue.
    Delayed int64
    // Timeout is the length of the Timeout queue.
    Timeout int64
    // Failed is the length of the Failed queue.
    Failed int64
}
QueueInfo describes the state of the queues.
type RedisDriver ¶
type RedisDriver struct {
    Logger        log.Logger            // Logger is an optional logger. By default, a noop logger is used.
    RedisClient   redis.UniversalClient // RedisClient is used to communicate with redis.
    ChannelConfig ChannelConfig         // ChannelConfig holds the names of the redis keys for all queues.
    PopTimeout    time.Duration         // PopTimeout is the BRPOP timeout, i.e. how long the pop action will block at most.
    Packer        contract.Codec        // Packer describes how to save the message in wire format.
    // contains filtered or unexported fields
}
RedisDriver is a queue driver backed by redis. It is easy to set up and offers at-least-once semantics.
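A sketch of constructing a RedisDriver by hand, using the exported fields above. The address and key names are illustrative, the go-redis import path (github.com/go-redis/redis/v8) and whether Logger and Packer may be left to their defaults are assumptions, and in practice the bundled provider assembles all of this from configuration:

driver := &queue.RedisDriver{
    RedisClient: redis.NewUniversalClient(&redis.UniversalOptions{
        Addrs: []string{"localhost:6379"}, // illustrative address
    }),
    ChannelConfig: queue.ChannelConfig{
        Delayed:  "{queue:default}:delayed",
        Failed:   "{queue:default}:failed",
        Reserved: "{queue:default}:reserved",
        Waiting:  "{queue:default}:waiting",
        Timeout:  "{queue:default}:timeout",
    },
    PopTimeout: time.Second,
}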
func (*RedisDriver) Ack ¶
func (r *RedisDriver) Ack(ctx context.Context, message *PersistedJob) error
Ack acknowledges a message has been processed.
func (*RedisDriver) Fail ¶
func (r *RedisDriver) Fail(ctx context.Context, message *PersistedJob) error
Fail marks a message as failed.
func (*RedisDriver) Flush ¶
func (r *RedisDriver) Flush(ctx context.Context, channel string) error
Flush flushes a queue of choice by deleting all of its data. Use with caution.
func (*RedisDriver) Info ¶
func (r *RedisDriver) Info(ctx context.Context) (QueueInfo, error)
Info lists QueueInfo by inspecting queues one by one. Useful for metrics and monitoring.
func (*RedisDriver) Pop ¶
func (r *RedisDriver) Pop(ctx context.Context) (*PersistedJob, error)
Pop pops the message out of the queue. It uses BRPOP underneath, so effectively it blocks until a message is available or a timeout is reached.
func (*RedisDriver) Push ¶
func (r *RedisDriver) Push(ctx context.Context, message *PersistedJob, delay time.Duration) error
Push pushes the message onto the queue. It is possible to specify a time delay. If so the message will be read after the delay. Use zero value if a delay is not needed.
func (*RedisDriver) Reload ¶
func (r *RedisDriver) Reload(ctx context.Context, channel string) (int64, error)
Reload puts failed/timeout messages back onto the Waiting queue. If a temporary outage has been cleared, messages can be tried again via Reload. Reload is not a normal retry. It similarly gives otherwise dead messages one more chance, but this chance is not subject to the MaxAttempts limit, nor does it reset the number of times attempted.
func (*RedisDriver) Retry ¶
func (r *RedisDriver) Retry(ctx context.Context, message *PersistedJob) error
Retry puts the message back onto the delayed queue. The message will be tried again after the period of time specified by Backoff. Note: if one listener fails, all listeners for this Job will be retried. Make sure your listeners are idempotent, as always.
type SyncDispatcher ¶
type SyncDispatcher struct {
// contains filtered or unexported fields
}
SyncDispatcher is a contract.Dispatcher implementation that dispatches Jobs synchronously. SyncDispatcher is safe for concurrent use.
func (*SyncDispatcher) Dispatch ¶
func (d *SyncDispatcher) Dispatch(ctx context.Context, Job Job) error
Dispatch dispatches Jobs synchronously. If any listener returns an error, the process is aborted immediately and that error is returned to the caller.
func (*SyncDispatcher) Subscribe ¶
func (d *SyncDispatcher) Subscribe(listener Handler)
Subscribe subscribes the listener to the dispatcher.