message

package
v1.0.0-beta.108
Warning

This package is not in the latest version of its module.

Published: Jul 2, 2024 | License: Apache-2.0, MIT | Imports: 2 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Batch

type Batch = service.MessageBatch

type Transaction

type Transaction struct {
	// Payload is the message payload of this transaction.
	Payload Batch
	// contains filtered or unexported fields
}

Transaction is a component that associates a batch of one or more messages with a mechanism that is able to propagate an acknowledgement of delivery back to the source of the batch.

This allows batches to be routed through complex component networks of buffers, processing pipelines and output brokers without losing the association.

It would not be sufficient to associate the acknowledgement with the message (or batch of messages) itself, as it would then not be possible to expand and split message batches (grouping, etc.) without weakening delivery guarantees.

Instead, create a new transaction for each resulting batch, and acknowledge the source transaction only once all derivative transactions have been acknowledged.

func NewTransaction

func NewTransaction(payload Batch, resChan chan<- error) Transaction

NewTransaction creates a new transaction object from a message payload and a response channel.

func NewTransactionFunc

func NewTransactionFunc(payload Batch, fn func(context.Context, error) error) Transaction

NewTransactionFunc creates a new transaction object that associates a message batch payload with a func used to acknowledge delivery of the message batch.

func (*Transaction) Ack

func (t *Transaction) Ack(ctx context.Context, err error) error

Ack returns a delivery response back through the transaction to the message source. A nil error indicates that delivery completed successfully; a non-nil error indicates that the message could not be delivered and should be retried or nacked upstream.

func (*Transaction) Context

func (t *Transaction) Context() context.Context

Context returns a context that indicates the cancellation of a transaction. Honoring this context is optional for receivers of a transaction, but it is worth doing in cases where delivery may block (reconnect loops, etc.), as the context is often used as a fail-fast mechanism.

When a transaction is aborted due to cancellation, it must still be acknowledged, and the acknowledgement should carry t.Context().Err().

func (*Transaction) WithContext

func (t *Transaction) WithContext(ctx context.Context) *Transaction

WithContext returns a copy of the transaction associated with a context used for cancellation. When the context is canceled, it is up to the receiver of this transaction to abort any attempt to deliver the transaction's message.
