Documentation ¶
Overview ¶
Package brokerutil adds utility functions for the broker package.
These utility functions focus on quality-of-life improvements and introduce clean, idiomatic conversions between the interfaces defined in the broker package.
Index ¶
- func AckAll(messages ...broker.Message)
- func BatchedMessageIteratorIntoBatchedMessageStream(iterator broker.BatchedMessageIterator) broker.BatchedMessageStream
- func BatchedMessageIteratorIntoBatchedReceiver(iterator broker.BatchedMessageIterator, settings IntoBatchedReceiverSettings) broker.BatchedReceiver
- func BatchedMessageIteratorIntoReceiver(iterator broker.BatchedMessageIterator, settings IntoReceiverSettings) broker.Receiver
- func BatchedMessageStreamIntoBatchedReceiver(batchedMessageStream broker.BatchedMessageStream, ...) broker.BatchedReceiver
- func CommitHighestOffsets(records []broker.Record)
- func FanInReceivers(receivers ...broker.Receiver) broker.Receiver
- func GroupBatchBy(records []broker.Record, grouper func(record broker.Record) string) map[string][]broker.Record
- func GroupBatchByKeyContents(records []broker.Record) map[string][]broker.Record
- func MessageBatchIteratorIntoMessageStream(iterator broker.BatchedMessageIterator) broker.MessageStream
- func MessageIteratorIntoBatchedMessageIterator(iterator broker.MessageIterator, settings IntoBatchedMessageIteratorSettings) broker.BatchedMessageIterator
- func MessageIteratorIntoBatchedMessageStream(iterator broker.MessageIterator, settings IntoBatchedMessageStreamSettings) broker.BatchedMessageStream
- func MessageIteratorIntoBatchedReceiver(iterator broker.MessageIterator, ...) broker.BatchedReceiver
- func MessageIteratorIntoMessageStream(iterator broker.MessageIterator) broker.MessageStream
- func MessageStreamIntoBatchedMessageStream(messageStream broker.MessageStream, settings IntoBatchedMessageStreamSettings) broker.BatchedMessageStream
- func NackAll(messages ...broker.Message)
- func ReceiverIntoBatchedMessageStream(receiver broker.Receiver, settings IntoBatchedMessageStreamSettings) broker.BatchedMessageStream
- func ReceiverIntoBatchedReceiver(receiver broker.Receiver, batchSettings IntoBatchedMessageStreamSettings, ...) broker.BatchedReceiver
- func ReceiverIntoMessageStream(receiver broker.Receiver) broker.MessageStream
- func StreamifyBatchIterator(ctx context.Context, iterator broker.BatchedIterator) <-chan BatchIterationResult
- func StreamifyIterator(ctx context.Context, iterator broker.RecordIterator) <-chan IterationResult
- func StreamifyMessageBatchIterator(ctx context.Context, iterator broker.BatchedMessageIterator) <-chan MessageBatchIterationResult
- func StreamifyMessageIterator(ctx context.Context, iterator broker.MessageIterator) <-chan MessageIterationResult
- func StreamifyReceiver(ctx context.Context, receiver broker.Receiver) (<-chan broker.Message, <-chan error)
- type BatchIterationResult
- type BatchedMessageIteratorFunc
- type BatchedMessageStreamFunc
- type BatchedReceiverFunc
- type IntoBatchedMessageIteratorSettings
- type IntoBatchedMessageStreamSettings
- type IntoBatchedReceiverSettings
- type IntoReceiverSettings
- type IterationResult
- type IteratorIntoReceiver
- type MessageBatchIterationResult
- type MessageIterationResult
- type MessageStreamFunc
- type ReceiverFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AckAll ¶
AckAll acks all given broker.Message instances.
For performance reasons, each message is acked in its own goroutine.
func BatchedMessageIteratorIntoBatchedMessageStream ¶
func BatchedMessageIteratorIntoBatchedMessageStream(
	iterator broker.BatchedMessageIterator,
) broker.BatchedMessageStream
BatchedMessageIteratorIntoBatchedMessageStream turns a broker.BatchedMessageIterator into broker.BatchedMessageStream. The returned error channel will contain at most one error: the first error returned by the broker.BatchedMessageIterator.
This means the returned error channel is safe to ignore until the stream is processed completely. Both channels are closed when the first error is returned or when context is done.
func BatchedMessageIteratorIntoBatchedReceiver ¶
func BatchedMessageIteratorIntoBatchedReceiver(
	iterator broker.BatchedMessageIterator,
	settings IntoBatchedReceiverSettings,
) broker.BatchedReceiver
BatchedMessageIteratorIntoBatchedReceiver turns a broker.BatchedMessageIterator into a broker.BatchedReceiver.
func BatchedMessageIteratorIntoReceiver ¶
func BatchedMessageIteratorIntoReceiver(
	iterator broker.BatchedMessageIterator,
	settings IntoReceiverSettings,
) broker.Receiver
BatchedMessageIteratorIntoReceiver turns a broker.BatchedMessageIterator into a broker.Receiver.
func BatchedMessageStreamIntoBatchedReceiver ¶
func BatchedMessageStreamIntoBatchedReceiver(
	batchedMessageStream broker.BatchedMessageStream,
	settings IntoBatchedReceiverSettings,
) broker.BatchedReceiver
BatchedMessageStreamIntoBatchedReceiver turns broker.BatchedMessageStream into broker.BatchedReceiver.
func CommitHighestOffsets ¶
CommitHighestOffsets finds and commits only the highest offset for each partition in a batch. It is redundant to commit every message in a batch explicitly since committing only the last message achieves the same effect.
For performance reasons, the last offset for each partition is committed in its own goroutine.
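The selection step, finding the highest offset per partition, might look like the sketch below. The `Record` struct and its `Partition` and `Offset` fields are hypothetical simplifications of broker.Record, and the commit call itself is left out.

```go
package main

import "fmt"

// Record is a hypothetical, simplified stand-in for broker.Record.
type Record struct {
	Partition int32
	Offset    int64
}

// highestPerPartition returns, for each partition in the batch,
// the record carrying the highest offset.
func highestPerPartition(records []Record) map[int32]Record {
	highest := make(map[int32]Record)
	for _, r := range records {
		if cur, ok := highest[r.Partition]; !ok || r.Offset > cur.Offset {
			highest[r.Partition] = r
		}
	}
	return highest
}

func main() {
	batch := []Record{{0, 5}, {1, 2}, {0, 9}, {1, 1}}
	// Only these records would need to be committed (map order is not fixed).
	for partition, r := range highestPerPartition(batch) {
		fmt.Printf("partition %d: commit offset %d\n", partition, r.Offset)
	}
}
```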
func FanInReceivers ¶
FanInReceivers returns a fanned-in version of the given broker.Receiver instances. The returned broker.Receiver will cancel all receives once the first one returns with an error, or the context is canceled, whichever comes first.
func GroupBatchBy ¶
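The source gives no description for this function. Going by its signature in the index, it presumably buckets the records of a batch by the string the grouper returns. A minimal sketch under that assumption, with a hypothetical `Record` type in place of broker.Record:

```go
package main

import "fmt"

// Record is a hypothetical, simplified stand-in for broker.Record.
type Record struct {
	Key   []byte
	Value []byte
}

// groupBatchBy buckets records by the string returned from grouper,
// preserving the original order of records inside each bucket.
func groupBatchBy(records []Record, grouper func(Record) string) map[string][]Record {
	groups := make(map[string][]Record)
	for _, r := range records {
		k := grouper(r)
		groups[k] = append(groups[k], r)
	}
	return groups
}

func main() {
	batch := []Record{
		{Key: []byte("a"), Value: []byte("1")},
		{Key: []byte("b"), Value: []byte("2")},
		{Key: []byte("a"), Value: []byte("3")},
	}
	// Grouping by key contents, which is what GroupBatchByKeyContents
	// appears to do judging by its name (an assumption).
	groups := groupBatchBy(batch, func(r Record) string { return string(r.Key) })
	fmt.Println(len(groups["a"]), len(groups["b"])) // prints 2 1
}
```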
func GroupBatchByKeyContents ¶
func MessageBatchIteratorIntoMessageStream ¶
func MessageBatchIteratorIntoMessageStream(iterator broker.BatchedMessageIterator) broker.MessageStream
MessageBatchIteratorIntoMessageStream turns a broker.BatchedMessageIterator into broker.MessageStream. The returned error channel will contain at most one error: the first error returned by the broker.BatchedMessageIterator.
This means the returned error channel is safe to ignore until the stream is processed completely. Both channels are closed when the first error is returned or when context is done.
func MessageIteratorIntoBatchedMessageIterator ¶
func MessageIteratorIntoBatchedMessageIterator(
	iterator broker.MessageIterator,
	settings IntoBatchedMessageIteratorSettings,
) broker.BatchedMessageIterator
MessageIteratorIntoBatchedMessageIterator turns the given broker.MessageIterator into broker.BatchedMessageIterator.
func MessageIteratorIntoBatchedMessageStream ¶
func MessageIteratorIntoBatchedMessageStream(
	iterator broker.MessageIterator,
	settings IntoBatchedMessageStreamSettings,
) broker.BatchedMessageStream
MessageIteratorIntoBatchedMessageStream turns a broker.MessageIterator into broker.BatchedMessageStream. The returned error channel will contain at most one error: the first error returned by the broker.MessageIterator.
This means the returned error channel is safe to ignore until the stream is processed completely.
func MessageIteratorIntoBatchedReceiver ¶
func MessageIteratorIntoBatchedReceiver(
	iterator broker.MessageIterator,
	batchSettings IntoBatchedMessageIteratorSettings,
	settings IntoBatchedReceiverSettings,
) broker.BatchedReceiver
MessageIteratorIntoBatchedReceiver turns a broker.MessageIterator into a broker.BatchedReceiver.
func MessageIteratorIntoMessageStream ¶
func MessageIteratorIntoMessageStream(iterator broker.MessageIterator) broker.MessageStream
MessageIteratorIntoMessageStream turns a broker.MessageIterator into broker.MessageStream. The returned error channel will contain at most one error: the first error returned by the broker.MessageIterator.
This means the returned error channel is safe to ignore until the stream is processed completely. Both channels are closed when the first error is returned or when context is done.
func MessageStreamIntoBatchedMessageStream ¶
func MessageStreamIntoBatchedMessageStream(
	messageStream broker.MessageStream,
	settings IntoBatchedMessageStreamSettings,
) broker.BatchedMessageStream
MessageStreamIntoBatchedMessageStream turns the given broker.MessageStream into broker.BatchedMessageStream, using the given IntoBatchedMessageStreamSettings.
This is a simple implementation of windowing, which waits for the current batch to reach IntoBatchedMessageStreamSettings.BatchSize or for IntoBatchedMessageStreamSettings.BatchTimeout to elapse. The results channel is closed once the underlying broker.MessageStream channel gets closed.
func NackAll ¶
NackAll nacks all given broker.Message instances.
For performance reasons, each message is nacked in its own goroutine.
func ReceiverIntoBatchedMessageStream ¶
func ReceiverIntoBatchedMessageStream(
	receiver broker.Receiver,
	settings IntoBatchedMessageStreamSettings,
) broker.BatchedMessageStream
ReceiverIntoBatchedMessageStream turns the given broker.Receiver into broker.BatchedMessageStream using the given IntoBatchedMessageStreamSettings.
This is achieved by first turning the receiver into a broker.MessageStream, and then windowing (batching) the stream.
func ReceiverIntoBatchedReceiver ¶
func ReceiverIntoBatchedReceiver(
	receiver broker.Receiver,
	batchSettings IntoBatchedMessageStreamSettings,
	receiverSettings IntoBatchedReceiverSettings,
) broker.BatchedReceiver
ReceiverIntoBatchedReceiver turns broker.Receiver into broker.BatchedReceiver, by first turning broker.Receiver into broker.BatchedMessageStream.
func ReceiverIntoMessageStream ¶
func ReceiverIntoMessageStream(receiver broker.Receiver) broker.MessageStream
ReceiverIntoMessageStream turns a broker.Receiver into broker.MessageStream. The returned error channel will contain at most one error, if the broker.Receiver instance shut down due to an error.
This means the returned error channel is safe to ignore until the stream is processed completely.
func StreamifyBatchIterator ¶
func StreamifyBatchIterator(ctx context.Context, iterator broker.BatchedIterator) <-chan BatchIterationResult
StreamifyBatchIterator polls the broker.BatchedIterator for batches and returns a channel (stream) of BatchIterationResult.
The added benefit of this is that the next poll is made as soon as the last result is put on the channel, meaning that, if record processing is slower than the poll call, the consumer of the returned channel will never have to wait for poll to fetch the next batch, potentially leading to a significant performance boost. This is also known as multiple (double) buffering.
The returned channel is closed once the context is canceled (or deadline exceeded).
func StreamifyIterator ¶
func StreamifyIterator(ctx context.Context, iterator broker.RecordIterator) <-chan IterationResult
StreamifyIterator polls the broker.RecordIterator for new records and returns a channel (stream) of IterationResult.
The added benefit of this is that the next poll is made as soon as the last result is put on the channel, meaning that, if record processing is slower than the poll call, the consumer of the returned channel will never have to wait for poll to fetch the next record, potentially leading to a significant performance boost. This is also known as multiple (double) buffering.
The returned channel is closed once the context is canceled (or deadline exceeded).
func StreamifyMessageBatchIterator ¶
func StreamifyMessageBatchIterator(ctx context.Context, iterator broker.BatchedMessageIterator) <-chan MessageBatchIterationResult
StreamifyMessageBatchIterator polls the broker.BatchedMessageIterator for batches of messages and returns a channel (stream) of MessageBatchIterationResult.
The added benefit of this is that the next poll is made as soon as the last result is put on the channel, meaning that, if message processing is slower than the poll call, the consumer of the returned channel will never have to wait for poll to fetch the next batch, potentially leading to a significant performance boost. This is also known as multiple (double) buffering.
The returned channel is closed once the context is canceled (or deadline exceeded).
func StreamifyMessageIterator ¶
func StreamifyMessageIterator(ctx context.Context, iterator broker.MessageIterator) <-chan MessageIterationResult
StreamifyMessageIterator polls the broker.MessageIterator for new messages and returns a channel (stream) of MessageIterationResult.
The added benefit of this is that the next poll is made as soon as the last result is put on the channel, meaning that, if record processing is slower than the poll call, the consumer of the returned channel will never have to wait for poll to fetch the next record, potentially leading to a significant performance boost. This is also known as multiple (double) buffering.
The returned channel is closed once the context is canceled (or deadline exceeded).
func StreamifyReceiver ¶
func StreamifyReceiver(ctx context.Context, receiver broker.Receiver) (<-chan broker.Message, <-chan error)
StreamifyReceiver calls the Receive method of the given broker.Receiver with a handler function that places each received message onto the returned channel.
The returned channel is closed once the context is canceled (or deadline exceeded).
Because Receive might return an error, an error channel is also returned, which will contain at most a single error.
Types ¶
type BatchIterationResult ¶
type BatchedMessageIteratorFunc ¶
BatchedMessageIteratorFunc is a utility type that allows functions to implement broker.BatchedMessageIterator.
type BatchedMessageStreamFunc ¶
BatchedMessageStreamFunc is a utility type that allows functions to implement broker.BatchedMessageStream.
func (BatchedMessageStreamFunc) BatchStream ¶
type BatchedReceiverFunc ¶
BatchedReceiverFunc is a utility type that allows functions to implement broker.BatchedReceiver.
func (BatchedReceiverFunc) ReceiveBatch ¶
type IntoBatchedMessageIteratorSettings ¶
type IntoBatchedMessageIteratorSettings struct {
	// BatchSize is the upper bound of the size of the batch.
	BatchSize int
	// Timeout is the maximum amount of time the stream will wait for BatchSize number of messages.
	Timeout time.Duration
}
IntoBatchedMessageIteratorSettings holds the settings to be used in MessageIteratorIntoBatchedMessageIterator.
type IntoBatchedMessageStreamSettings ¶
IntoBatchedMessageStreamSettings holds the settings to be used in MessageIteratorIntoBatchedMessageStream.
type IntoBatchedReceiverSettings ¶
type IntoBatchedReceiverSettings struct {
	// NumGoroutines is the number of concurrent workers that execute the given handle function.
	NumGoroutines int
}
IntoBatchedReceiverSettings holds the settings to be used in BatchedMessageIteratorIntoBatchedReceiver.
type IntoReceiverSettings ¶
type IntoReceiverSettings struct {
	// NumGoroutines is the number of concurrent workers that execute the given handle function.
	NumGoroutines int
}
IntoReceiverSettings holds the settings to be used in BatchedMessageIteratorIntoReceiver.
type IterationResult ¶
type IteratorIntoReceiver ¶
type IteratorIntoReceiver struct {
	Iterator broker.BatchedMessageIterator
	Settings IntoReceiverSettings
}
type MessageIterationResult ¶
type MessageStreamFunc ¶
MessageStreamFunc is a utility type that allows functions to implement broker.MessageStream.
type ReceiverFunc ¶
ReceiverFunc is a utility type that allows functions to implement broker.Receiver.