Documentation
Index
- type Config
- type OperationMessage
- type Queue
- func (q *Queue) Add(op *operation.QueuedOperation, protocolVersion uint64) (uint, error)
- func (q *Queue) Len() uint
- func (q *Queue) Peek(num uint) (operation.QueuedOperationsAtTime, error)
- func (q *Queue) Remove(num uint) (ops operation.QueuedOperationsAtTime, ack func() uint, nack func(error), ...)
Types
type Config
type Config struct {
	// PoolSize is the number of AMQP subscribers that are listening for operation messages.
	PoolSize int

	// TaskMonitorInterval is the interval (period) in which operation queue tasks from other server instances
	// are monitored.
	TaskMonitorInterval time.Duration

	// TaskExpiration is the maximum time that an operation queue task can exist in the database before it is
	// considered to have expired. At that point, any other server instance may delete the task and take over
	// processing of all operations associated with the task.
	TaskExpiration time.Duration

	// MaxRetries is the maximum number of retries for a failed operation in a batch.
	MaxRetries int

	// RetriesInitialDelay is the delay for the initial retry attempt.
	RetriesInitialDelay time.Duration

	// RetriesMaxDelay is the maximum delay for a retry attempt.
	RetriesMaxDelay time.Duration

	// RetriesMultiplier is the multiplier for a retry attempt. For example, if set to 1.5 and
	// the previous retry interval was 2s then the next retry interval is set to 3s.
	RetriesMultiplier float64

	// MaxOperationsToRepost is the maximum number of operations to repost to the queue after
	// an instance dies.
	MaxOperationsToRepost int

	// OperationLifeSpan is the maximum time that an operation can exist in the database before
	// it is deleted.
	OperationLifeSpan time.Duration

	// BatchWriterTimeout specifies the timeout for when a batch of operations is cut.
	BatchWriterTimeout time.Duration

	// MaxContiguousWithError specifies the maximum number of previously failed operations to rearrange contiguously.
	MaxContiguousWithError int

	// MaxContiguousWithoutError specifies the maximum number of operations (with no error) to rearrange contiguously.
	MaxContiguousWithoutError int
}
Config contains configuration parameters for the operation queue.
type OperationMessage (added in v1.0.0)
type OperationMessage struct {
	ID        string                          `json:"id"`
	Operation *operation.QueuedOperationAtTime `json:"operation"`
	Retries   int                             `json:"retries"`
	HasError  bool                            `json:"hasError,omitempty"`
}
OperationMessage contains the data that is sent to the message broker.
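To illustrate what the JSON tags on OperationMessage imply for the encoded payload, here is a small sketch using the standard encoding/json package. The operationMessage type copies the field names and tags shown above. The queuedOperation type and its Note field are hypothetical placeholders, since the fields of operation.QueuedOperationAtTime are not listed on this page. Whether the package itself encodes messages with encoding/json is also an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// queuedOperation is a hypothetical stand-in for
// operation.QueuedOperationAtTime, whose fields are not shown here.
type queuedOperation struct {
	Note string `json:"note"`
}

// operationMessage copies the field names and JSON tags of OperationMessage.
type operationMessage struct {
	ID        string           `json:"id"`
	Operation *queuedOperation `json:"operation"`
	Retries   int              `json:"retries"`
	HasError  bool             `json:"hasError,omitempty"`
}

// encode marshals a message and returns its JSON text.
func encode(msg operationMessage) (string, error) {
	b, err := json.Marshal(msg)
	if err != nil {
		return "", err
	}

	return string(b), nil
}

func main() {
	op := &queuedOperation{Note: "placeholder"}

	first, err := encode(operationMessage{ID: "msg-1", Operation: op})
	if err != nil {
		panic(err)
	}

	retried, err := encode(operationMessage{ID: "msg-1", Operation: op, Retries: 1, HasError: true})
	if err != nil {
		panic(err)
	}

	// hasError is omitted while false because of the omitempty option.
	fmt.Println(first) // {"id":"msg-1","operation":{"note":"placeholder"},"retries":0}

	// retries has no omitempty option, so it is always present.
	fmt.Println(retried) // {"id":"msg-1","operation":{"note":"placeholder"},"retries":1,"hasError":true}
}
```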
type Queue
Queue implements an operation queue that uses a publisher/subscriber.
func New
func New(
	cfg *Config,
	pubSub pubSub,
	p storage.Provider,
	taskMgr taskManager,
	expiryService dataExpiryService,
	metrics metricsProvider,
) (*Queue, error)
New returns a new operation queue.
func (*Queue) Peek
func (q *Queue) Peek(num uint) (operation.QueuedOperationsAtTime, error)
Peek returns (up to) the given number of operations from the head of the queue but does not remove them.
func (*Queue) Remove
func (q *Queue) Remove(num uint) (ops operation.QueuedOperationsAtTime, ack func() uint, nack func(error), err error)
Remove removes (up to) the given number of operations from the head of the queue. It returns the removed operations along with an ack function and a nack function. The ack function returns a uint (likely the new length of the queue), and the nack function accepts an error.