Documentation ¶
Index ¶
- func IsErrInvalidConfiguration(err error) bool
- func RegisteredTypesAsString() []string
- type ChannelQueue
- type ChannelQueueConfiguration
- type Data
- type DummyQueue
- type ErrInvalidConfiguration
- type Flushable
- type HandlerFunc
- type LevelQueue
- type LevelQueueConfiguration
- type ManagedPool
- type ManagedQueue
- func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc
- func (q *ManagedQueue) BlockTimeout() time.Duration
- func (q *ManagedQueue) BoostTimeout() time.Duration
- func (q *ManagedQueue) BoostWorkers() int
- func (q *ManagedQueue) CancelWorkers(pid int64)
- func (q *ManagedQueue) Flush(timeout time.Duration) error
- func (q *ManagedQueue) IsEmpty() bool
- func (q *ManagedQueue) MaxNumberOfWorkers() int
- func (q *ManagedQueue) NumberOfWorkers() int
- func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, ...) int64
- func (q *ManagedQueue) RemoveWorkers(pid int64)
- func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
- func (q *ManagedQueue) Workers() []*PoolWorkers
- type ManagedQueueList
- type Manager
- func (m *Manager) Add(managed interface{}, t Type, configuration, exemplar interface{}) int64
- func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
- func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue
- func (m *Manager) ManagedQueues() []*ManagedQueue
- func (m *Manager) Remove(qid int64)
- type Named
- type NewQueueFunc
- type PersistableChannelQueue
- func (p *PersistableChannelQueue) Flush(timeout time.Duration) error
- func (p *PersistableChannelQueue) FlushWithContext(ctx context.Context) error
- func (p *PersistableChannelQueue) IsEmpty() bool
- func (p *PersistableChannelQueue) Name() string
- func (p *PersistableChannelQueue) Push(data Data) error
- func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func()))
- func (p *PersistableChannelQueue) Shutdown()
- func (p *PersistableChannelQueue) Terminate()
- type PersistableChannelQueueConfiguration
- type PoolWorkers
- type PoolWorkersList
- type Queue
- func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue
- func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, error)
- func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error)
- func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- type RedisQueue
- type RedisQueueConfiguration
- type Shutdownable
- type Type
- type WorkerPool
- func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc
- func (p *WorkerPool) BlockTimeout() time.Duration
- func (p *WorkerPool) BoostTimeout() time.Duration
- func (p *WorkerPool) BoostWorkers() int
- func (p *WorkerPool) CleanUp(ctx context.Context)
- func (p *WorkerPool) Flush(timeout time.Duration) error
- func (p *WorkerPool) FlushWithContext(ctx context.Context) error
- func (p *WorkerPool) IsEmpty() bool
- func (p *WorkerPool) MaxNumberOfWorkers() int
- func (p *WorkerPool) NumberOfWorkers() int
- func (p *WorkerPool) Push(data Data)
- func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int)
- func (p *WorkerPool) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
- func (p *WorkerPool) Wait()
- type WorkerPoolConfiguration
- type WrappedQueue
- func (q *WrappedQueue) Flush(timeout time.Duration) error
- func (q *WrappedQueue) FlushWithContext(ctx context.Context) error
- func (q *WrappedQueue) IsEmpty() bool
- func (q *WrappedQueue) Name() string
- func (q *WrappedQueue) Push(data Data) error
- func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func()))
- func (q *WrappedQueue) Shutdown()
- func (q *WrappedQueue) Terminate()
- type WrappedQueueConfiguration
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsErrInvalidConfiguration ¶
IsErrInvalidConfiguration checks if an error is an ErrInvalidConfiguration
func RegisteredTypesAsString ¶
func RegisteredTypesAsString() []string
RegisteredTypesAsString provides the list of registered queue types as strings
Types ¶
type ChannelQueue ¶
type ChannelQueue struct { *WorkerPool // contains filtered or unexported fields }
ChannelQueue implements Queue
A channel queue is not persistable and does not shut down or terminate cleanly. It is basically a very thin wrapper around a WorkerPool.
func (*ChannelQueue) Name ¶
func (c *ChannelQueue) Name() string
Name returns the name of this queue
func (*ChannelQueue) Push ¶
func (c *ChannelQueue) Push(data Data) error
Push will push data into the queue
func (*ChannelQueue) Run ¶
func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func()))
Run starts to run the queue
type ChannelQueueConfiguration ¶
type ChannelQueueConfiguration struct { WorkerPoolConfiguration Workers int Name string }
ChannelQueueConfiguration is the configuration for a ChannelQueue
type DummyQueue ¶
type DummyQueue struct { }
DummyQueue represents an empty queue
func (*DummyQueue) FlushWithContext ¶
func (b *DummyQueue) FlushWithContext(context.Context) error
FlushWithContext always returns nil
func (*DummyQueue) IsEmpty ¶
func (b *DummyQueue) IsEmpty() bool
IsEmpty asserts that the queue is empty
func (*DummyQueue) Push ¶
func (b *DummyQueue) Push(Data) error
Push fakes a push of data to the queue
func (*DummyQueue) Run ¶
func (b *DummyQueue) Run(_, _ func(context.Context, func()))
Run does nothing
type ErrInvalidConfiguration ¶
type ErrInvalidConfiguration struct {
// contains filtered or unexported fields
}
ErrInvalidConfiguration is returned when there is invalid configuration for a queue
func (ErrInvalidConfiguration) Error ¶
func (err ErrInvalidConfiguration) Error() string
type Flushable ¶
type Flushable interface { // Flush will add a flush worker to the pool - the worker should be autoregistered with the manager Flush(time.Duration) error // FlushWithContext is very similar to Flush // NB: The worker will not be registered with the manager. FlushWithContext(ctx context.Context) error // IsEmpty will return if the managed pool is empty and has no work IsEmpty() bool }
Flushable represents a pool or queue that is flushable
type HandlerFunc ¶
type HandlerFunc func(...Data)
HandlerFunc is a function that takes a variable amount of data and processes it
type LevelQueue ¶
type LevelQueue struct { *WorkerPool // contains filtered or unexported fields }
LevelQueue implements a disk library queue
func (*LevelQueue) IsEmpty ¶
func (l *LevelQueue) IsEmpty() bool
IsEmpty checks whether the queue is empty
func (*LevelQueue) Push ¶
func (l *LevelQueue) Push(data Data) error
Push will push the data to the queue
func (*LevelQueue) Run ¶
func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func()))
Run starts to run the queue
func (*LevelQueue) Shutdown ¶
func (l *LevelQueue) Shutdown()
Shutdown this queue and stop processing
func (*LevelQueue) Terminate ¶
func (l *LevelQueue) Terminate()
Terminate this queue and close the queue
type LevelQueueConfiguration ¶
type LevelQueueConfiguration struct { WorkerPoolConfiguration DataDir string Workers int Name string }
LevelQueueConfiguration is the configuration for a LevelQueue
type ManagedPool ¶
type ManagedPool interface { // AddWorkers adds a number of workers as a group to the pool with the provided timeout. A CancelFunc is provided to cancel the group AddWorkers(number int, timeout time.Duration) context.CancelFunc // NumberOfWorkers returns the total number of workers in the pool NumberOfWorkers() int // MaxNumberOfWorkers returns the maximum number of workers the pool can dynamically grow to MaxNumberOfWorkers() int // SetMaxNumberOfWorkers sets the maximum number of workers the pool can dynamically grow to SetMaxNumberOfWorkers(int) // BoostTimeout returns the current timeout for worker groups created during a boost BoostTimeout() time.Duration // BlockTimeout returns the timeout the internal channel can block for before a boost would occur BlockTimeout() time.Duration // BoostWorkers returns the number of workers to be created during a boost BoostWorkers() int // SetPoolSettings sets the user updatable settings for the pool SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) }
ManagedPool is a simple interface to get certain details from a worker pool
type ManagedQueue ¶
type ManagedQueue struct { QID int64 Type Type Name string Configuration interface{} ExemplarType string Managed interface{} PoolWorkers map[int64]*PoolWorkers // contains filtered or unexported fields }
ManagedQueue represents a working queue with a Pool of workers.
Although a ManagedQueue should really represent a Queue this does not necessarily have to be the case. This could be used to describe any queue.WorkerPool.
func (*ManagedQueue) AddWorkers ¶
func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc
AddWorkers adds workers to the queue if it has registered an add worker function
func (*ManagedQueue) BlockTimeout ¶
func (q *ManagedQueue) BlockTimeout() time.Duration
BlockTimeout returns the timeout until the next boost
func (*ManagedQueue) BoostTimeout ¶
func (q *ManagedQueue) BoostTimeout() time.Duration
BoostTimeout returns the timeout of the next boost
func (*ManagedQueue) BoostWorkers ¶
func (q *ManagedQueue) BoostWorkers() int
BoostWorkers returns the number of workers for a boost
func (*ManagedQueue) CancelWorkers ¶
func (q *ManagedQueue) CancelWorkers(pid int64)
CancelWorkers cancels pooled workers with pid
func (*ManagedQueue) Flush ¶
func (q *ManagedQueue) Flush(timeout time.Duration) error
Flush flushes the queue with a timeout
func (*ManagedQueue) IsEmpty ¶
func (q *ManagedQueue) IsEmpty() bool
IsEmpty reports whether the queue is empty
func (*ManagedQueue) MaxNumberOfWorkers ¶
func (q *ManagedQueue) MaxNumberOfWorkers() int
MaxNumberOfWorkers returns the maximum number of workers for the pool
func (*ManagedQueue) NumberOfWorkers ¶
func (q *ManagedQueue) NumberOfWorkers() int
NumberOfWorkers returns the number of workers in the queue
func (*ManagedQueue) RegisterWorkers ¶
func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc, isFlusher bool) int64
RegisterWorkers registers workers to this queue
func (*ManagedQueue) RemoveWorkers ¶
func (q *ManagedQueue) RemoveWorkers(pid int64)
RemoveWorkers deletes pooled workers with pid
func (*ManagedQueue) SetPoolSettings ¶
func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
SetPoolSettings sets the settable boost values
func (*ManagedQueue) Workers ¶
func (q *ManagedQueue) Workers() []*PoolWorkers
Workers returns the poolworkers
type ManagedQueueList ¶
type ManagedQueueList []*ManagedQueue
ManagedQueueList implements the sort.Interface
func (ManagedQueueList) Len ¶
func (l ManagedQueueList) Len() int
func (ManagedQueueList) Less ¶
func (l ManagedQueueList) Less(i, j int) bool
func (ManagedQueueList) Swap ¶
func (l ManagedQueueList) Swap(i, j int)
type Manager ¶
type Manager struct { Queues map[int64]*ManagedQueue // contains filtered or unexported fields }
Manager is a queue manager
func GetManager ¶
func GetManager() *Manager
GetManager returns a Manager and initializes one as singleton if there's none yet
func (*Manager) GetManagedQueue ¶
func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue
GetManagedQueue by qid
func (*Manager) ManagedQueues ¶
func (m *Manager) ManagedQueues() []*ManagedQueue
ManagedQueues returns the managed queues
type NewQueueFunc ¶
type NewQueueFunc func(handler HandlerFunc, config interface{}, exemplar interface{}) (Queue, error)
NewQueueFunc is a function that creates a queue
type PersistableChannelQueue ¶
type PersistableChannelQueue struct {
// contains filtered or unexported fields
}
PersistableChannelQueue wraps a channel queue and level queue together. The disk level queue will be used to store data at shutdown and terminate - and will be restored on start up.
func (*PersistableChannelQueue) Flush ¶
func (p *PersistableChannelQueue) Flush(timeout time.Duration) error
Flush flushes the queue and blocks till the queue is empty
func (*PersistableChannelQueue) FlushWithContext ¶
func (p *PersistableChannelQueue) FlushWithContext(ctx context.Context) error
FlushWithContext flushes the queue and blocks till the queue is empty
func (*PersistableChannelQueue) IsEmpty ¶
func (p *PersistableChannelQueue) IsEmpty() bool
IsEmpty checks if a queue is empty
func (*PersistableChannelQueue) Name ¶
func (p *PersistableChannelQueue) Name() string
Name returns the name of this queue
func (*PersistableChannelQueue) Push ¶
func (p *PersistableChannelQueue) Push(data Data) error
Push will push the data to the queue
func (*PersistableChannelQueue) Run ¶
func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func()))
Run starts to run the queue
func (*PersistableChannelQueue) Shutdown ¶
func (p *PersistableChannelQueue) Shutdown()
Shutdown stops processing this queue
func (*PersistableChannelQueue) Terminate ¶
func (p *PersistableChannelQueue) Terminate()
Terminate this queue and close the queue
type PersistableChannelQueueConfiguration ¶
type PersistableChannelQueueConfiguration struct { Name string DataDir string BatchLength int QueueLength int Timeout time.Duration MaxAttempts int Workers int MaxWorkers int BlockTimeout time.Duration BoostTimeout time.Duration BoostWorkers int }
PersistableChannelQueueConfiguration is the configuration for a PersistableChannelQueue
type PoolWorkers ¶
type PoolWorkers struct { PID int64 Workers int Start time.Time Timeout time.Time HasTimeout bool Cancel context.CancelFunc IsFlusher bool }
PoolWorkers represents a group of workers working on a queue
type PoolWorkersList ¶
type PoolWorkersList []*PoolWorkers
PoolWorkersList implements the sort.Interface for PoolWorkers
func (PoolWorkersList) Len ¶
func (l PoolWorkersList) Len() int
func (PoolWorkersList) Less ¶
func (l PoolWorkersList) Less(i, j int) bool
func (PoolWorkersList) Swap ¶
func (l PoolWorkersList) Swap(i, j int)
type Queue ¶
type Queue interface { Flushable Run(atShutdown, atTerminate func(context.Context, func())) Push(Data) error }
Queue defines an interface of a queue-like item
Queues will handle their own contents in the Run method
func CreateQueue ¶
func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue
CreateQueue for name with provided handler and exemplar
func NewChannelQueue ¶
func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewChannelQueue creates a memory channel queue
func NewDummyQueue ¶
func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, error)
NewDummyQueue creates a new DummyQueue
func NewLevelQueue ¶
func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewLevelQueue creates a local, disk-backed level queue
func NewPersistableChannelQueue ¶
func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewPersistableChannelQueue creates a wrapped batched channel queue with a persistable level queue backend used when shutting down. This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate.
func NewQueue ¶
func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error)
NewQueue takes a queue Type, HandlerFunc, some options and possibly an exemplar and returns a Queue or an error
func NewRedisQueue ¶
func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewRedisQueue creates a queue backed by a single Redis instance or a Redis cluster
func NewWrappedQueue ¶
func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewWrappedQueue will attempt to create a queue of the provided type. If there is a problem creating this queue, it will instead create a WrappedQueue with delayed startup of the queue and a channel which will be redirected to the queue.
type RedisQueue ¶
type RedisQueue struct { *WorkerPool // contains filtered or unexported fields }
RedisQueue implements a Redis-backed queue
func (*RedisQueue) IsEmpty ¶
func (r *RedisQueue) IsEmpty() bool
IsEmpty checks if the queue is empty
func (*RedisQueue) Run ¶
func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func()))
Run runs the redis queue
func (*RedisQueue) Terminate ¶
func (r *RedisQueue) Terminate()
Terminate this queue and close the queue
type RedisQueueConfiguration ¶
type RedisQueueConfiguration struct { WorkerPoolConfiguration Network string Addresses string Password string DBIndex int QueueName string Workers int Name string }
RedisQueueConfiguration is the configuration for the redis queue
type Shutdownable ¶
type Shutdownable interface { Shutdown() Terminate() }
Shutdownable represents a queue that can be shutdown
type Type ¶
type Type string
Type is a type of Queue
const ChannelQueueType Type = "channel"
ChannelQueueType is the type for channel queue
const DummyQueueType Type = "dummy"
DummyQueueType is the type for the dummy queue
const LevelQueueType Type = "level"
LevelQueueType is the type for level queue
const PersistableChannelQueueType Type = "persistable-channel"
PersistableChannelQueueType is the type for persistable queue
const RedisQueueType Type = "redis"
RedisQueueType is the type for redis queue
const WrappedQueueType Type = "wrapped"
WrappedQueueType is the type for a wrapped delayed starting queue
func RegisteredTypes ¶
func RegisteredTypes() []Type
RegisteredTypes provides the list of registered queue types
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool represents a dynamically growable worker pool for a provided handler function. It has an internal channel which it uses to detect if there is a block, and it will grow and shrink in response to demand as per configuration.
func NewWorkerPool ¶
func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool
NewWorkerPool creates a new worker pool
func (*WorkerPool) AddWorkers ¶
func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc
AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
func (*WorkerPool) BlockTimeout ¶
func (p *WorkerPool) BlockTimeout() time.Duration
BlockTimeout returns the timeout until the next boost
func (*WorkerPool) BoostTimeout ¶
func (p *WorkerPool) BoostTimeout() time.Duration
BoostTimeout returns the timeout of the next boost
func (*WorkerPool) BoostWorkers ¶
func (p *WorkerPool) BoostWorkers() int
BoostWorkers returns the number of workers for a boost
func (*WorkerPool) CleanUp ¶
func (p *WorkerPool) CleanUp(ctx context.Context)
CleanUp will drain the remaining contents of the channel. This should be called after the AddWorkers context is closed.
func (*WorkerPool) Flush ¶
func (p *WorkerPool) Flush(timeout time.Duration) error
Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
func (*WorkerPool) FlushWithContext ¶
func (p *WorkerPool) FlushWithContext(ctx context.Context) error
FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty. NB: The worker will not be registered with the manager.
func (*WorkerPool) IsEmpty ¶
func (p *WorkerPool) IsEmpty() bool
IsEmpty returns true if the worker queue is empty
func (*WorkerPool) MaxNumberOfWorkers ¶
func (p *WorkerPool) MaxNumberOfWorkers() int
MaxNumberOfWorkers returns the maximum number of workers automatically added to the pool
func (*WorkerPool) NumberOfWorkers ¶
func (p *WorkerPool) NumberOfWorkers() int
NumberOfWorkers returns the number of current workers in the pool
func (*WorkerPool) Push ¶
func (p *WorkerPool) Push(data Data)
Push pushes the data to the internal channel
func (*WorkerPool) SetMaxNumberOfWorkers ¶
func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int)
SetMaxNumberOfWorkers sets the maximum number of workers automatically added to the pool. Changing this number will not change the number of current workers but will change the limit for future additions.
func (*WorkerPool) SetPoolSettings ¶
func (p *WorkerPool) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
SetPoolSettings sets the settable boost values
type WorkerPoolConfiguration ¶
type WorkerPoolConfiguration struct { QueueLength int BatchLength int BlockTimeout time.Duration BoostTimeout time.Duration BoostWorkers int MaxWorkers int }
WorkerPoolConfiguration is the basic configuration for a WorkerPool
type WrappedQueue ¶
type WrappedQueue struct {
// contains filtered or unexported fields
}
WrappedQueue wraps a delayed starting queue
func (*WrappedQueue) Flush ¶
func (q *WrappedQueue) Flush(timeout time.Duration) error
Flush flushes the queue and blocks till the queue is empty
func (*WrappedQueue) FlushWithContext ¶
func (q *WrappedQueue) FlushWithContext(ctx context.Context) error
FlushWithContext implements the final part of Flushable
func (*WrappedQueue) IsEmpty ¶
func (q *WrappedQueue) IsEmpty() bool
IsEmpty checks whether the queue is empty
func (*WrappedQueue) Push ¶
func (q *WrappedQueue) Push(data Data) error
Push will push the data to the internal channel checking it against the exemplar
func (*WrappedQueue) Run ¶
func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func()))
Run starts to run the queue and attempts to create the internal queue
func (*WrappedQueue) Shutdown ¶
func (q *WrappedQueue) Shutdown()
Shutdown this queue and stop processing
func (*WrappedQueue) Terminate ¶
func (q *WrappedQueue) Terminate()
Terminate this queue and close the queue