Documentation ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Batch ¶
type Batch = service.MessageBatch
type Transaction ¶
type Transaction struct {
	// Payload is the message payload of this transaction.
	Payload Batch
	// contains filtered or unexported fields
}
Transaction is a component that associates a batch of one or more messages with a mechanism that is able to propagate an acknowledgement of delivery back to the source of the batch.
This allows batches to be routed through complex component networks of buffers, processing pipelines and output brokers without losing the association.
Associating the acknowledgement with the message (or batch of messages) itself would not be sufficient, as it would then be impossible to expand or split message batches (grouping, etc.) without loosening delivery guarantees.
Instead, a new transaction should be created for each resulting batch, and the source transaction is acknowledged only once all derivative transactions have been acknowledged.
func NewTransaction ¶
func NewTransaction(payload Batch, resChan chan<- error) Transaction
NewTransaction creates a new transaction object from a message payload and a response channel.
func NewTransactionFunc ¶
NewTransactionFunc creates a new transaction object that associates a message batch payload with a func used to acknowledge delivery of the message batch.
func (*Transaction) Ack ¶
func (t *Transaction) Ack(ctx context.Context, err error) error
Ack returns a delivery response back through the transaction to the message source. A nil error indicates that delivery has been completed successfully; a non-nil error indicates that the message could not be delivered and should be retried or nacked upstream.
func (*Transaction) Context ¶
func (t *Transaction) Context() context.Context
Context returns a context that indicates the cancellation of a transaction. Receivers of a transaction are not required to honor this context, but doing so is worthwhile in cases where the transaction is blocked (on reconnect loops, etc.), as the context is often used as a fail-fast mechanism.
When a transaction is aborted due to cancellation, it must still be acknowledged, and the acknowledgement should carry t.Context().Err().
func (*Transaction) WithContext ¶
func (t *Transaction) WithContext(ctx context.Context) *Transaction
WithContext returns a copy of the transaction associated with a context used for cancellation. When the context is canceled, it is up to the receiver of this transaction to abort any attempt to deliver the transaction message.