Documentation ¶
Overview ¶
Package buffer implements a batching buffer with batch lease and retry management.
It is meant to be used by something which handles synchronization (primarially the "common/sync/dispatcher.Channel"). As such, it IS NOT GOROUTINE-SAFE. If you use this outside of dispatcher.Channel, you must synchronize all access to the methods of Buffer.
Index ¶
- Variables
- type Batch
- type BatchItem
- type BlockNewItems
- type Buffer
- func (buf *Buffer[T]) ACK(leased *Batch[T])
- func (buf *Buffer[T]) AddNoBlock(now time.Time, item T, itemSize int) (dropped *Batch[T], err error)
- func (buf *Buffer[T]) AddSyntheticNoBlock(now time.Time) (dropped *Batch[T], err error)
- func (buf *Buffer[T]) CanAddItem() bool
- func (buf *Buffer[T]) Flush(now time.Time)
- func (buf *Buffer[T]) ForceLeaseAll() []*Batch[T]
- func (buf *Buffer[T]) LeaseOne(now time.Time) (leased *Batch[T])
- func (buf *Buffer[T]) NACK(ctx context.Context, err error, leased *Batch[T])
- func (buf *Buffer[T]) NextSendTime() time.Time
- func (buf *Buffer[T]) Stats() Stats
- type DropOldestBatch
- type FullBehavior
- type InfiniteGrowth
- type Options
- type Stats
Constants ¶
This section is empty.
Variables ¶
var ( ErrBufferFull = errors.New("buffer is full") ErrItemTooLarge = errors.New("item exceeds buffer's BatchSizeMax") ErrItemTooSmall = errors.New("item has zero or negative size, and BatchSizeMax is set") )
These errors can be returned from AddNoBlock.
var Defaults = Options{ MaxLeases: 4, BatchItemsMax: 20, BatchSizeMax: -1, BatchAgeMax: 10 * time.Second, FullBehavior: &BlockNewItems{ MaxItems: 1000, }, Retry: func() retry.Iterator { return &retry.ExponentialBackoff{ Limited: retry.Limited{ Delay: 200 * time.Millisecond, Retries: -1, }, Multiplier: 1.2, MaxDelay: 60 * time.Second, } }, }
Defaults defines the defaults for Options when it contains 0-valued fields.
DO NOT ASSIGN/WRITE TO THIS STRUCT.
Functions ¶
This section is empty.
Types ¶
type Batch ¶
type Batch[T any] struct { // Data is the work items pushed into the Buffer, plus their Size as provided // to AddNoBlock. Data []BatchItem[T] // Meta is an object which dispatcher.Channel will treat as totally opaque; // You may manipulate it in SendFn or ErrorFn as you see fit. This can be used // for e.g. associating a nonce with the Batch for retries, or stashing // a constructed RPC proto, etc. Meta any // contains filtered or unexported fields }
Batch represents a collection of individual work items and associated metadata.
Batches are are cut by the Channel according to Options.Buffer, and can be manipulated by ErrorFn and SendFn.
ErrorFn and SendFn may manipulate the contents of the Batch (Data and Meta) to do things such as:
- Associate a UID with the Batch (e.g. in the Meta field) to identify it to remote services for deduplication.
- Remove already-processed items from Data in case the SendFn partially succeeded.
The dispatcher accounts for the number of work items in the Batch as it leases the Batch out; initially the Batch's length will be len(Data). If the SendFn reduces the length of Data before the NACK, the accounted number of work items will be accordingly reduced. The accounted length can never grow (i.e. extending Data doesn't do anything).
Similarly, if the buffer is configured with BatchSize, the accounted Size of the batch is defined as the sum of the cached sizes in Data. Reducing this amount (by removing items, or potentially reducing the Size in a BatchItem) will reduce the effective Size of this Batch, but adding to Data cannot increase the Size of the batch.
type BatchItem ¶
type BatchItem[T any] struct { Item T Size int // Synthetic indicates that this BatchItem was 'synthesized' by an out-of-band // system, like dispatcher.Channel's MinQPS item generation. Synthetic bool }
BatchItem is just a container for the user-provided work items.
This includes the Size of `Data` at the time the work item was added to the buffer. This will not be modified by Buffer, but can be adjusted by your application while handling a Batch if your handler needs to trim this somehow.
type BlockNewItems ¶
type BlockNewItems struct { // The maximum number of items that this Buffer is allowed to house (including // both leased and unleased items). // // Default: -1 if BatchItemsMax != -1 else max(1000, BatchItemsMax) // Required: Must be >= BatchItemsMax or -1 (only MaxSize applies) MaxItems int // The maximum* number of 'size units' that this Buffer is allowed to house // (including both leased and unleased items). // // NOTE(*): This only blocks addition of new items once the buffer is // at-or-past capacitiy. e.g. if the buffer has a MaxSize of 1000 and // a BatchSizeMax of 100, and you insert items worth 999 units, this will // still allow the addition of another item (of up to 100 units) before // claiming the buffer is full. // // NOTE: This may only be set if BatchSizeMax is > 0; Otherwise buffer will // not enforce item sizes, and so BlockNewItems will not be able to enforce // this policy. // // Default: -1 if BatchSizeMax != -1 else BatchSizeMax * 5 // Required: Must be >= BatchSizeMax or -1 (only MaxItems applies) MaxSize int }
BlockNewItems prevents the Buffer from accepting any new items as long as it it has MaxItems worth of items.
This will never drop batches, but will block inserts.
func (*BlockNewItems) Check ¶
func (b *BlockNewItems) Check(opts Options) (err error)
Check implements FullBehavior.Check.
func (*BlockNewItems) ComputeState ¶
func (b *BlockNewItems) ComputeState(stats Stats) (okToInsert, dropBatch bool)
ComputeState implements FullBehavior.ComputeState.
func (*BlockNewItems) NeedsItemSize ¶
func (b *BlockNewItems) NeedsItemSize() bool
NeedsItemSize implements FullBehavior.NeedsItemSize.
type Buffer ¶
type Buffer[T any] struct { // contains filtered or unexported fields }
Buffer batches individual data items into Batch objects.
All access to the Buffer (as well as invoking ACK/NACK on LeasedBatches) must be synchronized because Buffer is not goroutine-safe.
func NewBuffer ¶
NewBuffer returns a new Buffer configured with the given Options.
If there's an issue with the provided Options, this returns an error.
func (*Buffer[T]) ACK ¶
ACK records that all the items in the batch have been processed.
The Batch is no longer tracked in any form by the Buffer.
Calling ACK/NACK on the same Batch twice will panic. Calling ACK/NACK on a Batch not returned from LeaseOne will panic. Calling ACK/NACK on a nil Batch will panic.
func (*Buffer[T]) AddNoBlock ¶
func (buf *Buffer[T]) AddNoBlock(now time.Time, item T, itemSize int) (dropped *Batch[T], err error)
AddNoBlock adds the item to the Buffer with the given size.
Possibly drops a batch according to FullBehavior.
Returns an error under the following conditions:
- ErrBufferFull - If FullBehavior.ComputeState returns okToInsert=false.
- ErrItemTooLarge - If this buffer has a BatchSizeMax configured and `itemSize` is larger than this.
- ErrItemTooSmall - If this buffer has a BatchSizeMax configured and `itemSize` is zero, or if `itemSize` is negative.
func (*Buffer[T]) AddSyntheticNoBlock ¶
AddSyntheticNoBlock adds a zero-Size synthetic item to the Buffer.
Otherwise behaves the same as AddNoBlock.
func (*Buffer[T]) CanAddItem ¶
CanAddItem returns true iff the Buffer will accept an item from AddNoBlock without returning ErrBufferFull.
func (*Buffer[T]) Flush ¶
Flush causes any buffered-but-not-batched data to be immediately cut into a Batch.
No-op if there's no such data.
func (*Buffer[T]) ForceLeaseAll ¶
ForceLeaseAll leases and returns all unleased Batches immediately, regardless of their send times.
This is useful for cancelation scenarios where you no longer want to do full processing on the remaining batches.
NOTE: It's helpful to call Flush before ForceLeaseAll to include the currently buffered, but unbatched, data.
func (*Buffer[T]) LeaseOne ¶
LeaseOne returns the most-available-to-send Batch from this Buffer.
The caller must invoke one of ACK/NACK on the Batch. The Batch will count against this Buffer's Stats().Total() until the caller does so.
Returns nil if no batch is available to lease, or if the Buffer has reached MaxLeases.
func (*Buffer[T]) NACK ¶
NACK analyzes the current state of Batch.Data, potentially reducing UnleasedItemCount in the Buffer's if the given Batch.Data length is smaller than when the Batch was originally leased.
The Batch will be re-enqueued unless:
- The Batch's retry Iterator returns retry.Stop
- The Batch has been dropped already due to FullBehavior policy. If this is the case, AddNoBlock would already have returned this *Batch pointer to you.
Calling ACK/NACK on the same Batch twice will panic. Calling ACK/NACK on a Batch not returned from LeaseOne will panic. Calling ACK/NACK on a nil Batch will panic.
func (*Buffer[T]) NextSendTime ¶
NextSendTime returns the send time for the next-most-available-to-send Batch, or a Zero time.Time if no batches are available to send.
NOTE: Because LeaseOne enforces MaxLeases, this time may not guarantee an available lease.
type DropOldestBatch ¶
type DropOldestBatch struct { // The maximum number of leased and unleased items that the Buffer may have // before dropping data. // // Once a batch is dropped, it no longer counts against MaxLiveItems, but it // may still be in memory if the dropped batch was currently leased. // // NOTE: The maximum Stats.Total number of items the Buffer could have at // a given time is: // // MaxLiveItems + (BatchItemsMax * MaxLeases) // // Default: -1 if BatchItemsMax == -1 else max(1000, BatchItemsMax) // Required: Must be >= BatchItemsMax or -1 (only MaxLiveSize applies) MaxLiveItems int // The maximum number of leased and unleased size units that the Buffer may // have before dropping data. // // Once a batch is dropped, it no longer counts against MaxLiveSize, but it // may still be in memory if the dropped batch was currently leased at the // time it was dropped. // // NOTE: The maximum Stats.TotalSize the Buffer could have at a given // time is: // // MaxLiveSize + (BatchSizeMax * MaxLeases) // // NOTE: This may only be set if BatchSizeMax is > 0; Otherwise buffer will // not size inserted items, and so DropOldestBatch will not be able to enforce // this policy. // // Default: -1 if BatchSizeMax == -1 else BatchSizeMax * 5 // Required: Must be >= BatchSizeMax or -1 (only MaxLiveItems applies) MaxLiveSize int }
DropOldestBatch will drop buffered data whenever the number of unleased items plus leased items would grow beyond MaxLiveItems.
This will never block inserts, but will drop batches.
func (*DropOldestBatch) Check ¶
func (d *DropOldestBatch) Check(opts Options) (err error)
Check implements FullBehavior.Check.
func (*DropOldestBatch) ComputeState ¶
func (d *DropOldestBatch) ComputeState(stats Stats) (okToInsert, dropBatch bool)
ComputeState implements FullBehavior.ComputeState.
func (*DropOldestBatch) NeedsItemSize ¶
func (d *DropOldestBatch) NeedsItemSize() bool
NeedsItemSize implements FullBehavior.NeedsItemSize.
type FullBehavior ¶
type FullBehavior interface { // Check inspects Options to see if it's compatible with this FullBehavior // implementation. // // Called exactly once during Buffer creation before any other methods are // used. Check(Options) error // ComputeState evaluates the state of the Buffer (via Stats) and returns: // // * okToInsert - User can add an item without blocking. // * dropBatch - Buffer needs to drop the oldest batch if the user does // insert data. // // Called after Check. ComputeState(stats Stats) (okToInsert, dropBatch bool) // NeedsItemSize should return true iff this FullBehavior requires item sizes // to effectively apply its policy. // // Called after Check. NeedsItemSize() bool }
FullBehavior allows you to customize the Buffer's behavior when it gets too full.
Generally you'll pick one of DropOldestBatch or BlockNewItems.
type InfiniteGrowth ¶
type InfiniteGrowth struct{}
InfiniteGrowth will not drop data or block new items. It just grows until your computer runs out of memory.
This will never block inserts, and will not drop batches.
func (InfiniteGrowth) Check ¶
func (i InfiniteGrowth) Check(opts Options) (err error)
Check implements FullBehavior.Check.
func (InfiniteGrowth) ComputeState ¶
func (i InfiniteGrowth) ComputeState(Stats) (okToInsert, dropBatch bool)
ComputeState implements FullBehavior.ComputeState.
func (InfiniteGrowth) NeedsItemSize ¶
func (i InfiniteGrowth) NeedsItemSize() bool
NeedsItemSize implements FullBehavior.NeedsItemSize.
type Options ¶
type Options struct { // [OPTIONAL] The maximum number of outstanding leases permitted. // // Attempting additional leases (with LeaseOne) while at the maximum will // return nil. // // Requirement: Must be > 0 MaxLeases int // [OPTIONAL] The maximum number of items to allow in a Batch before making it // available to lease. // // Special value -1: unlimited // Requirement: Must be == -1 (i.e. cut batches on BatchAgeMax/BatchSizeMax), // or > 0 BatchItemsMax int // [OPTIONAL] The maximum number of "size units" to allow in a Batch before // making it available to lease. // // The units used here are arbitrary and are only checked vs the value // provided to AddNoBlock. // // Size is explicitly provided to AddNoBlock by the caller. // // Inserting an item which exceeds BatchSizeMax will result in ErrItemTooLarge. // It's up to the caller to ensure that this doesn't happen. // // Special value -1: unlimited // Requirement: Must be == -1 (i.e. cut batches on BatchAgeMax/BatchItemsMax), // or > 0 BatchSizeMax int // [OPTIONAL] The maximum amount of time to wait before queuing a Batch for // transmission. Note that batches are only cut by time when a worker is ready // to process them (i.e. LeaseOne is invoked). // // Requirement: Must be > 0 BatchAgeMax time.Duration // [OPTIONAL] Sets the policy for the Buffer around how many items the Buffer // is allowed to hold, and what happens when that number is reached. FullBehavior FullBehavior // [OPTIONAL] If true, ensures that the next available batch is always the one // with the oldest data. // // If this is false (the default), batches will be leased in the order that // they're available to send; If a Batch has a retry with a high delay, it's // possible that the next leased Batch actually contains newer data than // a later batch. // // NOTE: if this is combined with high Retry values, it can lead to a // head-of-line blocking situation. // // Requirement: May only be true if MaxLeases == 1 FIFO bool // [OPTIONAL] Each batch will have a retry.Iterator assigned to it from this // retry.Factory. // // When a Batch is NACK'd, it will be retried at "now" plus the Duration // returned by the retry.Iterator. // // If the retry.Iterator returns retry.Stop, the Batch will be silently // dropped. Retry retry.Factory }
Options configures policy for the Buffer.
See Defaults for default values.
type Stats ¶
type Stats struct { // UnleasedItemCount is the total number of items (i.e. objects passed to // AddNoBlock) which are currently owned by the Buffer but are not currently // leased. This includes: // * Items buffered, but not yet cut into a Batch. // * Items in unleased Batches. // // UnleasedItemSize is the size in 'size units' of the same items. UnleasedItemCount int UnleasedItemSize int // LeasedItemCount is the total number of items (i.e. objects passed to // AddNoBlock) which are currently owned by the Buffer and are in active // leases. // // LeasedItemSize is the size in 'size units' of the same items. LeasedItemCount int LeasedItemSize int // DroppedLeasedItemCount is the total number of items (i.e. objects passed to // AddNoBlock) which were part of leases, but where those leases have been // dropped (due to FullBehavior policy), but have not yet been ACK/NACK'd. // // Once these batches are ACK/NACK'd they'll be dropped from Stats entirely. // // DroppedLeasedItemSize is the size in 'size units' of the same items. DroppedLeasedItemCount int DroppedLeasedItemSize int }
Stats is a block of information about the Buffer's present state.
func (Stats) Empty ¶
Empty returns true iff the Buffer is totally empty (has zero user-provided items).