brokerutil

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2025 License: Apache-2.0 Imports: 7 Imported by: 1

Documentation

Overview

Package brokerutil adds utility functions for the broker package.

These utility functions focus on quality-of-life improvements and introduce clean, idiomatic conversions between the interfaces defined in the broker package.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AckAll

func AckAll(messages ...broker.Message)

AckAll acks all given broker.Message instances.

For performance reasons, each message is acked in its own goroutine.

func BatchedMessageIteratorIntoBatchedMessageStream

func BatchedMessageIteratorIntoBatchedMessageStream(
	iterator broker.BatchedMessageIterator,
) broker.BatchedMessageStream

BatchedMessageIteratorIntoBatchedMessageStream turns a broker.BatchedMessageIterator into broker.BatchedMessageStream. The returned error channel will contain at most one error: the first error returned by the broker.BatchedMessageIterator.

This means the returned error channel is safe to ignore until the stream is processed completely. Both channels are closed when the first error is returned or when context is done.

func BatchedMessageIteratorIntoBatchedReceiver

func BatchedMessageIteratorIntoBatchedReceiver(
	iterator broker.BatchedMessageIterator,
	settings IntoBatchedReceiverSettings,
) broker.BatchedReceiver

BatchedMessageIteratorIntoBatchedReceiver turns a broker.BatchedMessageIterator into a broker.BatchedReceiver.

func BatchedMessageIteratorIntoReceiver

func BatchedMessageIteratorIntoReceiver(
	iterator broker.BatchedMessageIterator,
	settings IntoReceiverSettings,
) broker.Receiver

BatchedMessageIteratorIntoReceiver turns a broker.BatchedMessageIterator into a broker.Receiver.

func BatchedMessageStreamIntoBatchedReceiver

func BatchedMessageStreamIntoBatchedReceiver(
	batchedMessageStream broker.BatchedMessageStream,
	settings IntoBatchedReceiverSettings,
) broker.BatchedReceiver

BatchedMessageStreamIntoBatchedReceiver turns broker.BatchedMessageStream into broker.BatchedReceiver.

func CommitHighestOffsets

func CommitHighestOffsets(records []broker.Record)

CommitHighestOffsets finds and commits only the highest offset for each partition in a batch. It is redundant to commit every message in a batch explicitly since committing only the last message achieves the same effect.

For performance reasons, the last offset for each partition is committed in its own goroutine.

func FanInReceivers

func FanInReceivers(receivers ...broker.Receiver) broker.Receiver

FanInReceivers returns a fanned-in version of the given broker.Receiver instances. The returned broker.Receiver will cancel all receives once the first one returns with an error, or the context is canceled, whichever comes first.

func GroupBatchBy

func GroupBatchBy(records []broker.Record, grouper func(record broker.Record) string) map[string][]broker.Record

func GroupBatchByKeyContents

func GroupBatchByKeyContents(records []broker.Record) map[string][]broker.Record

func MessageBatchIteratorIntoMessageStream

func MessageBatchIteratorIntoMessageStream(iterator broker.BatchedMessageIterator) broker.MessageStream

MessageBatchIteratorIntoMessageStream turns a broker.BatchedMessageIterator into broker.MessageStream. The returned error channel will contain at most one error: the first error returned by the broker.MessageIterator.

This means the returned error channel is safe to ignore until the stream is processed completely. Both channels are closed when the first error is returned or when context is done.

func MessageIteratorIntoBatchedMessageIterator

func MessageIteratorIntoBatchedMessageIterator(
	iterator broker.MessageIterator,
	settings IntoBatchedMessageIteratorSettings,
) broker.BatchedMessageIterator

MessageIteratorIntoBatchedMessageIterator turns the given broker.MessageIterator into broker.BatchedMessageIterator.

func MessageIteratorIntoBatchedMessageStream

func MessageIteratorIntoBatchedMessageStream(
	iterator broker.MessageIterator,
	settings IntoBatchedMessageStreamSettings,
) broker.BatchedMessageStream

MessageIteratorIntoBatchedMessageStream turns a broker.MessageIterator into broker.BatchedMessageStream. The returned error channel will contain at most one error: the first error returned by the broker.MessageIterator.

This means the returned error channel is safe to ignore until the stream is processed completely.

func MessageIteratorIntoBatchedReceiver

func MessageIteratorIntoBatchedReceiver(
	iterator broker.MessageIterator,
	batchSettings IntoBatchedMessageIteratorSettings,
	settings IntoBatchedReceiverSettings,
) broker.BatchedReceiver

MessageIteratorIntoBatchedReceiver turns a broker.MessageIterator into a broker.BatchedReceiver.

func MessageIteratorIntoMessageStream

func MessageIteratorIntoMessageStream(iterator broker.MessageIterator) broker.MessageStream

MessageIteratorIntoMessageStream turns a broker.MessageIterator into broker.MessageStream. The returned error channel will contain at most one error: the first error returned by the broker.MessageIterator.

This means the returned error channel is safe to ignore until the stream is processed completely. Both channels are closed when the first error is returned or when context is done.

func MessageStreamIntoBatchedMessageStream

func MessageStreamIntoBatchedMessageStream(
	messageStream broker.MessageStream,
	settings IntoBatchedMessageStreamSettings,
) broker.BatchedMessageStream

MessageStreamIntoBatchedMessageStream turns the given broker.MessageStream into broker.BatchedMessageStream, using the given IntoBatchedMessageStreamSettings.

This is a simple implementation of windowing, which waits for the current batch to reach IntoBatchedMessageStreamSettings.BatchSize or for IntoBatchedMessageStreamSettings.BatchTimeout to elapse. The results channel is closed once the underlying broker.MessageStream channel gets closed.

func NackAll

func NackAll(messages ...broker.Message)

NackAll acks all given broker.Message instances.

For performance reasons, each message is nacked in its own goroutine.

func ReceiverIntoBatchedMessageStream

func ReceiverIntoBatchedMessageStream(
	receiver broker.Receiver,
	settings IntoBatchedMessageStreamSettings,
) broker.BatchedMessageStream

ReceiverIntoBatchedMessageStream turns the given broker.Receiver into broker.BatchedMessageStream using the given IntoBatchedMessageStreamSettings.

This is achieved by first turning the receiver into a broker.MessageStream, and then windowing (batching) the stream.

func ReceiverIntoBatchedReceiver

func ReceiverIntoBatchedReceiver(
	receiver broker.Receiver,
	batchSettings IntoBatchedMessageStreamSettings,
	receiverSettings IntoBatchedReceiverSettings,
) broker.BatchedReceiver

ReceiverIntoBatchedReceiver turns broker.Receiver into broker.BatchedReceiver, by first turning broker.Receiver into broker.BatchedMessageStream.

func ReceiverIntoMessageStream

func ReceiverIntoMessageStream(receiver broker.Receiver) broker.MessageStream

ReceiverIntoMessageStream turns a broker.Receiver into broker.MessageStream. The returned error channel will contain at most one error, if the broker.Receiver instance shut down due to an error.

This means the returned error channel is safe to ignore until the stream is processed completely.

func StreamifyBatchIterator

func StreamifyBatchIterator(ctx context.Context, iterator broker.BatchedIterator) <-chan BatchIterationResult

StreamifyBatchIterator polls the broker.BatchedIterator for batches and return a channel (stream) of BatchIterationResult.

The added benefit of this is that the next poll is made as soon as the last result is put on the channel, meaning that, if record processing is slower than the poll call, the consumer of the returned channel will never have to wait for poll to fetch the next batch, potentially leading to a significant performance boost. This is also known as multiple (double) buffering.

The returned channel is closed once the context is canceled (or deadline exceeded).

func StreamifyIterator

func StreamifyIterator(ctx context.Context, iterator broker.RecordIterator) <-chan IterationResult

StreamifyIterator polls the broker.RecordIterator for new records and return a channel (stream) of IterationResult.

The added benefit of this is that the next poll is made as soon as the last result is put on the channel, meaning that, if record processing is slower than the poll call, the consumer of the returned channel will never have to wait for poll to fetch the next record, potentially leading to a significant performance boost. This is also known as multiple (double) buffering.

The returned channel is closed once the context is canceled (or deadline exceeded).

func StreamifyMessageBatchIterator

func StreamifyMessageBatchIterator(ctx context.Context, iterator broker.BatchedMessageIterator) <-chan MessageBatchIterationResult

StreamifyMessageBatchIterator polls the broker.BatchedMessageIterator for batches of messages and return a channel (stream) of MessageIterationResult.

The added benefit of this is that the next poll is made as soon as the last result is put on the channel, meaning that, if record processing is slower than the poll call, the consumer of the returned channel will never have to wait for poll to fetch the next record, potentially leading to a significant performance boost. This is also known as multiple (double) buffering.

The returned channel is closed once the context is canceled (or deadline exceeded).

func StreamifyMessageIterator

func StreamifyMessageIterator(ctx context.Context, iterator broker.MessageIterator) <-chan MessageIterationResult

StreamifyMessageIterator polls the broker.MessageIterator for new messages and return a channel (stream) of MessageIterationResult.

The added benefit of this is that the next poll is made as soon as the last result is put on the channel, meaning that, if record processing is slower than the poll call, the consumer of the returned channel will never have to wait for poll to fetch the next record, potentially leading to a significant performance boost. This is also known as multiple (double) buffering.

The returned channel is closed once the context is canceled (or deadline exceeded).

func StreamifyReceiver

func StreamifyReceiver(ctx context.Context, receiver broker.Receiver) (<-chan broker.Message, <-chan error)

StreamifyReceiver calls the receive method of the given broker.Receiver with a handler function that places each received message onto the returned channel

The returned channel is closed once the context is canceled (or deadline exceeded).

Because Receive might return an error, an error channel is also returned, which will contain at most a single error.

Types

type BatchIterationResult

type BatchIterationResult struct {
	Batch []broker.Record
	Err   error
}

type BatchedMessageIteratorFunc

type BatchedMessageIteratorFunc func(context.Context) ([]broker.Message, error)

BatchedMessageIteratorFunc a utility type to allow functions to implement broker.BatchedMessageIterator.

func (BatchedMessageIteratorFunc) NextBatch

type BatchedMessageStreamFunc

type BatchedMessageStreamFunc func(ctx context.Context) (<-chan []broker.Message, <-chan error)

BatchedMessageStreamFunc a utility type to allow functions to implement broker.BatchedMessageStream.

func (BatchedMessageStreamFunc) BatchStream

func (f BatchedMessageStreamFunc) BatchStream(ctx context.Context) (<-chan []broker.Message, <-chan error)

type BatchedReceiverFunc

type BatchedReceiverFunc func(context.Context, func(context.Context, []broker.Message)) error

BatchedReceiverFunc a utility type to allow functions to implement broker.BatchedReceiver.

func (BatchedReceiverFunc) ReceiveBatch

func (f BatchedReceiverFunc) ReceiveBatch(ctx context.Context, h func(context.Context, []broker.Message)) error

type IntoBatchedMessageIteratorSettings

type IntoBatchedMessageIteratorSettings struct {
	// BatchSize the upper bound of the size of the batch.
	BatchSize int

	// Timeout the maximum amount of time the stream will wait for BatchSize number of messages.
	Timeout time.Duration
}

IntoBatchedMessageIteratorSettings holds the settings to be used in MessageIteratorIntoBatchedMessageIterator.

type IntoBatchedMessageStreamSettings

type IntoBatchedMessageStreamSettings struct {
	BatchSize    int
	BatchTimeout time.Duration
}

IntoBatchedMessageStreamSettings holds the settings to be used in MessageIteratorIntoBatchedMessageStream.

type IntoBatchedReceiverSettings

type IntoBatchedReceiverSettings struct {
	// NumGoroutines the number of concurrent workers that execute the given handle function.
	NumGoroutines int
}

IntoBatchedReceiverSettings holds the settings to be used in BatchedMessageIteratorIntoReceiver.

type IntoReceiverSettings

type IntoReceiverSettings struct {
	// NumGoroutines the number of concurrent workers that execute the given handle function.
	NumGoroutines int
}

IntoReceiverSettings holds the settings to be used in BatchedMessageIteratorIntoReceiver.

type IterationResult

type IterationResult struct {
	Record broker.Record
	Err    error
}

type IteratorIntoReceiver

type IteratorIntoReceiver struct {
	Iterator broker.BatchedMessageIterator
	Settings IntoReceiverSettings
}

func (*IteratorIntoReceiver) Receive

func (r *IteratorIntoReceiver) Receive(ctx context.Context, function func(ctx context.Context, message broker.Message)) error

type MessageBatchIterationResult

type MessageBatchIterationResult struct {
	Messages []broker.Message
	Err      error
}

type MessageIterationResult

type MessageIterationResult struct {
	Message broker.Message
	Err     error
}

type MessageStreamFunc

type MessageStreamFunc func(ctx context.Context) (<-chan broker.Message, <-chan error)

MessageStreamFunc a utility type to allow functions to implement broker.MessageStream.

func (MessageStreamFunc) Stream

func (f MessageStreamFunc) Stream(ctx context.Context) (<-chan broker.Message, <-chan error)

type ReceiverFunc

type ReceiverFunc func(context.Context, func(context.Context, broker.Message)) error

ReceiverFunc a utility type to allow functions to implement broker.Receiver.

func (ReceiverFunc) Receive

func (f ReceiverFunc) Receive(ctx context.Context, h func(context.Context, broker.Message)) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL