opqueue

package
v1.0.0-rc.1
Published: Apr 12, 2022 License: Apache-2.0 Imports: 14 Imported by: 1

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// PoolSize is the number of AMQP subscribers that are listening for operation messages.
	PoolSize int
	// TaskMonitorInterval is the interval (period) at which operation queue tasks from other server instances
	// are monitored.
	TaskMonitorInterval time.Duration
	// TaskExpiration is the maximum time that an operation queue task can exist in the database before it is
	// considered to have expired, at which point any other server instance may delete the task and take over
	// processing of all operations associated with the task.
	TaskExpiration time.Duration
	// OpExpiration is the age at which a pending operation in the database is considered to have expired, after
	// which the operation is deleted. Operations should be deleted during normal processing but, in case one is
	// not, the data expiry service will delete the operations that are older than this value.
	OpExpiration time.Duration
	// MaxRetries is the maximum number of retries for a failed operation in a batch.
	MaxRetries int
}

Config contains configuration parameters for the operation queue.

type Queue

type Queue struct {
	*lifecycle.Lifecycle
	// contains filtered or unexported fields
}

Queue implements an operation queue that uses a publisher/subscriber.

func New

func New(cfg Config, pubSub pubSub, p storage.Provider, taskMgr taskManager,
	expiryService dataExpiryService, metrics metricsProvider) (*Queue, error)

New returns a new operation queue.

func (*Queue) Add

func (q *Queue) Add(op *operation.QueuedOperation, protocolVersion uint64) (uint, error)

Add publishes the given operation.

func (*Queue) Len

func (q *Queue) Len() uint

Len returns the length of the pending queue.

func (*Queue) Peek

Peek returns (up to) the given number of operations from the head of the queue but does not remove them.

func (*Queue) Remove

func (q *Queue) Remove(num uint) (ops operation.QueuedOperationsAtTime, ack func() uint, nack func(), err error)

Remove removes (up to) the given number of items from the head of the queue. Per the signature, it returns the removed operations, an ack function, a nack function, and an error. The ack function returns a uint, presumably the new length of the queue.
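The Add, Len, Peek, and Remove methods suggest a remove-then-acknowledge pattern. The sketch below shows that pattern with a hypothetical in-memory memQueue that holds strings. It is not opqueue.Queue and does not use a publisher/subscriber or a database. Its behavior encodes two assumptions about the semantics: ack reports the number of items still pending, and nack puts the removed items back at the head of the queue.

```go
package main

import (
	"fmt"
	"sync"
)

// memQueue is a hypothetical in-memory stand-in used only for illustration.
type memQueue struct {
	mu      sync.Mutex
	pending []string
}

// Add appends an item and returns the new queue length.
func (q *memQueue) Add(op string) uint {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.pending = append(q.pending, op)
	return uint(len(q.pending))
}

// Len returns the number of pending items.
func (q *memQueue) Len() uint {
	q.mu.Lock()
	defer q.mu.Unlock()
	return uint(len(q.pending))
}

// Peek returns a copy of (up to) num items from the head without removing them.
func (q *memQueue) Peek(num uint) []string {
	q.mu.Lock()
	defer q.mu.Unlock()
	n := int(num)
	if n > len(q.pending) {
		n = len(q.pending)
	}
	return append([]string(nil), q.pending[:n]...)
}

// Remove takes (up to) num items from the head. The caller is expected to call
// either ack (processing succeeded) or nack (processing failed), but not both.
func (q *memQueue) Remove(num uint) (ops []string, ack func() uint, nack func()) {
	q.mu.Lock()
	defer q.mu.Unlock()
	n := int(num)
	if n > len(q.pending) {
		n = len(q.pending)
	}
	ops = append([]string(nil), q.pending[:n]...)
	q.pending = q.pending[n:]

	// ack confirms the removal and reports how many items are still pending.
	ack = func() uint { return q.Len() }
	// nack restores the removed items to the head of the queue.
	nack = func() {
		q.mu.Lock()
		defer q.mu.Unlock()
		q.pending = append(append([]string(nil), ops...), q.pending...)
	}
	return ops, ack, nack
}

func main() {
	q := &memQueue{}
	for _, op := range []string{"op1", "op2", "op3"} {
		q.Add(op)
	}

	ops, _, nack := q.Remove(2)
	fmt.Println("removed:", ops, "pending:", q.Len())

	nack() // simulate a processing failure: the items return to the queue
	fmt.Println("pending after nack:", q.Len())

	ops, ack, _ := q.Remove(3)
	fmt.Println("removed:", ops, "pending after ack:", ack())
}
```

Returning callbacks instead of deleting items immediately lets the caller decide the outcome only after the batch has actually been processed.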
