Documentation ¶
Overview ¶
Package batch contains internal utilities for interacting with message batches.
Index ¶
- func CollapsedCount(p *message.Part) int
- func CtxCollapsedCount(ctx context.Context) int
- func CtxWithCollapsedCount(ctx context.Context, count int) context.Context
- func MessageCollapsedCount(m message.Batch) int
- func WithCollapsedCount(p *message.Part, count int) *message.Part
- type AckFunc
- type CombinedAcker
- type Error
- func (e *Error) Error() string
- func (e *Error) Failed(i int, err error) *Error
- func (e *Error) IndexedErrors() int
- func (e *Error) Unwrap() error
- func (e *Error) WalkPartsBySource(sourceSortGroup *message.SortGroup, sourceBatch message.Batch, ...)
- func (e *Error) WalkPartsNaively(fn func(int, *message.Part, error) bool)
- func (e *Error) XErroredBatch() message.Batch
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CollapsedCount ¶
CollapsedCount attempts to extract the actual number of messages that were collapsed into the resulting message part. This value could be greater than 1 when users configure processors that archive batched message parts.
func CtxCollapsedCount ¶
CtxCollapsedCount attempts to extract the actual number of messages that were collapsed into the resulting message part. This value could be greater than 1 when users configure processors that archive batched message parts.
func CtxWithCollapsedCount ¶
CtxWithCollapsedCount returns a context indicating that the associated message is the result of collapsing a number of messages. This allows downstream components to know how many total messages were combined.
func MessageCollapsedCount ¶
MessageCollapsedCount attempts to extract the actual number of messages that were combined into the resulting batched message parts. This value could differ from message.Len() when users configure processors that archive batched message parts.
func WithCollapsedCount ¶
WithCollapsedCount returns a message part with a context indicating that this message is the result of collapsing a number of messages. This allows downstream components to know how many total messages were combined.
Types ¶
type CombinedAcker ¶
type CombinedAcker struct {
// contains filtered or unexported fields
}
CombinedAcker creates a single ack func closure that aggregates one or more derived closures, such that the singular ack func triggers only once every derived closure has been called. If at least one derived closure receives an error, the singular ack func sends the first non-nil error received.
func NewCombinedAcker ¶
func NewCombinedAcker(aFn AckFunc) *CombinedAcker
NewCombinedAcker creates an aggregator that derives one or more ack funcs. Once all of the derived funcs have been called, the provided root ack func is called.
func (*CombinedAcker) Derive ¶
func (c *CombinedAcker) Derive() AckFunc
Derive creates a new ack func that must be called before the origin ack func will be called. It is invalid to derive an ack func after any other previously derived funcs have been called.
type Error ¶
type Error struct {
// contains filtered or unexported fields
}
Error is an error type that also allows storing granular errors for each message of a batch.
func NewError ¶
NewError creates a new batch-wide error, where it's possible to add granular errors for individual messages of the batch.
func (*Error) Failed ¶
Failed stores an error state for a particular message of a batch. Returns a pointer to the underlying error, allowing the method to be chained.
If Failed is not called then all messages are assumed to have failed. If it is called at least once then all message indexes that aren't explicitly failed are assumed to have been processed successfully.
func (*Error) IndexedErrors ¶
IndexedErrors returns the number of indexed errors that have been registered for the batch.
func (*Error) WalkPartsBySource ¶
func (e *Error) WalkPartsBySource(sourceSortGroup *message.SortGroup, sourceBatch message.Batch, fn func(int, *message.Part, error) bool)
WalkPartsBySource applies a closure to each message that was part of the request that caused this error. The closure is provided the message part index, a pointer to the part, and its individual error, which may be nil if the message itself was processed successfully. The closure returns a bool which indicates whether the iteration should be continued.
Important! The order of parts walked is not guaranteed to match that of the source batch. It is also possible for any given index to be represented zero, one or more times.
func (*Error) WalkPartsNaively ¶
WalkPartsNaively applies a closure to each message that was part of the request that caused this error. The closure is provided the message part index, a pointer to the part, and its individual error, which may be nil if the message itself was processed successfully. The closure returns a bool which indicates whether the iteration should be continued.
WARNING: The shape and order of the errored batch is not guaranteed to match that of an origin batch and therefore cannot be used to associate batch errors with the origin. Instead, use WalkPartsBySource.
func (*Error) XErroredBatch ¶
XErroredBatch returns the underlying batch associated with the error.
Directories ¶
Path | Synopsis |
---|---|
policy | Package policy provides tooling for creating and executing Benthos message batch policies. |