message

package
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 6, 2023 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrMessagePartNotExist = errors.New("target message part does not exist")
	ErrBadMessageBytes     = errors.New("serialised message bytes were in unexpected format")
)

Errors returned by the message type.

Functions

func CopyJSON

func CopyJSON(root any) any

CopyJSON recursively creates a deep copy of a JSON structure extracted from a message part.

func DeserializeBytes

func DeserializeBytes(b []byte) ([][]byte, error)

DeserializeBytes rebuilds a 2D byte array from a binary serialized blob.

func GetAllBytes

func GetAllBytes(m Batch) [][]byte

GetAllBytes returns a 2D byte slice representing the raw byte content of the parts of a message.

func GetContext

func GetContext(p *Part) context.Context

GetContext either returns a context attached to the message part, or context.Background() if one hasn't been previously attached.

func NewSortGroup

func NewSortGroup(parts Batch) (*SortGroup, Batch)

NewSortGroup creates a sort group associated with a slice of parts.

func SerializeBytes

func SerializeBytes(parts [][]byte) []byte

SerializeBytes returns a 2D byte-slice serialized.

Types

type Batch

type Batch []*Part

Batch represents zero or more messages.

func FromBytes

func FromBytes(b []byte) (Batch, error)

FromBytes deserialises a Message from a byte array.

func QuickBatch

func QuickBatch(bslice [][]byte) Batch

QuickBatch initializes a new message batch from a 2D byte slice, the slice can be nil, in which case the batch will start empty.

func (Batch) DeepCopy

func (m Batch) DeepCopy() Batch

DeepCopy creates a new deep copy of the message. This can be considered an entirely new object that is safe to use anywhere.

func (Batch) Get

func (m Batch) Get(index int) *Part

Get returns a message part at a particular index, indexes can be negative.

func (Batch) Iter

func (m Batch) Iter(f func(i int, p *Part) error) error

Iter will iterate all parts of the message, calling f for each.

func (Batch) Len

func (m Batch) Len() int

Len returns the length of the batch.

func (Batch) ShallowCopy

func (m Batch) ShallowCopy() Batch

ShallowCopy creates a new shallow copy of the message. Parts can be re-arranged in the new copy and JSON parts can be get/set without impacting other message copies.

type Part

type Part struct {
	// contains filtered or unexported fields
}

Part represents a single Benthos message.

func NewPart

func NewPart(data []byte) *Part

NewPart initializes a new message part.

func WithContext

func WithContext(ctx context.Context, p *Part) *Part

WithContext returns the same message part wrapped with a context, this context can subsequently be received with GetContext.

func (*Part) AsBytes

func (p *Part) AsBytes() []byte

AsBytes returns the body of the message part.

func (*Part) AsStructured

func (p *Part) AsStructured() (any, error)

AsStructured returns the structured format of the message if already set, or attempts to parse the raw bytes as a JSON document if not. The returned structure should be considered read-only and therefore not be mutated.

func (*Part) AsStructuredMut

func (p *Part) AsStructuredMut() (any, error)

AsStructuredMut returns the structured format of the message if already set, or attempts to parse the raw bytes as a JSON document if not. The returned structure is mutable and therefore safe to mutate directly.

func (*Part) DeepCopy

func (p *Part) DeepCopy() *Part

DeepCopy creates a new deep copy of the message part.

func (*Part) ErrorGet

func (p *Part) ErrorGet() error

ErrorGet returns an error associated with the message, or nil if none exists.

func (*Part) ErrorSet

func (p *Part) ErrorSet(err error)

ErrorSet modifies the error associated with a message. Errors attached to messages are used to indicate that processing has failed at some point in the processing pipeline.

func (*Part) GetContext

func (p *Part) GetContext() context.Context

GetContext returns the underlying context attached to this message part.

func (*Part) IsEmpty

func (p *Part) IsEmpty() bool

IsEmpty returns true if the message part is empty.

func (*Part) MetaDelete

func (p *Part) MetaDelete(key string)

MetaDelete removes the value of a metadata key.

func (*Part) MetaGetMut

func (p *Part) MetaGetMut(key string) (any, bool)

MetaGetMut returns a metadata value if a key exists.

func (*Part) MetaGetStr

func (p *Part) MetaGetStr(key string) string

MetaGetStr returns a metadata value if a key exists as a string, otherwise an empty string.

func (*Part) MetaIterMut

func (p *Part) MetaIterMut(f func(string, any) error) error

MetaIterMut iterates each metadata key/value pair.

func (*Part) MetaIterStr

func (p *Part) MetaIterStr(f func(string, string) error) error

MetaIterStr iterates each metadata key/value pair with the value serialised as a string.

func (*Part) MetaSetMut

func (p *Part) MetaSetMut(key string, value any)

MetaSetMut sets the value of a metadata key to any value.

func (*Part) SetBytes

func (p *Part) SetBytes(data []byte) *Part

SetBytes the value of the message part as a raw byte slice.

func (*Part) SetStructured

func (p *Part) SetStructured(jObj any)

SetStructured sets the value of the message to a structured value, this value is read-only and subsequent mutations will require cloning of the entire data structure.

func (*Part) SetStructuredMut

func (p *Part) SetStructuredMut(jObj any)

SetStructuredMut sets the value of the message to a structured value, this value is mutable and subsequent mutations will be performed directly on the provided data.

func (*Part) ShallowCopy

func (p *Part) ShallowCopy() *Part

ShallowCopy creates a shallow copy of the message part.

func (*Part) WithContext

func (p *Part) WithContext(ctx context.Context) *Part

WithContext returns the underlying message part with a different context attached.

type SortGroup

type SortGroup struct {
	Len int
}

SortGroup associates a tag of a part with the original group.

func TopLevelSortGroup added in v1.1.0

func TopLevelSortGroup(p *Part) *SortGroup

TopLevelSortGroup returns the newest sort group to be associated with the given message part, or nil if there is none.

func (*SortGroup) GetIndex

func (g *SortGroup) GetIndex(p *Part) int

GetIndex attempts to determine the original index of a message part relative to a sort group.

type Transaction

type Transaction struct {
	// Payload is the message payload of this transaction.
	Payload Batch
	// contains filtered or unexported fields
}

Transaction is a component that associates a batch of one or more messages with a mechanism that is able to propagate an acknowledgement of delivery back to the source of the batch.

This allows batches to be routed through complex component networks of buffers, processing pipelines and output brokers without losing the association.

It would not be sufficient to associate acknowledgement to the message (or batch of messages) itself as it would then not be possible to expand and split message batches (grouping, etc) without loosening delivery guarantees.

The proper way to do such things would be to create a new transaction for each resulting batch, and only when all derivative transactions are acknowledged is the source transaction acknowledged in turn.

func NewTransaction

func NewTransaction(payload Batch, resChan chan<- error) Transaction

NewTransaction creates a new transaction object from a message payload and a response channel.

func NewTransactionFunc

func NewTransactionFunc(payload Batch, fn func(context.Context, error) error) Transaction

NewTransactionFunc creates a new transaction object that associates a message batch payload with a func used to acknowledge delivery of the message batch.

func (*Transaction) Ack

func (t *Transaction) Ack(ctx context.Context, err error) error

Ack returns a delivery response back through the transaction to the message source. A nil error indicates that delivery has been completed successfully, a non-nil error indicates that the message could not be delivered and should be retried or nacked upstream.

func (*Transaction) Context

func (t *Transaction) Context() context.Context

Context returns a context that indicates the cancellation of a transaction. It is optional for receivers of a transaction to honour this context, and is worth doing in cases where the transaction is blocked (on reconnect loops, etc) as it is often used as a fail-fast mechanism.

When a transaction is aborted due to cancellation it is still required that acknowledgment is made, and should be done so with t.Context().Err().

func (*Transaction) WithContext

func (t *Transaction) WithContext(ctx context.Context) *Transaction

WithContext returns a copy of the transaction associated with a context used for cancellation. When cancelled it is up to the receiver of this transaction to abort any attempt to deliver the transaction message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL