Documentation ¶
Index ¶
- func IsErrInvalidConfiguration(err error) bool
- func RegisteredTypesAsString() []string
- type ChannelQueue
- type ChannelQueueConfiguration
- type Data
- type DummyQueue
- type ErrInvalidConfiguration
- type HandlerFunc
- type LevelQueue
- type LevelQueueConfiguration
- type ManagedPool
- type ManagedQueue
- func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc
- func (q *ManagedQueue) BlockTimeout() time.Duration
- func (q *ManagedQueue) BoostTimeout() time.Duration
- func (q *ManagedQueue) BoostWorkers() int
- func (q *ManagedQueue) CancelWorkers(pid int64)
- func (q *ManagedQueue) MaxNumberOfWorkers() int
- func (q *ManagedQueue) NumberOfWorkers() int
- func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, ...) int64
- func (q *ManagedQueue) RemoveWorkers(pid int64)
- func (q *ManagedQueue) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
- func (q *ManagedQueue) Workers() []*PoolWorkers
- type ManagedQueueList
- type Manager
- type Named
- type NewQueueFunc
- type PersistableChannelQueue
- type PersistableChannelQueueConfiguration
- type PoolWorkers
- type PoolWorkersList
- type Queue
- func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue
- func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, error)
- func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error)
- func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
- type RedisQueue
- type RedisQueueConfiguration
- type Shutdownable
- type Type
- type WorkerPool
- func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc
- func (p *WorkerPool) BlockTimeout() time.Duration
- func (p *WorkerPool) BoostTimeout() time.Duration
- func (p *WorkerPool) BoostWorkers() int
- func (p *WorkerPool) CleanUp(ctx context.Context)
- func (p *WorkerPool) MaxNumberOfWorkers() int
- func (p *WorkerPool) NumberOfWorkers() int
- func (p *WorkerPool) Push(data Data)
- func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int)
- func (p *WorkerPool) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
- func (p *WorkerPool) Wait()
- type WrappedQueue
- type WrappedQueueConfiguration
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsErrInvalidConfiguration ¶
IsErrInvalidConfiguration checks if an error is an ErrInvalidConfiguration
func RegisteredTypesAsString ¶
func RegisteredTypesAsString() []string
RegisteredTypesAsString provides the list of requested types of queues
Types ¶
type ChannelQueue ¶
type ChannelQueue struct {
// contains filtered or unexported fields
}
ChannelQueue implements
func (*ChannelQueue) Name ¶
func (c *ChannelQueue) Name() string
Name returns the name of this queue
func (*ChannelQueue) Push ¶
func (c *ChannelQueue) Push(data Data) error
Push will push data into the queue
func (*ChannelQueue) Run ¶
func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func()))
Run starts to run the queue
type ChannelQueueConfiguration ¶
type ChannelQueueConfiguration struct { QueueLength int BatchLength int Workers int MaxWorkers int BlockTimeout time.Duration BoostTimeout time.Duration BoostWorkers int Name string }
ChannelQueueConfiguration is the configuration for a ChannelQueue
type DummyQueue ¶
type DummyQueue struct { }
DummyQueue represents an empty queue
func (*DummyQueue) Run ¶
func (b *DummyQueue) Run(_, _ func(context.Context, func()))
Run starts to run the queue
type ErrInvalidConfiguration ¶
type ErrInvalidConfiguration struct {
// contains filtered or unexported fields
}
ErrInvalidConfiguration is called when there is invalid configuration for a queue
func (ErrInvalidConfiguration) Error ¶
func (err ErrInvalidConfiguration) Error() string
type HandlerFunc ¶
type HandlerFunc func(...Data)
HandlerFunc is a function that takes a variable amount of data and processes it
type LevelQueue ¶
type LevelQueue struct {
// contains filtered or unexported fields
}
LevelQueue implements a disk library queue
func (*LevelQueue) Push ¶
func (l *LevelQueue) Push(data Data) error
Push will push the indexer data to queue
func (*LevelQueue) Run ¶
func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func()))
Run starts to run the queue
func (*LevelQueue) Shutdown ¶
func (l *LevelQueue) Shutdown()
Shutdown this queue and stop processing
func (*LevelQueue) Terminate ¶
func (l *LevelQueue) Terminate()
Terminate this queue and close the queue
type LevelQueueConfiguration ¶
type LevelQueueConfiguration struct { DataDir string QueueLength int BatchLength int Workers int MaxWorkers int BlockTimeout time.Duration BoostTimeout time.Duration BoostWorkers int Name string }
LevelQueueConfiguration is the configuration for a LevelQueue
type ManagedPool ¶
type ManagedPool interface { AddWorkers(number int, timeout time.Duration) context.CancelFunc NumberOfWorkers() int MaxNumberOfWorkers() int SetMaxNumberOfWorkers(int) BoostTimeout() time.Duration BlockTimeout() time.Duration BoostWorkers() int SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) }
ManagedPool is a simple interface to get certain details from a worker pool
type ManagedQueue ¶
type ManagedQueue struct { QID int64 Queue Queue Type Type Name string Configuration interface{} ExemplarType string Pool ManagedPool PoolWorkers map[int64]*PoolWorkers // contains filtered or unexported fields }
ManagedQueue represents a working queue inheriting from Gitea.
func (*ManagedQueue) AddWorkers ¶
func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc
AddWorkers adds workers to the queue if it has registered an add worker function
func (*ManagedQueue) BlockTimeout ¶
func (q *ManagedQueue) BlockTimeout() time.Duration
BlockTimeout returns the timeout til the next boost
func (*ManagedQueue) BoostTimeout ¶
func (q *ManagedQueue) BoostTimeout() time.Duration
BoostTimeout returns the timeout of the next boost
func (*ManagedQueue) BoostWorkers ¶
func (q *ManagedQueue) BoostWorkers() int
BoostWorkers returns the number of workers for a boost
func (*ManagedQueue) CancelWorkers ¶
func (q *ManagedQueue) CancelWorkers(pid int64)
CancelWorkers cancels pooled workers with pid
func (*ManagedQueue) MaxNumberOfWorkers ¶
func (q *ManagedQueue) MaxNumberOfWorkers() int
MaxNumberOfWorkers returns the maximum number of workers for the pool
func (*ManagedQueue) NumberOfWorkers ¶
func (q *ManagedQueue) NumberOfWorkers() int
NumberOfWorkers returns the number of workers in the queue
func (*ManagedQueue) RegisterWorkers ¶
func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc) int64
RegisterWorkers registers workers to this queue
func (*ManagedQueue) RemoveWorkers ¶
func (q *ManagedQueue) RemoveWorkers(pid int64)
RemoveWorkers deletes pooled workers with pid
func (*ManagedQueue) SetSettings ¶
func (q *ManagedQueue) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
SetSettings sets the setable boost values
func (*ManagedQueue) Workers ¶
func (q *ManagedQueue) Workers() []*PoolWorkers
Workers returns the poolworkers
type ManagedQueueList ¶
type ManagedQueueList []*ManagedQueue
ManagedQueueList implements the sort.Interface
func (ManagedQueueList) Len ¶
func (l ManagedQueueList) Len() int
func (ManagedQueueList) Less ¶
func (l ManagedQueueList) Less(i, j int) bool
func (ManagedQueueList) Swap ¶
func (l ManagedQueueList) Swap(i, j int)
type Manager ¶
type Manager struct { Queues map[int64]*ManagedQueue // contains filtered or unexported fields }
Manager is a queue manager
func GetManager ¶
func GetManager() *Manager
GetManager returns a Manager and initializes one as singleton if there's none yet
func (*Manager) Add ¶
func (m *Manager) Add(queue Queue, t Type, configuration, exemplar interface{}, pool ManagedPool) int64
Add adds a queue to this manager
func (*Manager) GetManagedQueue ¶
func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue
GetManagedQueue by qid
func (*Manager) ManagedQueues ¶
func (m *Manager) ManagedQueues() []*ManagedQueue
ManagedQueues returns the managed queues
type NewQueueFunc ¶
type NewQueueFunc func(handler HandlerFunc, config interface{}, exemplar interface{}) (Queue, error)
NewQueueFunc is a function that creates a queue
type PersistableChannelQueue ¶
type PersistableChannelQueue struct { *ChannelQueue // contains filtered or unexported fields }
PersistableChannelQueue wraps a channel queue and level queue together
func (*PersistableChannelQueue) Name ¶
func (p *PersistableChannelQueue) Name() string
Name returns the name of this queue
func (*PersistableChannelQueue) Push ¶
func (p *PersistableChannelQueue) Push(data Data) error
Push will push the indexer data to queue
func (*PersistableChannelQueue) Run ¶
func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func()))
Run starts to run the queue
func (*PersistableChannelQueue) Shutdown ¶
func (p *PersistableChannelQueue) Shutdown()
Shutdown processing this queue
func (*PersistableChannelQueue) Terminate ¶
func (p *PersistableChannelQueue) Terminate()
Terminate this queue and close the queue
type PersistableChannelQueueConfiguration ¶
type PersistableChannelQueueConfiguration struct { Name string DataDir string BatchLength int QueueLength int Timeout time.Duration MaxAttempts int Workers int MaxWorkers int BlockTimeout time.Duration BoostTimeout time.Duration BoostWorkers int }
PersistableChannelQueueConfiguration is the configuration for a PersistableChannelQueue
type PoolWorkers ¶
type PoolWorkers struct { PID int64 Workers int Start time.Time Timeout time.Time HasTimeout bool Cancel context.CancelFunc }
PoolWorkers represents a working queue inheriting from Gitea.
type PoolWorkersList ¶
type PoolWorkersList []*PoolWorkers
PoolWorkersList implements the sort.Interface
func (PoolWorkersList) Len ¶
func (l PoolWorkersList) Len() int
func (PoolWorkersList) Less ¶
func (l PoolWorkersList) Less(i, j int) bool
func (PoolWorkersList) Swap ¶
func (l PoolWorkersList) Swap(i, j int)
type Queue ¶
type Queue interface { Run(atShutdown, atTerminate func(context.Context, func())) Push(Data) error }
Queue defines an interface to save an issue indexer queue
func CreateQueue ¶
func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue
CreateQueue for name with provided handler and exemplar
func NewChannelQueue ¶
func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewChannelQueue create a memory channel queue
func NewDummyQueue ¶
func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, error)
NewDummyQueue creates a new DummyQueue
func NewLevelQueue ¶
func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewLevelQueue creates a ledis local queue
func NewPersistableChannelQueue ¶
func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewPersistableChannelQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate
func NewQueue ¶
func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error)
NewQueue takes a queue Type and HandlerFunc some options and possibly an exemplar and returns a Queue or an error
func NewRedisQueue ¶
func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewRedisQueue creates single redis or cluster redis queue
func NewWrappedQueue ¶
func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
NewWrappedQueue will attempt to create a queue of the provided type, but if there is a problem creating this queue it will instead create a WrappedQueue with delayed startup of the queue instead and a channel which will be redirected to the queue
type RedisQueue ¶
type RedisQueue struct {
// contains filtered or unexported fields
}
RedisQueue redis queue
func (*RedisQueue) Run ¶
func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func()))
Run runs the redis queue
func (*RedisQueue) Terminate ¶
func (r *RedisQueue) Terminate()
Terminate this queue and close the queue
type RedisQueueConfiguration ¶
type RedisQueueConfiguration struct { Network string Addresses string Password string DBIndex int BatchLength int QueueLength int QueueName string Workers int MaxWorkers int BlockTimeout time.Duration BoostTimeout time.Duration BoostWorkers int Name string }
RedisQueueConfiguration is the configuration for the redis queue
type Shutdownable ¶
type Shutdownable interface { Shutdown() Terminate() }
Shutdownable represents a queue that can be shutdown
type Type ¶
type Type string
Type is a type of Queue
const ChannelQueueType Type = "channel"
ChannelQueueType is the type for channel queue
const DummyQueueType Type = "dummy"
DummyQueueType is the type for the dummy queue
const LevelQueueType Type = "level"
LevelQueueType is the type for level queue
const PersistableChannelQueueType Type = "persistable-channel"
PersistableChannelQueueType is the type for persistable queue
const RedisQueueType Type = "redis"
RedisQueueType is the type for redis queue
const WrappedQueueType Type = "wrapped"
WrappedQueueType is the type for a wrapped delayed starting queue
func RegisteredTypes ¶
func RegisteredTypes() []Type
RegisteredTypes provides the list of requested types of queues
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool takes
func (*WorkerPool) AddWorkers ¶
func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc
AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
func (*WorkerPool) BlockTimeout ¶
func (p *WorkerPool) BlockTimeout() time.Duration
BlockTimeout returns the timeout til the next boost
func (*WorkerPool) BoostTimeout ¶
func (p *WorkerPool) BoostTimeout() time.Duration
BoostTimeout returns the timeout of the next boost
func (*WorkerPool) BoostWorkers ¶
func (p *WorkerPool) BoostWorkers() int
BoostWorkers returns the number of workers for a boost
func (*WorkerPool) CleanUp ¶
func (p *WorkerPool) CleanUp(ctx context.Context)
CleanUp will drain the remaining contents of the channel This should be called after AddWorkers context is closed
func (*WorkerPool) MaxNumberOfWorkers ¶
func (p *WorkerPool) MaxNumberOfWorkers() int
MaxNumberOfWorkers returns the maximum number of workers automatically added to the pool
func (*WorkerPool) NumberOfWorkers ¶
func (p *WorkerPool) NumberOfWorkers() int
NumberOfWorkers returns the number of current workers in the pool
func (*WorkerPool) Push ¶
func (p *WorkerPool) Push(data Data)
Push pushes the data to the internal channel
func (*WorkerPool) SetMaxNumberOfWorkers ¶
func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int)
SetMaxNumberOfWorkers sets the maximum number of workers automatically added to the pool Changing this number will not change the number of current workers but will change the limit for future additions
func (*WorkerPool) SetSettings ¶
func (p *WorkerPool) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
SetSettings sets the setable boost values
type WrappedQueue ¶
type WrappedQueue struct {
// contains filtered or unexported fields
}
WrappedQueue wraps a delayed starting queue
func (*WrappedQueue) Push ¶
func (q *WrappedQueue) Push(data Data) error
Push will push the data to the internal channel checking it against the exemplar
func (*WrappedQueue) Run ¶
func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func()))
Run starts to run the queue and attempts to create the internal queue
func (*WrappedQueue) Shutdown ¶
func (q *WrappedQueue) Shutdown()
Shutdown this queue and stop processing
func (*WrappedQueue) Terminate ¶
func (q *WrappedQueue) Terminate()
Terminate this queue and close the queue