Documentation ¶
Index ¶
- Variables
- func IsErrInvalidConfiguration(err error) bool
- func RegisteredTypesAsString() []string
- type ByteFIFO
- type ByteFIFOQueue
- func (q *ByteFIFOQueue) IsEmpty() bool
- func (q *ByteFIFOQueue) IsShutdown() <-chan struct{}
- func (q *ByteFIFOQueue) IsTerminated() <-chan struct{}
- func (q *ByteFIFOQueue) Name() string
- func (q *ByteFIFOQueue) Push(data Data) error
- func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error
- func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func()))
- func (q *ByteFIFOQueue) Shutdown()
- func (q *ByteFIFOQueue) Terminate()
- type ByteFIFOQueueConfiguration
- type ByteFIFOUniqueQueue
- type ChannelQueue
- type ChannelQueueConfiguration
- type ChannelUniqueQueue
- func (q *ChannelUniqueQueue) Has(data Data) (bool, error)
- func (q *ChannelUniqueQueue) Name() string
- func (q *ChannelUniqueQueue) Push(data Data) error
- func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error
- func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func()))
- type ChannelUniqueQueueConfiguration
- type Data
- type DummyByteFIFO
- type DummyQueue
- func (*DummyQueue) Flush(time.Duration) error
- func (*DummyQueue) FlushWithContext(context.Context) error
- func (*DummyQueue) Has(Data) (bool, error)
- func (*DummyQueue) IsEmpty() bool
- func (*DummyQueue) Push(Data) error
- func (*DummyQueue) PushFunc(Data, func() error) error
- func (*DummyQueue) Run(_, _ func(context.Context, func()))
- type DummyUniqueByteFIFO
- type ErrInvalidConfiguration
- type Flushable
- type HandlerFunc
- type Immediate
- func (*Immediate) Flush(time.Duration) error
- func (*Immediate) FlushWithContext(context.Context) error
- func (*Immediate) Has(Data) (bool, error)
- func (*Immediate) IsEmpty() bool
- func (q *Immediate) Push(data Data) error
- func (q *Immediate) PushFunc(data Data, f func() error) error
- func (*Immediate) Run(_, _ func(context.Context, func()))
- type LevelQueue
- type LevelQueueByteFIFO
- type LevelQueueConfiguration
- type LevelUniqueQueue
- type LevelUniqueQueueByteFIFO
- func (fifo *LevelUniqueQueueByteFIFO) Close() error
- func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error)
- func (fifo *LevelUniqueQueueByteFIFO) Len() int64
- func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error)
- func (fifo *LevelUniqueQueueByteFIFO) PushFunc(data []byte, fn func() error) error
- type LevelUniqueQueueConfiguration
- type ManagedPool
- type ManagedQueue
- func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc
- func (q *ManagedQueue) BlockTimeout() time.Duration
- func (q *ManagedQueue) BoostTimeout() time.Duration
- func (q *ManagedQueue) BoostWorkers() int
- func (q *ManagedQueue) CancelWorkers(pid int64)
- func (q *ManagedQueue) Flush(timeout time.Duration) error
- func (q *ManagedQueue) IsEmpty() bool
- func (q *ManagedQueue) MaxNumberOfWorkers() int
- func (q *ManagedQueue) NumberOfWorkers() int
- func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, ...) int64
- func (q *ManagedQueue) RemoveWorkers(pid int64)
- func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
- func (q *ManagedQueue) Workers() []*PoolWorkers
- type ManagedQueueList
- type Manager
- func (m *Manager) Add(managed interface{}, t Type, configuration, exemplar interface{}) int64
- func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
- func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue
- func (m *Manager) ManagedQueues() []*ManagedQueue
- func (m *Manager) Remove(qid int64)
- type Mappable
- type Named
- type NewQueueFunc
- type PersistableChannelQueue
- func (q *PersistableChannelQueue) Flush(timeout time.Duration) error
- func (q *PersistableChannelQueue) FlushWithContext(ctx context.Context) error
- func (q *PersistableChannelQueue) IsEmpty() bool
- func (q *PersistableChannelQueue) Name() string
- func (q *PersistableChannelQueue) Push(data Data) error
- func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func()))
- func (q *PersistableChannelQueue) Shutdown()
- func (q *PersistableChannelQueue) Terminate()
- type PersistableChannelQueueConfiguration
- type PersistableChannelUniqueQueue
- func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error
- func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error)
- func (q *PersistableChannelUniqueQueue) Name() string
- func (q *PersistableChannelUniqueQueue) Push(data Data) error
- func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) error
- func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func()))
- func (q *PersistableChannelUniqueQueue) Shutdown()
- func (q *PersistableChannelUniqueQueue) Terminate()
- type PersistableChannelUniqueQueueConfiguration
- type PoolWorkers
- type PoolWorkersList
- type Queue
- func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue
- func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, error)
- func NewImmediate(handler HandlerFunc, opts, exemplar interface{}) (Queue, error)
- func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error)
- func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- type RedisByteFIFO
- type RedisByteFIFOConfiguration
- type RedisQueue
- type RedisQueueConfiguration
- type RedisUniqueByteFIFO
- type RedisUniqueByteFIFOConfiguration
- type RedisUniqueQueue
- type RedisUniqueQueueConfiguration
- type Shutdownable
- type Type
- type UniqueByteFIFO
- type UniqueQueue
- type WorkerPool
- func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc
- func (p *WorkerPool) BlockTimeout() time.Duration
- func (p *WorkerPool) BoostTimeout() time.Duration
- func (p *WorkerPool) BoostWorkers() int
- func (p *WorkerPool) CleanUp(ctx context.Context)
- func (p *WorkerPool) Flush(timeout time.Duration) error
- func (p *WorkerPool) FlushWithContext(ctx context.Context) error
- func (p *WorkerPool) IsEmpty() bool
- func (p *WorkerPool) MaxNumberOfWorkers() int
- func (p *WorkerPool) NumberOfWorkers() int
- func (p *WorkerPool) Push(data Data)
- func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int)
- func (p *WorkerPool) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
- func (p *WorkerPool) Wait()
- type WorkerPoolConfiguration
- type WrappedQueue
- func (q *WrappedQueue) Flush(timeout time.Duration) error
- func (q *WrappedQueue) FlushWithContext(ctx context.Context) error
- func (q *WrappedQueue) IsEmpty() bool
- func (q *WrappedQueue) Name() string
- func (q *WrappedQueue) Push(data Data) error
- func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func()))
- func (q *WrappedQueue) Shutdown()
- func (q *WrappedQueue) Terminate()
- type WrappedQueueConfiguration
- type WrappedUniqueQueue
- type WrappedUniqueQueueConfiguration
Constants ¶
This section is empty.
Variables ¶
var ErrAlreadyInQueue = fmt.Errorf("already in queue")
ErrAlreadyInQueue is returned when trying to push data to the queue that is already in the queue
Functions ¶
func IsErrInvalidConfiguration ¶
IsErrInvalidConfiguration checks if an error is an ErrInvalidConfiguration
func RegisteredTypesAsString ¶
func RegisteredTypesAsString() []string
RegisteredTypesAsString provides the list of requested types of queues
Types ¶
type ByteFIFO ¶ added in v1.12.0
type ByteFIFO interface { // Len returns the length of the fifo Len() int64 // PushFunc pushes data to the end of the fifo and calls the callback if it is added PushFunc(data []byte, fn func() error) error // Pop pops data from the start of the fifo Pop() ([]byte, error) // Close this fifo Close() error }
ByteFIFO defines a FIFO that takes a byte array
type ByteFIFOQueue ¶ added in v1.12.0
type ByteFIFOQueue struct { *WorkerPool // contains filtered or unexported fields }
ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool
func NewByteFIFOQueue ¶ added in v1.12.0
func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOQueue, error)
NewByteFIFOQueue creates a new ByteFIFOQueue
func (*ByteFIFOQueue) IsEmpty ¶ added in v1.12.0
func (q *ByteFIFOQueue) IsEmpty() bool
IsEmpty checks if the queue is empty
func (*ByteFIFOQueue) IsShutdown ¶ added in v1.14.0
func (q *ByteFIFOQueue) IsShutdown() <-chan struct{}
IsShutdown returns a channel which is closed when this Queue is shutdown
func (*ByteFIFOQueue) IsTerminated ¶ added in v1.14.0
func (q *ByteFIFOQueue) IsTerminated() <-chan struct{}
IsTerminated returns a channel which is closed when this Queue is terminated
func (*ByteFIFOQueue) Name ¶ added in v1.12.0
func (q *ByteFIFOQueue) Name() string
Name returns the name of this queue
func (*ByteFIFOQueue) Push ¶ added in v1.12.0
func (q *ByteFIFOQueue) Push(data Data) error
Push pushes data to the fifo
func (*ByteFIFOQueue) PushFunc ¶ added in v1.12.0
func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error
PushFunc pushes data to the fifo
func (*ByteFIFOQueue) Run ¶ added in v1.12.0
func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func()))
Run runs the bytefifo queue
func (*ByteFIFOQueue) Shutdown ¶ added in v1.12.0
func (q *ByteFIFOQueue) Shutdown()
Shutdown processing from this queue
func (*ByteFIFOQueue) Terminate ¶ added in v1.12.0
func (q *ByteFIFOQueue) Terminate()
Terminate this queue and close the queue
type ByteFIFOQueueConfiguration ¶ added in v1.12.0
type ByteFIFOQueueConfiguration struct { WorkerPoolConfiguration Workers int Name string }
ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue
type ByteFIFOUniqueQueue ¶ added in v1.12.0
type ByteFIFOUniqueQueue struct {
ByteFIFOQueue
}
ByteFIFOUniqueQueue represents a UniqueQueue formed from a UniqueByteFifo
func NewByteFIFOUniqueQueue ¶ added in v1.12.0
func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOUniqueQueue, error)
NewByteFIFOUniqueQueue creates a new ByteFIFOUniqueQueue
type ChannelQueue ¶
type ChannelQueue struct { *WorkerPool // contains filtered or unexported fields }
ChannelQueue implements Queue
A channel queue is not persistable and does not shutdown or terminate cleanly It is basically a very thin wrapper around a WorkerPool
func (*ChannelQueue) Name ¶
func (q *ChannelQueue) Name() string
Name returns the name of this queue
func (*ChannelQueue) Push ¶
func (q *ChannelQueue) Push(data Data) error
Push will push data into the queue
func (*ChannelQueue) Run ¶
func (q *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func()))
Run starts to run the queue
type ChannelQueueConfiguration ¶
type ChannelQueueConfiguration struct { WorkerPoolConfiguration Workers int Name string }
ChannelQueueConfiguration is the configuration for a ChannelQueue
type ChannelUniqueQueue ¶ added in v1.12.0
type ChannelUniqueQueue struct { *WorkerPool // contains filtered or unexported fields }
ChannelUniqueQueue implements UniqueQueue
It is basically a thin wrapper around a WorkerPool but keeps a store of what has been pushed within a table.
Please note that this Queue does not guarantee that a particular task cannot be processed twice or more at the same time. Uniqueness is only guaranteed whilst the task is waiting in the queue.
func (*ChannelUniqueQueue) Has ¶ added in v1.12.0
func (q *ChannelUniqueQueue) Has(data Data) (bool, error)
Has checks if the data is in the queue
func (*ChannelUniqueQueue) Name ¶ added in v1.12.0
func (q *ChannelUniqueQueue) Name() string
Name returns the name of this queue
func (*ChannelUniqueQueue) Push ¶ added in v1.12.0
func (q *ChannelUniqueQueue) Push(data Data) error
Push will push data into the queue if the data is not already in the queue
func (*ChannelUniqueQueue) PushFunc ¶ added in v1.12.0
func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error
PushFunc will push data into the queue
func (*ChannelUniqueQueue) Run ¶ added in v1.12.0
func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func()))
Run starts to run the queue
type ChannelUniqueQueueConfiguration ¶ added in v1.12.0
type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
ChannelUniqueQueueConfiguration is the configuration for a ChannelUniqueQueue
type DummyByteFIFO ¶ added in v1.12.0
type DummyByteFIFO struct{}
DummyByteFIFO represents a dummy fifo
func (*DummyByteFIFO) Close ¶ added in v1.12.0
func (*DummyByteFIFO) Close() error
Close returns nil
func (*DummyByteFIFO) Pop ¶ added in v1.12.0
func (*DummyByteFIFO) Pop() ([]byte, error)
Pop returns nil
type DummyQueue ¶
type DummyQueue struct { }
DummyQueue represents an empty queue
func (*DummyQueue) Flush ¶ added in v1.12.0
func (*DummyQueue) Flush(time.Duration) error
Flush always returns nil
func (*DummyQueue) FlushWithContext ¶ added in v1.12.0
func (*DummyQueue) FlushWithContext(context.Context) error
FlushWithContext always returns nil
func (*DummyQueue) Has ¶ added in v1.12.0
func (*DummyQueue) Has(Data) (bool, error)
Has always returns false as this queue never does anything
func (*DummyQueue) IsEmpty ¶ added in v1.12.0
func (*DummyQueue) IsEmpty() bool
IsEmpty asserts that the queue is empty
func (*DummyQueue) Push ¶
func (*DummyQueue) Push(Data) error
Push fakes a push of data to the queue
func (*DummyQueue) PushFunc ¶ added in v1.12.0
func (*DummyQueue) PushFunc(Data, func() error) error
PushFunc fakes a push of data to the queue with a function. The function is never run.
func (*DummyQueue) Run ¶
func (*DummyQueue) Run(_, _ func(context.Context, func()))
Run does nothing
type DummyUniqueByteFIFO ¶ added in v1.12.0
type DummyUniqueByteFIFO struct {
DummyByteFIFO
}
DummyUniqueByteFIFO represents a dummy unique fifo
type ErrInvalidConfiguration ¶
type ErrInvalidConfiguration struct {
// contains filtered or unexported fields
}
ErrInvalidConfiguration is called when there is invalid configuration for a queue
func (ErrInvalidConfiguration) Error ¶
func (err ErrInvalidConfiguration) Error() string
type Flushable ¶ added in v1.12.0
type Flushable interface { // Flush will add a flush worker to the pool - the worker should be autoregistered with the manager Flush(time.Duration) error // FlushWithContext is very similar to Flush // NB: The worker will not be registered with the manager. FlushWithContext(ctx context.Context) error // IsEmpty will return if the managed pool is empty and has no work IsEmpty() bool }
Flushable represents a pool or queue that is flushable
type HandlerFunc ¶
type HandlerFunc func(...Data)
HandlerFunc is a function that takes a variable amount of data and processes it
type Immediate ¶ added in v1.13.0
type Immediate struct {
// contains filtered or unexported fields
}
Immediate represents an direct execution queue
func (*Immediate) FlushWithContext ¶ added in v1.13.0
FlushWithContext always returns nil
type LevelQueue ¶
type LevelQueue struct {
*ByteFIFOQueue
}
LevelQueue implements a disk library queue
type LevelQueueByteFIFO ¶ added in v1.12.0
type LevelQueueByteFIFO struct {
// contains filtered or unexported fields
}
LevelQueueByteFIFO represents a ByteFIFO formed from a LevelQueue
func NewLevelQueueByteFIFO ¶ added in v1.12.0
func NewLevelQueueByteFIFO(connection, prefix string) (*LevelQueueByteFIFO, error)
NewLevelQueueByteFIFO creates a ByteFIFO formed from a LevelQueue
func (*LevelQueueByteFIFO) Close ¶ added in v1.12.0
func (fifo *LevelQueueByteFIFO) Close() error
Close this fifo
func (*LevelQueueByteFIFO) Len ¶ added in v1.12.0
func (fifo *LevelQueueByteFIFO) Len() int64
Len returns the length of the fifo
func (*LevelQueueByteFIFO) Pop ¶ added in v1.12.0
func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error)
Pop pops data from the start of the fifo
type LevelQueueConfiguration ¶
type LevelQueueConfiguration struct { ByteFIFOQueueConfiguration DataDir string ConnectionString string QueueName string }
LevelQueueConfiguration is the configuration for a LevelQueue
type LevelUniqueQueue ¶ added in v1.12.0
type LevelUniqueQueue struct {
*ByteFIFOUniqueQueue
}
LevelUniqueQueue implements a disk library queue
type LevelUniqueQueueByteFIFO ¶ added in v1.12.0
type LevelUniqueQueueByteFIFO struct {
// contains filtered or unexported fields
}
LevelUniqueQueueByteFIFO represents a ByteFIFO formed from a LevelUniqueQueue
func NewLevelUniqueQueueByteFIFO ¶ added in v1.12.0
func NewLevelUniqueQueueByteFIFO(connection, prefix string) (*LevelUniqueQueueByteFIFO, error)
NewLevelUniqueQueueByteFIFO creates a new ByteFIFO formed from a LevelUniqueQueue
func (*LevelUniqueQueueByteFIFO) Close ¶ added in v1.12.0
func (fifo *LevelUniqueQueueByteFIFO) Close() error
Close this fifo
func (*LevelUniqueQueueByteFIFO) Has ¶ added in v1.12.0
func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error)
Has returns whether the fifo contains this data
func (*LevelUniqueQueueByteFIFO) Len ¶ added in v1.12.0
func (fifo *LevelUniqueQueueByteFIFO) Len() int64
Len returns the length of the fifo
func (*LevelUniqueQueueByteFIFO) Pop ¶ added in v1.12.0
func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error)
Pop pops data from the start of the fifo
type LevelUniqueQueueConfiguration ¶ added in v1.12.0
type LevelUniqueQueueConfiguration struct { ByteFIFOQueueConfiguration DataDir string ConnectionString string QueueName string }
LevelUniqueQueueConfiguration is the configuration for a LevelUniqueQueue
type ManagedPool ¶
type ManagedPool interface { // AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group AddWorkers(number int, timeout time.Duration) context.CancelFunc // NumberOfWorkers returns the total number of workers in the pool NumberOfWorkers() int // MaxNumberOfWorkers returns the maximum number of workers the pool can dynamically grow to MaxNumberOfWorkers() int // SetMaxNumberOfWorkers sets the maximum number of workers the pool can dynamically grow to SetMaxNumberOfWorkers(int) // BoostTimeout returns the current timeout for worker groups created during a boost BoostTimeout() time.Duration // BlockTimeout returns the timeout the internal channel can block for before a boost would occur BlockTimeout() time.Duration // BoostWorkers sets the number of workers to be created during a boost BoostWorkers() int // SetPoolSettings sets the user updatable settings for the pool SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) }
ManagedPool is a simple interface to get certain details from a worker pool
type ManagedQueue ¶
type ManagedQueue struct { QID int64 Type Type Name string Configuration interface{} ExemplarType string Managed interface{} PoolWorkers map[int64]*PoolWorkers // contains filtered or unexported fields }
ManagedQueue represents a working queue with a Pool of workers.
Although a ManagedQueue should really represent a Queue this does not necessarily have to be the case. This could be used to describe any queue.WorkerPool.
func (*ManagedQueue) AddWorkers ¶
func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc
AddWorkers adds workers to the queue if it has registered an add worker function
func (*ManagedQueue) BlockTimeout ¶
func (q *ManagedQueue) BlockTimeout() time.Duration
BlockTimeout returns the timeout til the next boost
func (*ManagedQueue) BoostTimeout ¶
func (q *ManagedQueue) BoostTimeout() time.Duration
BoostTimeout returns the timeout of the next boost
func (*ManagedQueue) BoostWorkers ¶
func (q *ManagedQueue) BoostWorkers() int
BoostWorkers returns the number of workers for a boost
func (*ManagedQueue) CancelWorkers ¶
func (q *ManagedQueue) CancelWorkers(pid int64)
CancelWorkers cancels pooled workers with pid
func (*ManagedQueue) Flush ¶ added in v1.12.0
func (q *ManagedQueue) Flush(timeout time.Duration) error
Flush flushes the queue with a timeout
func (*ManagedQueue) IsEmpty ¶ added in v1.12.0
func (q *ManagedQueue) IsEmpty() bool
IsEmpty returns if the queue is empty
func (*ManagedQueue) MaxNumberOfWorkers ¶
func (q *ManagedQueue) MaxNumberOfWorkers() int
MaxNumberOfWorkers returns the maximum number of workers for the pool
func (*ManagedQueue) NumberOfWorkers ¶
func (q *ManagedQueue) NumberOfWorkers() int
NumberOfWorkers returns the number of workers in the queue
func (*ManagedQueue) RegisterWorkers ¶
func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc, isFlusher bool) int64
RegisterWorkers registers workers to this queue
func (*ManagedQueue) RemoveWorkers ¶
func (q *ManagedQueue) RemoveWorkers(pid int64)
RemoveWorkers deletes pooled workers with pid
func (*ManagedQueue) SetPoolSettings ¶ added in v1.12.0
func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
SetPoolSettings sets the setable boost values
func (*ManagedQueue) Workers ¶
func (q *ManagedQueue) Workers() []*PoolWorkers
Workers returns the poolworkers
type ManagedQueueList ¶
type ManagedQueueList []*ManagedQueue
ManagedQueueList implements the sort.Interface
func (ManagedQueueList) Len ¶
func (l ManagedQueueList) Len() int
func (ManagedQueueList) Less ¶
func (l ManagedQueueList) Less(i, j int) bool
func (ManagedQueueList) Swap ¶
func (l ManagedQueueList) Swap(i, j int)
type Manager ¶
type Manager struct { Queues map[int64]*ManagedQueue // contains filtered or unexported fields }
Manager is a queue manager
func GetManager ¶
func GetManager() *Manager
GetManager returns a Manager and initializes one as singleton if there's none yet
func (*Manager) FlushAll ¶ added in v1.12.0
FlushAll flushes all the flushable queues attached to this manager
func (*Manager) GetManagedQueue ¶
func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue
GetManagedQueue by qid
func (*Manager) ManagedQueues ¶
func (m *Manager) ManagedQueues() []*ManagedQueue
ManagedQueues returns the managed queues
type Mappable ¶ added in v1.14.0
type Mappable interface {
MapTo(v interface{}) error
}
Mappable represents an interface that can MapTo another interface
type NewQueueFunc ¶
type NewQueueFunc func(handler HandlerFunc, config interface{}, exemplar interface{}) (Queue, error)
NewQueueFunc is a function that creates a queue
type PersistableChannelQueue ¶
type PersistableChannelQueue struct {
// contains filtered or unexported fields
}
PersistableChannelQueue wraps a channel queue and level queue together The disk level queue will be used to store data at shutdown and terminate - and will be restored on start up.
func (*PersistableChannelQueue) Flush ¶ added in v1.12.0
func (q *PersistableChannelQueue) Flush(timeout time.Duration) error
Flush flushes the queue and blocks till the queue is empty
func (*PersistableChannelQueue) FlushWithContext ¶ added in v1.12.0
func (q *PersistableChannelQueue) FlushWithContext(ctx context.Context) error
FlushWithContext flushes the queue and blocks till the queue is empty
func (*PersistableChannelQueue) IsEmpty ¶ added in v1.12.0
func (q *PersistableChannelQueue) IsEmpty() bool
IsEmpty checks if a queue is empty
func (*PersistableChannelQueue) Name ¶
func (q *PersistableChannelQueue) Name() string
Name returns the name of this queue
func (*PersistableChannelQueue) Push ¶
func (q *PersistableChannelQueue) Push(data Data) error
Push will push the indexer data to queue
func (*PersistableChannelQueue) Run ¶
func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func()))
Run starts to run the queue
func (*PersistableChannelQueue) Shutdown ¶
func (q *PersistableChannelQueue) Shutdown()
Shutdown processing this queue
func (*PersistableChannelQueue) Terminate ¶
func (q *PersistableChannelQueue) Terminate()
Terminate this queue and close the queue
type PersistableChannelQueueConfiguration ¶
type PersistableChannelQueueConfiguration struct { Name string DataDir string BatchLength int QueueLength int Timeout time.Duration MaxAttempts int Workers int MaxWorkers int BlockTimeout time.Duration BoostTimeout time.Duration BoostWorkers int }
PersistableChannelQueueConfiguration is the configuration for a PersistableChannelQueue
type PersistableChannelUniqueQueue ¶ added in v1.12.0
type PersistableChannelUniqueQueue struct { *ChannelUniqueQueue // contains filtered or unexported fields }
PersistableChannelUniqueQueue wraps a channel queue and level queue together
Please note that this Queue does not guarantee that a particular task cannot be processed twice or more at the same time. Uniqueness is only guaranteed whilst the task is waiting in the queue.
func (*PersistableChannelUniqueQueue) Flush ¶ added in v1.12.0
func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error
Flush flushes the queue
func (*PersistableChannelUniqueQueue) Has ¶ added in v1.12.0
func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error)
Has will test if the queue has the data
func (*PersistableChannelUniqueQueue) Name ¶ added in v1.12.0
func (q *PersistableChannelUniqueQueue) Name() string
Name returns the name of this queue
func (*PersistableChannelUniqueQueue) Push ¶ added in v1.12.0
func (q *PersistableChannelUniqueQueue) Push(data Data) error
Push will push the indexer data to queue
func (*PersistableChannelUniqueQueue) PushFunc ¶ added in v1.12.0
func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) error
PushFunc will push the indexer data to queue
func (*PersistableChannelUniqueQueue) Run ¶ added in v1.12.0
func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func()))
Run starts to run the queue
func (*PersistableChannelUniqueQueue) Shutdown ¶ added in v1.12.0
func (q *PersistableChannelUniqueQueue) Shutdown()
Shutdown processing this queue
func (*PersistableChannelUniqueQueue) Terminate ¶ added in v1.12.0
func (q *PersistableChannelUniqueQueue) Terminate()
Terminate this queue and close the queue
type PersistableChannelUniqueQueueConfiguration ¶ added in v1.12.0
type PersistableChannelUniqueQueueConfiguration struct { Name string DataDir string BatchLength int QueueLength int Timeout time.Duration MaxAttempts int Workers int MaxWorkers int BlockTimeout time.Duration BoostTimeout time.Duration BoostWorkers int }
PersistableChannelUniqueQueueConfiguration is the configuration for a PersistableChannelUniqueQueue
type PoolWorkers ¶
type PoolWorkers struct { PID int64 Workers int Start time.Time Timeout time.Time HasTimeout bool Cancel context.CancelFunc IsFlusher bool }
PoolWorkers represents a group of workers working on a queue
type PoolWorkersList ¶
type PoolWorkersList []*PoolWorkers
PoolWorkersList implements the sort.Interface for PoolWorkers
func (PoolWorkersList) Len ¶
func (l PoolWorkersList) Len() int
func (PoolWorkersList) Less ¶
func (l PoolWorkersList) Less(i, j int) bool
func (PoolWorkersList) Swap ¶
func (l PoolWorkersList) Swap(i, j int)
type Queue ¶
type Queue interface { Flushable Run(atShutdown, atTerminate func(context.Context, func())) Push(Data) error }
Queue defines an interface of a queue-like item
Queues will handle their own contents in the Run method
func CreateQueue ¶
func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue
CreateQueue for name with provided handler and exemplar
func NewChannelQueue ¶
func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewChannelQueue creates a memory channel queue
func NewChannelUniqueQueue ¶ added in v1.12.0
func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewChannelUniqueQueue create a memory channel queue
func NewDummyQueue ¶
func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, error)
NewDummyQueue creates a new DummyQueue
func NewImmediate ¶ added in v1.13.0
func NewImmediate(handler HandlerFunc, opts, exemplar interface{}) (Queue, error)
NewImmediate creates a new false queue to execute the function when push
func NewLevelQueue ¶
func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewLevelQueue creates a ledis local queue
func NewLevelUniqueQueue ¶ added in v1.12.0
func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewLevelUniqueQueue creates a ledis local queue
Please note that this Queue does not guarantee that a particular task cannot be processed twice or more at the same time. Uniqueness is only guaranteed whilst the task is waiting in the queue.
func NewPersistableChannelQueue ¶
func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewPersistableChannelQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate
func NewPersistableChannelUniqueQueue ¶ added in v1.12.0
func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewPersistableChannelUniqueQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate
func NewQueue ¶
func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error)
NewQueue takes a queue Type, HandlerFunc, some options and possibly an exemplar and returns a Queue or an error
func NewRedisQueue ¶
func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewRedisQueue creates single redis or cluster redis queue
func NewRedisUniqueQueue ¶ added in v1.12.0
func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewRedisUniqueQueue creates single redis or cluster redis queue.
Please note that this Queue does not guarantee that a particular task cannot be processed twice or more at the same time. Uniqueness is only guaranteed whilst the task is waiting in the queue.
func NewWrappedQueue ¶
func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewWrappedQueue will attempt to create a queue of the provided type, but if there is a problem creating this queue it will instead create a WrappedQueue with delayed startup of the queue instead and a channel which will be redirected to the queue
func NewWrappedUniqueQueue ¶ added in v1.12.0
func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewWrappedUniqueQueue will attempt to create a unique queue of the provided type, but if there is a problem creating this queue it will instead create a WrappedUniqueQueue with delayed startup of the queue instead and a channel which will be redirected to the queue
Please note that this Queue does not guarantee that a particular task cannot be processed twice or more at the same time. Uniqueness is only guaranteed whilst the task is waiting in the queue.
type RedisByteFIFO ¶ added in v1.12.0
type RedisByteFIFO struct {
// contains filtered or unexported fields
}
RedisByteFIFO represents a ByteFIFO formed from a redisClient
func NewRedisByteFIFO ¶ added in v1.12.0
func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error)
NewRedisByteFIFO creates a ByteFIFO formed from a redisClient
func (*RedisByteFIFO) Close ¶ added in v1.12.0
func (fifo *RedisByteFIFO) Close() error
Close this fifo
func (*RedisByteFIFO) Len ¶ added in v1.12.0
func (fifo *RedisByteFIFO) Len() int64
Len returns the length of the fifo
func (*RedisByteFIFO) Pop ¶ added in v1.12.0
func (fifo *RedisByteFIFO) Pop() ([]byte, error)
Pop pops data from the start of the fifo
type RedisByteFIFOConfiguration ¶ added in v1.12.0
RedisByteFIFOConfiguration is the configuration for the RedisByteFIFO
type RedisQueueConfiguration ¶
type RedisQueueConfiguration struct { ByteFIFOQueueConfiguration RedisByteFIFOConfiguration }
RedisQueueConfiguration is the configuration for the redis queue
type RedisUniqueByteFIFO ¶ added in v1.12.0
type RedisUniqueByteFIFO struct { RedisByteFIFO // contains filtered or unexported fields }
RedisUniqueByteFIFO represents a UniqueByteFIFO formed from a redisClient
func NewRedisUniqueByteFIFO ¶ added in v1.12.0
func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniqueByteFIFO, error)
NewRedisUniqueByteFIFO creates a UniqueByteFIFO formed from a redisClient
func (*RedisUniqueByteFIFO) Has ¶ added in v1.12.0
func (fifo *RedisUniqueByteFIFO) Has(data []byte) (bool, error)
Has returns whether the fifo contains this data
func (*RedisUniqueByteFIFO) Pop ¶ added in v1.12.0
func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error)
Pop pops data from the start of the fifo
type RedisUniqueByteFIFOConfiguration ¶ added in v1.12.0
type RedisUniqueByteFIFOConfiguration struct { RedisByteFIFOConfiguration SetName string }
RedisUniqueByteFIFOConfiguration is the configuration for the RedisUniqueByteFIFO
type RedisUniqueQueue ¶ added in v1.12.0
type RedisUniqueQueue struct {
*ByteFIFOUniqueQueue
}
RedisUniqueQueue redis queue
type RedisUniqueQueueConfiguration ¶ added in v1.12.0
type RedisUniqueQueueConfiguration struct { ByteFIFOQueueConfiguration RedisUniqueByteFIFOConfiguration }
RedisUniqueQueueConfiguration is the configuration for the redis queue
type Shutdownable ¶
type Shutdownable interface { Shutdown() Terminate() }
Shutdownable represents a queue that can be shutdown
type Type ¶
type Type string
Type is a type of Queue
const ChannelQueueType Type = "channel"
ChannelQueueType is the type for channel queue
const ChannelUniqueQueueType Type = "unique-channel"
ChannelUniqueQueueType is the type for channel queue
const DummyQueueType Type = "dummy"
DummyQueueType is the type for the dummy queue
const ImmediateType Type = "immediate"
ImmediateType is the type to execute the function when push
const LevelQueueType Type = "level"
LevelQueueType is the type for level queue
const LevelUniqueQueueType Type = "unique-level"
LevelUniqueQueueType is the type for level queue
const PersistableChannelQueueType Type = "persistable-channel"
PersistableChannelQueueType is the type for persistable queue
const PersistableChannelUniqueQueueType Type = "unique-persistable-channel"
PersistableChannelUniqueQueueType is the type for persistable queue
const RedisQueueType Type = "redis"
RedisQueueType is the type for redis queue
const RedisUniqueQueueType Type = "unique-redis"
RedisUniqueQueueType is the type for redis queue
const WrappedQueueType Type = "wrapped"
WrappedQueueType is the type for a wrapped delayed starting queue
const WrappedUniqueQueueType Type = "unique-wrapped"
WrappedUniqueQueueType is the type for a wrapped delayed starting queue
func RegisteredTypes ¶
func RegisteredTypes() []Type
RegisteredTypes provides the list of requested types of queues
type UniqueByteFIFO ¶ added in v1.12.0
type UniqueByteFIFO interface { ByteFIFO // Has returns whether the fifo contains this data Has(data []byte) (bool, error) }
UniqueByteFIFO defines a FIFO that Uniques its contents
type UniqueQueue ¶ added in v1.12.0
UniqueQueue defines a queue which guarantees only one instance of same data is in the queue. Instances with same identity will be discarded if there is already one in the line.
This queue is particularly useful for preventing duplicated task of same purpose - please note that this does not guarantee that a particular task cannot be processed twice or more at the same time. Uniqueness is only guaranteed whilst the task is waiting in the queue.
Users of this queue should be careful to push only the identifier of the data
func CreateUniqueQueue ¶ added in v1.12.0
func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) UniqueQueue
CreateUniqueQueue for name with provided handler and exemplar
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool represent a dynamically growable worker pool for a provided handler function. They have an internal channel which they use to detect if there is a block and will grow and shrink in response to demand as per configuration.
func NewWorkerPool ¶ added in v1.12.0
func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool
NewWorkerPool creates a new worker pool
func (*WorkerPool) AddWorkers ¶
func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc
AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
func (*WorkerPool) BlockTimeout ¶
func (p *WorkerPool) BlockTimeout() time.Duration
BlockTimeout returns the timeout til the next boost
func (*WorkerPool) BoostTimeout ¶
func (p *WorkerPool) BoostTimeout() time.Duration
BoostTimeout returns the timeout of the next boost
func (*WorkerPool) BoostWorkers ¶
func (p *WorkerPool) BoostWorkers() int
BoostWorkers returns the number of workers for a boost
func (*WorkerPool) CleanUp ¶
func (p *WorkerPool) CleanUp(ctx context.Context)
CleanUp will drain the remaining contents of the channel This should be called after AddWorkers context is closed
func (*WorkerPool) Flush ¶ added in v1.12.0
func (p *WorkerPool) Flush(timeout time.Duration) error
Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
func (*WorkerPool) FlushWithContext ¶ added in v1.12.0
func (p *WorkerPool) FlushWithContext(ctx context.Context) error
FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty NB: The worker will not be registered with the manager.
func (*WorkerPool) IsEmpty ¶ added in v1.12.0
func (p *WorkerPool) IsEmpty() bool
IsEmpty returns if true if the worker queue is empty
func (*WorkerPool) MaxNumberOfWorkers ¶
func (p *WorkerPool) MaxNumberOfWorkers() int
MaxNumberOfWorkers returns the maximum number of workers automatically added to the pool
func (*WorkerPool) NumberOfWorkers ¶
func (p *WorkerPool) NumberOfWorkers() int
NumberOfWorkers returns the number of current workers in the pool
func (*WorkerPool) Push ¶
func (p *WorkerPool) Push(data Data)
Push pushes the data to the internal channel
func (*WorkerPool) SetMaxNumberOfWorkers ¶
func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int)
SetMaxNumberOfWorkers sets the maximum number of workers automatically added to the pool Changing this number will not change the number of current workers but will change the limit for future additions
func (*WorkerPool) SetPoolSettings ¶ added in v1.12.0
func (p *WorkerPool) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
SetPoolSettings sets the setable boost values
type WorkerPoolConfiguration ¶ added in v1.12.0
type WorkerPoolConfiguration struct { QueueLength int BatchLength int BlockTimeout time.Duration BoostTimeout time.Duration BoostWorkers int MaxWorkers int }
WorkerPoolConfiguration is the basic configuration for a WorkerPool
type WrappedQueue ¶
type WrappedQueue struct {
// contains filtered or unexported fields
}
WrappedQueue wraps a delayed starting queue
func (*WrappedQueue) Flush ¶ added in v1.12.0
func (q *WrappedQueue) Flush(timeout time.Duration) error
Flush flushes the queue and blocks till the queue is empty
func (*WrappedQueue) FlushWithContext ¶ added in v1.12.0
func (q *WrappedQueue) FlushWithContext(ctx context.Context) error
FlushWithContext implements the final part of Flushable
func (*WrappedQueue) IsEmpty ¶ added in v1.12.0
func (q *WrappedQueue) IsEmpty() bool
IsEmpty checks whether the queue is empty
func (*WrappedQueue) Push ¶
func (q *WrappedQueue) Push(data Data) error
Push will push the data to the internal channel checking it against the exemplar
func (*WrappedQueue) Run ¶
func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func()))
Run starts to run the queue and attempts to create the internal queue
func (*WrappedQueue) Shutdown ¶
func (q *WrappedQueue) Shutdown()
Shutdown this queue and stop processing
func (*WrappedQueue) Terminate ¶
func (q *WrappedQueue) Terminate()
Terminate this queue and close the queue
type WrappedQueueConfiguration ¶
type WrappedQueueConfiguration struct { Underlying Type Timeout time.Duration MaxAttempts int Config interface{} QueueLength int Name string }
WrappedQueueConfiguration is the configuration for a WrappedQueue
type WrappedUniqueQueue ¶ added in v1.12.0
type WrappedUniqueQueue struct { *WrappedQueue // contains filtered or unexported fields }
WrappedUniqueQueue wraps a delayed starting unique queue
func (*WrappedUniqueQueue) Has ¶ added in v1.12.0
func (q *WrappedUniqueQueue) Has(data Data) (bool, error)
Has checks if the data is in the queue
func (*WrappedUniqueQueue) IsEmpty ¶ added in v1.12.0
func (q *WrappedUniqueQueue) IsEmpty() bool
IsEmpty checks whether the queue is empty
func (*WrappedUniqueQueue) Push ¶ added in v1.12.0
func (q *WrappedUniqueQueue) Push(data Data) error
Push will push the data to the internal channel checking it against the exemplar
Source Files ¶
- bytefifo.go
- helper.go
- manager.go
- queue.go
- queue_bytefifo.go
- queue_channel.go
- queue_disk.go
- queue_disk_channel.go
- queue_redis.go
- queue_wrapped.go
- setting.go
- unique_queue.go
- unique_queue_channel.go
- unique_queue_disk.go
- unique_queue_disk_channel.go
- unique_queue_redis.go
- unique_queue_wrapped.go
- workerpool.go