Documentation ¶
Overview ¶
Package lasr implements a persistent message queue backed by BoltDB. This queue is useful when the producers and consumers can live in the same process.
lasr is designed to never lose information. When the Send method completes, messages have been safely written to disk. On Receive, messages are not deleted until Ack is called. Users should make sure they always respond to messages with Ack or Nack.
Dead-lettering is supported, but disabled by default.
Index ¶
- Variables
- type ID
- type Message
- type Option
- type Q
- func (q *Q) Close() error
- func (q *Q) Compact() (rerr error)
- func (q *Q) Delay(message []byte, when time.Time) (ID, error)
- func (q *Q) Receive(ctx context.Context) (*Message, error)
- func (q *Q) Send(message []byte) (ID, error)
- func (q *Q) String() string
- func (q *Q) Wait(msg []byte, on ...ID) (ID, error)
- type Sequencer
- type Uint64ID
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrAckNack is returned by Ack and Nack when one of them has been called already.
	ErrAckNack = errors.New("lasr: Ack or Nack already called")

	// ErrQClosed is returned by Send, Receive and Close when the Q has already
	// been closed.
	ErrQClosed = errors.New("lasr: Q is closed")

	// ErrOptionsApplied is returned when an Option is applied to a Q after NewQ
	// has already returned.
	ErrOptionsApplied = errors.New("lasr: options cannot be applied after New")
)
var (
	// MaxDelayTime is the maximum time that can be passed to Q.Delay().
	MaxDelayTime = time.Unix(0, 1<<63-1)
)
Functions ¶
This section is empty.
Types ¶
type ID ¶
type ID interface { encoding.BinaryMarshaler }
ID is used for uniquely identifying messages in a Q.
type Message ¶
Message is a message returned from Q on Receive.
Message contains a Body and an ID. The ID will be equal to the ID that was returned on Send, Delay or Wait for this message.
type Option ¶
Options can be passed to NewQ.
func WithDeadLetters ¶
func WithDeadLetters() Option
WithDeadLetters will cause nacked messages that are not retried to be added to a dead letters queue.
func WithMessageBufferSize ¶
WithMessageBufferSize sets the message buffer size. By default, the message buffer size is 0. Values less than 0 are not allowed.
The buffer is used by Receive to efficiently ready messages for consumption. If the buffer is greater than 0, then multiple messages can be retrieved in a single transaction.
Buffered messages come with a caveat: messages will move into the "unacked" state before Receive is called.
Buffered messages come at the cost of increased memory use. If messages are large in size, use this cautiously.
func WithSequencer ¶
WithSequencer will cause a Q to use a user-provided Sequencer.
type Q ¶
type Q struct {
// contains filtered or unexported fields
}
Q is a persistent message queue. Its methods are goroutine-safe. Q retains the data that is sent to it until messages are acked (or nacked without retry).
func DeadLetters ¶
If dead-lettering is enabled on q, DeadLetters will return a dead-letter queue that is named the same as q, but will emit dead-letters on Receive. The dead-letter queue itself does not support dead-lettering; nacked messages that are not retried will be deleted.
If dead-lettering is not enabled on q, an error will be returned.
func NewQ ¶
NewQ creates a new Q. Only one queue should be created in a given bolt db, unless compaction is disabled.
func (*Q) Close ¶
Close closes q. When q is closed, Send, Receive, and Close will return ErrQClosed. Close blocks until all messages in the "unacked" state are Acked or Nacked.
func (*Q) Compact ¶
Compact performs compaction on the queue. All messages will be copied from the underlying database into another.
The queue is unavailable while compaction is occurring.
Compaction causes the underlying bolt database to be replaced, so callers should be aware that any other queues relying on the database may be invalidated.
func (*Q) Delay ¶
Delay is like Send, but the message will not enter the Ready state until after "when" has occurred.
If "when" has already occurred, then it will be set to time.Now().
func (*Q) Receive ¶
Receive receives a message from the queue. If no messages are available by the time the context is done, then the function will return a nil Message and the result of ctx.Err().
func (*Q) Send ¶
Send sends a message to Q. When Send completes with a nil error, the message sent to Q will be in the Ready state.
type Sequencer ¶
Sequencer returns an ID with each call to NextSequence and any error that occurred.
A Sequencer should obey the following invariants:
* NextSequence is goroutine-safe.
* NextSequence will never generate the same ID twice.
* NextSequence will return IDs whose big-endian binary representation is strictly increasing.
Q is not guaranteed to use all of the IDs generated by its Sequencer.