message

package
v4.6.0 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 31, 2022 License: MIT Imports: 7 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	ErrMessagePartNotExist = errors.New("target message part does not exist")
	ErrBadMessageBytes     = errors.New("serialised message bytes were in unexpected format")
)

Errors returned by the message type.

Functions

func CopyJSON

func CopyJSON(root interface{}) (interface{}, error)

CopyJSON recursively creates a deep copy of a JSON structure extracted from a message part.
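A recursive deep copy of a decoded JSON value can be sketched as follows. This is a minimal illustration only: `cloneJSON` is a hypothetical stand-in, not the package's CopyJSON, and the set of scalar types it accepts is an assumption.

```go
package main

import "fmt"

// cloneJSON is a hypothetical stand-in for a recursive deep copy of a decoded
// JSON value. It is not the package's CopyJSON implementation.
func cloneJSON(root interface{}) (interface{}, error) {
	switch t := root.(type) {
	case map[string]interface{}:
		out := make(map[string]interface{}, len(t))
		for k, v := range t {
			c, err := cloneJSON(v)
			if err != nil {
				return nil, err
			}
			out[k] = c
		}
		return out, nil
	case []interface{}:
		out := make([]interface{}, len(t))
		for i, v := range t {
			c, err := cloneJSON(v)
			if err != nil {
				return nil, err
			}
			out[i] = c
		}
		return out, nil
	case nil, bool, string, float64, int, int64:
		// Scalars are plain values, so returning them as-is is safe.
		return t, nil
	}
	return nil, fmt.Errorf("unsupported type %T", root)
}

func main() {
	orig := map[string]interface{}{"tags": []interface{}{"a", "b"}}
	cp, err := cloneJSON(orig)
	if err != nil {
		panic(err)
	}
	// Mutating the copy leaves the original untouched.
	cp.(map[string]interface{})["tags"].([]interface{})[0] = "changed"
	fmt.Println(orig["tags"], cp.(map[string]interface{})["tags"])
}
```

Maps and slices are the only containers that need fresh allocations; everything else is copied by value.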

func DeserializeBytes added in v4.1.0

func DeserializeBytes(b []byte) ([][]byte, error)

DeserializeBytes rebuilds a 2D byte slice from a binary serialized blob.

func GetAllBytes

func GetAllBytes(m Batch) [][]byte

GetAllBytes returns a 2D byte slice representing the raw byte content of the parts of a message.

func GetContext

func GetContext(p *Part) context.Context

GetContext either returns a context attached to the message part, or context.Background() if one hasn't been previously attached.

func NewSortGroup

func NewSortGroup(m Batch) (*SortGroup, Batch)

NewSortGroup creates a new sort group to be associated with a message batch.

func NewSortGroupParts

func NewSortGroupParts(parts []*Part) (*SortGroup, []*Part)

NewSortGroupParts creates a sort group associated with a slice of parts.

func SerializeBytes added in v4.1.0

func SerializeBytes(parts [][]byte) []byte

SerializeBytes serializes a 2D byte slice into a single binary blob.

Types

type Batch

type Batch []*Part

Batch represents zero or more messages.

func FromBytes

func FromBytes(b []byte) (Batch, error)

FromBytes deserialises a message batch from a byte slice.

func QuickBatch

func QuickBatch(bslice [][]byte) Batch

QuickBatch initializes a new message batch from a 2D byte slice. The slice can be nil, in which case the batch will start empty.

func (Batch) DeepCopy

func (m Batch) DeepCopy() Batch

DeepCopy creates a new deep copy of the message. This can be considered an entirely new object that is safe to use anywhere.

func (Batch) Get

func (m Batch) Get(index int) *Part

Get returns the message part at a particular index. Indexes can be negative.
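One common reading of negative indexes is that they count back from the end of the batch. The sketch below shows that interpretation using hypothetical stand-in types (`part`, `batch`, `get`). Both the counting rule and the nil result for an out-of-range index are assumptions of the sketch, not documented behaviour of Get.

```go
package main

import "fmt"

// part and batch are hypothetical stand-ins for the package's Part and Batch.
type part struct{ data []byte }
type batch []*part

// get resolves negative indexes from the end of the batch, so -1 is the last
// part. Returning nil when the index is out of range is an assumption.
func (b batch) get(index int) *part {
	if index < 0 {
		index += len(b)
	}
	if index < 0 || index >= len(b) {
		return nil
	}
	return b[index]
}

func main() {
	b := batch{{data: []byte("first")}, {data: []byte("second")}, {data: []byte("last")}}
	fmt.Println(string(b.get(0).data), string(b.get(-1).data), b.get(3) == nil)
}
```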

func (Batch) Iter

func (m Batch) Iter(f func(i int, p *Part) error) error

Iter will iterate all parts of the message, calling f for each.

func (Batch) Len

func (m Batch) Len() int

Len returns the length of the batch.

func (Batch) ShallowCopy added in v4.5.0

func (m Batch) ShallowCopy() Batch

ShallowCopy creates a new shallow copy of the message. Parts can be re-arranged in the new copy and JSON parts can be get/set without impacting other message copies.

type Part

type Part struct {
	// contains filtered or unexported fields
}

Part represents a single Benthos message.

func NewPart

func NewPart(data []byte) *Part

NewPart initializes a new message part.

func WithContext

func WithContext(ctx context.Context, p *Part) *Part

WithContext returns the same message part wrapped with a context. This context can subsequently be retrieved with GetContext.

func (*Part) AsBytes added in v4.5.0

func (p *Part) AsBytes() []byte

AsBytes returns the body of the message part.

func (*Part) AsStructured added in v4.5.0

func (p *Part) AsStructured() (interface{}, error)

AsStructured returns the structured format of the message if already set, or attempts to parse the raw bytes as a JSON document if not. The returned structure should be considered read-only and therefore must not be mutated.

func (*Part) AsStructuredMut added in v4.5.0

func (p *Part) AsStructuredMut() (interface{}, error)

AsStructuredMut returns the structured format of the message if already set, or attempts to parse the raw bytes as a JSON document if not. The returned structure is mutable and therefore safe to mutate directly.

func (*Part) DeepCopy

func (p *Part) DeepCopy() *Part

DeepCopy creates a new deep copy of the message part.

func (*Part) ErrorGet

func (p *Part) ErrorGet() error

ErrorGet returns an error associated with the message, or nil if none exists.

func (*Part) ErrorSet

func (p *Part) ErrorSet(err error)

ErrorSet modifies the error associated with a message. Errors attached to messages are used to indicate that processing has failed at some point in the processing pipeline.

func (*Part) GetContext

func (p *Part) GetContext() context.Context

GetContext returns the underlying context attached to this message part.

func (*Part) IsEmpty

func (p *Part) IsEmpty() bool

IsEmpty returns true if the message part is empty.

func (*Part) MetaDelete

func (p *Part) MetaDelete(key string)

MetaDelete removes the value of a metadata key.

func (*Part) MetaGet

func (p *Part) MetaGet(key string) string

MetaGet returns a metadata value if a key exists, otherwise an empty string.

func (*Part) MetaIter

func (p *Part) MetaIter(f func(k, v string) error) error

MetaIter iterates each metadata key/value pair.

func (*Part) MetaSet

func (p *Part) MetaSet(key, value string)

MetaSet sets the value of a metadata key.

func (*Part) SetBytes added in v4.5.0

func (p *Part) SetBytes(data []byte) *Part

SetBytes sets the value of the message part as a raw byte slice.

func (*Part) SetStructured added in v4.5.0

func (p *Part) SetStructured(jObj interface{})

SetStructured sets the value of the message to a structured value. This value is read-only, and subsequent mutations will require cloning of the entire data structure.

func (*Part) SetStructuredMut added in v4.5.0

func (p *Part) SetStructuredMut(jObj interface{})

SetStructuredMut sets the value of the message to a structured value. This value is mutable, and subsequent mutations will be performed directly on the provided data.

func (*Part) ShallowCopy added in v4.5.0

func (p *Part) ShallowCopy() *Part

ShallowCopy creates a shallow copy of the message part.

func (*Part) WithContext

func (p *Part) WithContext(ctx context.Context) *Part

WithContext returns the underlying message part with a different context attached.

type SortGroup

type SortGroup struct {
	Len int
}

SortGroup associates a tag of a part with the original group.

func (*SortGroup) GetIndex

func (g *SortGroup) GetIndex(p *Part) int

GetIndex attempts to determine the original index of a message part relative to a sort group.

type Transaction

type Transaction struct {
	// Payload is the message payload of this transaction.
	Payload Batch

	// ResponseChan should receive a response at the end of a transaction (once
	// the message is no longer owned by the receiver.) The response itself
	// indicates whether the message has been propagated successfully.
	ResponseChan chan<- error
	// contains filtered or unexported fields
}

Transaction is a component that associates a batch of one or more messages with a mechanism that is able to propagate an acknowledgement of delivery back to the source of the batch.

This allows batches to be routed through complex component networks of buffers, processing pipelines and output brokers without losing the association.

It would not be sufficient to associate acknowledgement to the message (or batch of messages) itself as it would then not be possible to expand and split message batches (grouping, etc) without loosening delivery guarantees.

The proper way to do such things would be to create a new transaction for each resulting batch, and only when all derivative transactions are acknowledged is the source transaction acknowledged in turn.

func NewTransaction

func NewTransaction(payload Batch, resChan chan<- error) Transaction

NewTransaction creates a new transaction object from a message payload and a response channel.

func NewTransactionFunc

func NewTransactionFunc(payload Batch, fn func(context.Context, error) error) Transaction

NewTransactionFunc creates a new transaction object that associates a message batch payload with a func used to acknowledge delivery of the message batch.

func (*Transaction) Ack

func (t *Transaction) Ack(ctx context.Context, err error) error

Ack returns a delivery response back through the transaction to the message source. A nil error indicates that delivery has been completed successfully, whereas a non-nil error indicates that the message could not be delivered and should be retried or nacked upstream.

func (*Transaction) Context added in v4.2.0

func (t *Transaction) Context() context.Context

Context returns a context that indicates the cancellation of a transaction. Honouring this context is optional for receivers of a transaction, but it is worth doing in cases where the transaction is blocked (on reconnect loops, etc) as it is often used as a fail-fast mechanism.

When a transaction is aborted due to cancellation, an acknowledgement is still required, and it should be made with t.Context().Err().

func (*Transaction) WithContext added in v4.2.0

func (t *Transaction) WithContext(ctx context.Context) *Transaction

WithContext returns a copy of the transaction associated with a context used for cancellation. When the context is cancelled, it is up to the receiver of this transaction to abort any attempt to deliver the transaction message.
