queue

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 29, 2023 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// SeqNoNewMessageAvailable is the seqNum returned when no new message available
	SeqNoNewMessageAvailable = int64(-1)
)

Variables

View Source
var (
	// ErrExceedingMessageSizeLimit returns when appending message exceeds the max size limit.
	ErrExceedingMessageSizeLimit = errors.New("message exceeds the max page size limit")
	// ErrOutOfSequenceRange returns sequence out of range.
	ErrOutOfSequenceRange = errors.New("out of sequence range")
	// ErrExceedingTotalSizeLimit returns total size limit.
	ErrExceedingTotalSizeLimit = errors.New("queue data size exceeds the max size limit")
	// ErrMsgNotFound returns message not found.
	ErrMsgNotFound = errors.New("message not found")
)

Functions

This section is empty.

Types

type ConsumerGroup

type ConsumerGroup interface {
	// Name returns a unique name for ConsumerGroup in a FanOutQueue.
	Name() string
	// Consume returns the seq for the next data to consume.
	// If no new data is available, SeqNoNewMessageAvailable is returned.
	Consume() int64
	// SetConsumedSeq sets the consumed sequence, this is useful when re-consume message.
	// error returns when seq is invalid(less than acknowledged seq or greater than the read barrier).
	SetConsumedSeq(seq int64)
	// Ack mark the data processed with sequence less than or equals to acknowledged sequence.
	Ack(ackSeq int64)
	// ConsumedSeq returns the sequence of consumed.
	ConsumedSeq() int64
	// AcknowledgedSeq returns the acknowledged sequence.
	AcknowledgedSeq() int64
	// Queue returns underlying queue.
	Queue() FanOutQueue
	// Pause pauses consume data.
	Pause()
	// SetSeq sets consumed/acknowledged sequence.
	SetSeq(seq int64)
	// Pending returns the offset between ConsumerGroup consumed sequence and FanOutQueue appended sequence.
	Pending() int64
	// IsEmpty returns if fan out consumer cannot consume any data.
	IsEmpty() bool
	// Close persists  headSeq, tailSeq.
	Close()
	// contains filtered or unexported methods
}

ConsumerGroup represents an individual consumer with own consume and ack sequence. The typical way to use ConsumerGroup is using a single go-routine to consume message, and using other go-routine to ack the messages which have been processed successfully.

func NewConsumerGroup

func NewConsumerGroup(parent, fanOutPath string, q FanOutQueue) (ConsumerGroup, error)

NewConsumerGroup builds a ConsumerGroup from metaPath.

type FanOutQueue

type FanOutQueue interface {
	// Path returns path for persistence files.
	Path() string
	// Queue returns underlying queue.
	Queue() Queue
	// GetOrCreateConsumerGroup returns the ConsumerGroup if exists,
	// otherwise creates a new ConsumerGroup with consume seq and ack seq == queue ack seq.
	GetOrCreateConsumerGroup(name string) (ConsumerGroup, error)
	// ConsumerGroupNames returns all names of ConsumerGroup.
	ConsumerGroupNames() []string
	// StopConsumerGroup stops consumer group by name.
	StopConsumerGroup(name string)
	// Sync checks the acknowledged sequence of each ConsumerGroup, update the acknowledged sequence as the smallest one.
	// Then syncs metadata to storage.
	Sync()
	// SetAppendedSeq sets appended sequence underlying queue, then set consumed/acknowledged sequence for each ConsumerGroup.
	SetAppendedSeq(seq int64)
	// Close persists Seq meta, ConsumerGroup seq meta, release resources.
	Close()
}

FanOutQueue represents a queue "produce once, consume multiple times". ConsumerGroup represents an individual consumer with own consume/acknowledge sequence.

func NewFanOutQueue

func NewFanOutQueue(dirPath string, dataSizeLimit int64) (q FanOutQueue, err error)

NewFanOutQueue returns a FanOutQueue persisted in dirPath.

type Queue

type Queue interface {
	// Put puts data to the end of the queue, if puts failure return err.
	Put(message []byte) error
	// Get gets the message data at specific index.
	Get(sequence int64) (message []byte, err error)
	// AppendedSeq returns the written sequence which stands for the latest write barrier.
	// New message is appended at append sequence.
	AppendedSeq() int64
	// SetAppendedSeq sets appended sequence.
	SetAppendedSeq(seq int64)
	// AcknowledgedSeq returns the acknowledged sequence which stands for the oldest read barrier.
	// Message with req less than acknowledged sequence would be deleted at some point.
	AcknowledgedSeq() int64
	// SetAcknowledgedSeq sets acknowledged sequence.
	SetAcknowledgedSeq(seq int64)
	// NotEmpty checks queue if empty, waiting until new data written.
	NotEmpty(consumeHead int64, checkClosed func() bool) bool
	// Signal signals waiting consumers.
	Signal()
	// GC removes all message which sequence <= acknowledged sequence.
	GC()
	// Close closes the queue.
	Close()
}

Queue represents a sequence of segments, new data is appended at append sequence. Segments with all message will be removed by gc which sequence < acknowledged sequence.

func NewQueue

func NewQueue(dirPath string, dataSizeLimit int64) (Queue, error)

NewQueue returns Queue based on dirPath, dataSizeLimit is used to limit the total data/index size,

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL