Documentation
Constants
const (
	// SeqNoNewMessageAvailable is the seqNum returned when no new message is available.
	SeqNoNewMessageAvailable = int64(-1)
)
Variables
var (
	// ErrExceedingMessageSizeLimit is returned when an appended message exceeds the max size limit.
	ErrExceedingMessageSizeLimit = errors.New("message exceeds the max page size limit")
	// ErrOutOfSequenceRange is returned when a sequence is out of range.
	ErrOutOfSequenceRange = errors.New("out of sequence range")
	// ErrExceedingTotalSizeLimit is returned when the queue data exceeds the total size limit.
	ErrExceedingTotalSizeLimit = errors.New("queue data size exceeds the max size limit")
	// ErrMsgNotFound is returned when a message is not found.
	ErrMsgNotFound = errors.New("message not found")
)
Functions
This section is empty.
Types
type ConsumerGroup
type ConsumerGroup interface {
	// Name returns a unique name for ConsumerGroup in a FanOutQueue.
	Name() string
	// Consume returns the seq for the next data to consume.
	// If no new data is available, SeqNoNewMessageAvailable is returned.
	Consume() int64
	// SetConsumedSeq sets the consumed sequence; this is useful when re-consuming messages.
	// An error is returned when seq is invalid (less than the acknowledged seq or greater than the read barrier).
	SetConsumedSeq(seq int64)
	// Ack marks the data with sequence less than or equal to ackSeq as processed.
	Ack(ackSeq int64)
	// ConsumedSeq returns the consumed sequence.
	ConsumedSeq() int64
	// AcknowledgedSeq returns the acknowledged sequence.
	AcknowledgedSeq() int64
	// Queue returns underlying queue.
	Queue() FanOutQueue
	// Pause pauses consuming data.
	Pause()
	// SetSeq sets consumed/acknowledged sequence.
	SetSeq(seq int64)
	// Pending returns the offset between ConsumerGroup consumed sequence and FanOutQueue appended sequence.
	Pending() int64
	// IsEmpty reports whether the fan-out consumer cannot consume any data.
	IsEmpty() bool
	// Close persists headSeq, tailSeq.
	Close()
	// contains filtered or unexported methods
}
ConsumerGroup represents an individual consumer with its own consume and ack sequences. The typical way to use a ConsumerGroup is to consume messages in a single goroutine and to acknowledge successfully processed messages from other goroutines.
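The consume/ack split described above can be sketched with a small in-memory stand-in. Everything here is an assumption made for illustration: memGroup and run are hypothetical names, sequences are assumed to start at 0, and the loop simply stops on SeqNoNewMessageAvailable instead of waiting for new data as a real consumer would.

```go
package main

import (
	"fmt"
	"sync"
)

// SeqNoNewMessageAvailable mirrors the package constant.
const SeqNoNewMessageAvailable = int64(-1)

// memGroup is a hypothetical in-memory stand-in for a ConsumerGroup.
// It only tracks sequences; it is not the package's implementation.
type memGroup struct {
	mu       sync.Mutex
	appended int64 // last sequence written by the producer, -1 if none
	consumed int64 // last sequence handed out by Consume, -1 if none
	acked    int64 // last acknowledged sequence, -1 if none
}

// Consume returns the next sequence, or SeqNoNewMessageAvailable if none is pending.
func (g *memGroup) Consume() int64 {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.consumed >= g.appended {
		return SeqNoNewMessageAvailable
	}
	g.consumed++
	return g.consumed
}

// Ack records ackSeq if it moves the acknowledged sequence forward
// and does not pass the consumed sequence.
func (g *memGroup) Ack(ackSeq int64) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if ackSeq > g.acked && ackSeq <= g.consumed {
		g.acked = ackSeq
	}
}

// AcknowledgedSeq returns the last acknowledged sequence.
func (g *memGroup) AcknowledgedSeq() int64 {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.acked
}

// run consumes total messages on the calling goroutine and acknowledges
// them from a second goroutine, then returns the final acknowledged sequence.
func run(total int64) int64 {
	g := &memGroup{appended: total - 1, consumed: -1, acked: -1}
	processed := make(chan int64)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // acknowledging goroutine
		defer wg.Done()
		for seq := range processed {
			g.Ack(seq)
		}
	}()

	for { // consuming loop
		seq := g.Consume()
		if seq == SeqNoNewMessageAvailable {
			break
		}
		processed <- seq // hand the processed sequence to the acker
	}
	close(processed)
	wg.Wait()
	return g.AcknowledgedSeq()
}

func main() {
	fmt.Println(run(5)) // prints 4, the last of sequences 0..4
}
```

Keeping Consume on one goroutine preserves ordering of the consumed sequence, while the acknowledging side can lag behind without blocking reads.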
func NewConsumerGroup
func NewConsumerGroup(parent, fanOutPath string, q FanOutQueue) (ConsumerGroup, error)
NewConsumerGroup builds a ConsumerGroup from metaPath.
type FanOutQueue
type FanOutQueue interface {
	// Path returns path for persistence files.
	Path() string
	// Queue returns underlying queue.
	Queue() Queue
	// GetOrCreateConsumerGroup returns the ConsumerGroup if it exists,
	// otherwise creates a new ConsumerGroup with consume seq and ack seq == queue ack seq.
	GetOrCreateConsumerGroup(name string) (ConsumerGroup, error)
	// ConsumerGroupNames returns all names of ConsumerGroup.
	ConsumerGroupNames() []string
	// StopConsumerGroup stops consumer group by name.
	StopConsumerGroup(name string)
	// Sync checks the acknowledged sequence of each ConsumerGroup and updates the acknowledged sequence to the smallest one.
	// Then syncs metadata to storage.
	Sync()
	// SetAppendedSeq sets the appended sequence of the underlying queue, then sets the consumed/acknowledged sequence for each ConsumerGroup.
	SetAppendedSeq(seq int64)
	// Close persists Seq meta and ConsumerGroup seq meta, then releases resources.
	Close()
}
FanOutQueue represents a "produce once, consume multiple times" queue. Each ConsumerGroup represents an individual consumer with its own consume/acknowledge sequence.
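To make the fan-out idea concrete, here is a minimal in-memory sketch in which each message is stored once and every group keeps its own position. The type memFanOut and its methods are hypothetical and do not reflect the package's implementation; sequences are assumed to start at 0, and MinAcked only imitates the "smallest acknowledged sequence" rule described in the Sync comment.

```go
package main

import "fmt"

// memFanOut is a hypothetical in-memory stand-in for a FanOutQueue:
// one shared message log plus independent positions per consumer group.
type memFanOut struct {
	messages [][]byte
	next     map[string]int64 // next sequence to consume, per group
	acked    map[string]int64 // last acknowledged sequence, per group (-1 if none)
}

func newMemFanOut(groups ...string) *memFanOut {
	f := &memFanOut{next: map[string]int64{}, acked: map[string]int64{}}
	for _, g := range groups {
		f.next[g] = 0
		f.acked[g] = -1
	}
	return f
}

// Put appends a message once; every group can consume it later.
func (f *memFanOut) Put(msg []byte) { f.messages = append(f.messages, msg) }

// Consume returns the next unread message for group and advances its position.
func (f *memFanOut) Consume(group string) (seq int64, msg []byte, ok bool) {
	seq = f.next[group]
	if seq >= int64(len(f.messages)) {
		return -1, nil, false
	}
	f.next[group] = seq + 1
	return seq, f.messages[seq], true
}

// Ack records the acknowledged sequence for group.
func (f *memFanOut) Ack(group string, seq int64) { f.acked[group] = seq }

// MinAcked returns the smallest acknowledged sequence across all groups,
// or -1 if there are no groups.
func (f *memFanOut) MinAcked() int64 {
	lowest := int64(-1)
	first := true
	for _, a := range f.acked {
		if first || a < lowest {
			lowest, first = a, false
		}
	}
	return lowest
}

func main() {
	f := newMemFanOut("fast", "slow")
	for _, m := range []string{"a", "b", "c"} {
		f.Put([]byte(m))
	}
	// "fast" consumes and acknowledges everything.
	for {
		seq, _, ok := f.Consume("fast")
		if !ok {
			break
		}
		f.Ack("fast", seq)
	}
	// "slow" only gets through the first message.
	seq, msg, _ := f.Consume("slow")
	f.Ack("slow", seq)
	fmt.Println(string(msg), f.MinAcked()) // prints "a 0"
}
```

Because the slowest group bounds the shared acknowledged sequence, data cannot be reclaimed until every group has acknowledged it.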
func NewFanOutQueue
func NewFanOutQueue(dirPath string, dataSizeLimit int64) (q FanOutQueue, err error)
NewFanOutQueue returns a FanOutQueue persisted in dirPath.
type Queue
type Queue interface {
	// Put appends data to the end of the queue and returns an error on failure.
	Put(message []byte) error
	// Get gets the message data at the given sequence.
	Get(sequence int64) (message []byte, err error)
	// AppendedSeq returns the written sequence which stands for the latest write barrier.
	// New message is appended at append sequence.
	AppendedSeq() int64
	// SetAppendedSeq sets appended sequence.
	SetAppendedSeq(seq int64)
	// AcknowledgedSeq returns the acknowledged sequence which stands for the oldest read barrier.
	// Messages with seq less than the acknowledged sequence will be deleted at some point.
	AcknowledgedSeq() int64
	// SetAcknowledgedSeq sets acknowledged sequence.
	SetAcknowledgedSeq(seq int64)
	// NotEmpty checks whether the queue is empty, waiting until new data is written.
	NotEmpty(consumeHead int64, checkClosed func() bool) bool
	// Signal signals waiting consumers.
	Signal()
	// GC removes all messages whose sequence <= acknowledged sequence.
	GC()
	// Close closes the queue.
	Close()
}
Queue represents a sequence of segments. New data is appended at the append sequence. GC removes a segment once the sequences of all its messages are less than the acknowledged sequence.