message

package
v4.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2022 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrMessagePartNotExist = errors.New("target message part does not exist")
	ErrBadMessageBytes     = errors.New("serialised message bytes were in unexpected format")
	ErrBlockCorrupted      = errors.New("serialised messages block was in unexpected format")
)

Errors returned by the message type.

Functions

func CopyJSON

func CopyJSON(root interface{}) (interface{}, error)

CopyJSON recursively creates a deep copy of a JSON structure extracted from a message part.

func GetAllBytes

func GetAllBytes(m *Batch) [][]byte

GetAllBytes returns a 2D byte slice representing the raw byte content of the parts of a message.

func GetAllBytesLen

func GetAllBytesLen(m *Batch) int

GetAllBytesLen returns total length of message content in bytes

func GetContext

func GetContext(p *Part) context.Context

GetContext either returns a context attached to the message part, or context.Background() if one hasn't been previously attached.

func NewSortGroup

func NewSortGroup(m *Batch) (*SortGroup, *Batch)

NewSortGroup creates a new sort group to be associated with a

func NewSortGroupParts

func NewSortGroupParts(parts []*Part) (*SortGroup, []*Part)

NewSortGroupParts creates a sort group associated with a slice of parts.

func SetAllMetadata

func SetAllMetadata(m *Batch, meta map[string]string)

SetAllMetadata sets the metadata of all message parts to match a provided metadata implementation.

func ToBytes

func ToBytes(m *Batch) []byte

ToBytes serialises a message into a single byte array.

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch represents zero or more messages.

func FromBytes

func FromBytes(b []byte) (*Batch, error)

FromBytes deserialises a Message from a byte array.

func QuickBatch

func QuickBatch(bslice [][]byte) *Batch

QuickBatch initializes a new message batch from a 2D byte slice, the slice can be nil, in which case the batch will start empty.

func (*Batch) Append

func (m *Batch) Append(b ...*Part) int

Append adds a new message part to the message.

func (*Batch) Copy

func (m *Batch) Copy() *Batch

Copy creates a new shallow copy of the message. Parts can be re-arranged in the new copy and JSON parts can be get/set without impacting other message copies. However, it is still unsafe to edit the raw content of message parts.

func (*Batch) DeepCopy

func (m *Batch) DeepCopy() *Batch

DeepCopy creates a new deep copy of the message. This can be considered an entirely new object that is safe to use anywhere.

func (*Batch) Get

func (m *Batch) Get(index int) *Part

Get returns a message part at a particular index, indexes can be negative.

func (*Batch) Iter

func (m *Batch) Iter(f func(i int, p *Part) error) error

Iter will iterate all parts of the message, calling f for each.

func (*Batch) Len

func (m *Batch) Len() int

Len returns the length of the message in parts.

func (*Batch) SetAll

func (m *Batch) SetAll(parts []*Part)

SetAll changes the entire set of message parts.

type Part

type Part struct {
	// contains filtered or unexported fields
}

Part represents a single Benthos message.

func NewPart

func NewPart(data []byte) *Part

NewPart initializes a new message part.

func WithContext

func WithContext(ctx context.Context, p *Part) *Part

WithContext returns the same message part wrapped with a context, this context can subsequently be received with GetContext.

func (*Part) Copy

func (p *Part) Copy() *Part

Copy creates a shallow copy of the message part.

func (*Part) DeepCopy

func (p *Part) DeepCopy() *Part

DeepCopy creates a new deep copy of the message part.

func (*Part) ErrorGet

func (p *Part) ErrorGet() error

ErrorGet returns an error associated with the message, or nil if none exists.

func (*Part) ErrorSet

func (p *Part) ErrorSet(err error)

ErrorSet modifies the error associated with a message. Errors attached to messages are used to indicate that processing has failed at some point in the processing pipeline.

func (*Part) Get

func (p *Part) Get() []byte

Get returns the body of the message part.

func (*Part) GetContext

func (p *Part) GetContext() context.Context

GetContext returns the underlying context attached to this message part.

func (*Part) IsEmpty

func (p *Part) IsEmpty() bool

IsEmpty returns true if the message part is empty.

func (*Part) JSON

func (p *Part) JSON() (interface{}, error)

JSON attempts to parse the message part as a JSON document and returns the result.

func (*Part) MetaDelete

func (p *Part) MetaDelete(key string)

MetaDelete removes the value of a metadata key.

func (*Part) MetaGet

func (p *Part) MetaGet(key string) string

MetaGet returns a metadata value if a key exists, otherwise an empty string.

func (*Part) MetaIter

func (p *Part) MetaIter(f func(k, v string) error) error

MetaIter iterates each metadata key/value pair.

func (*Part) MetaSet

func (p *Part) MetaSet(key, value string)

MetaSet sets the value of a metadata key.

func (*Part) Set

func (p *Part) Set(data []byte) *Part

Set the value of the message part.

func (*Part) SetJSON

func (p *Part) SetJSON(jObj interface{})

SetJSON attempts to marshal a JSON document into a byte slice and stores the result as the contents of the message part.

func (*Part) WithContext

func (p *Part) WithContext(ctx context.Context) *Part

WithContext returns the underlying message part with a different context attached.

type SortGroup

type SortGroup struct {
	Len int
}

SortGroup associates a tag of a part with the original group.

func (*SortGroup) GetIndex

func (g *SortGroup) GetIndex(p *Part) int

GetIndex attempts to determine the original index of a message part relative to a sort group.

type Transaction

type Transaction struct {
	// Payload is the message payload of this transaction.
	Payload *Batch

	// ResponseChan should receive a response at the end of a transaction (once
	// the message is no longer owned by the receiver.) The response itself
	// indicates whether the message has been propagated successfully.
	ResponseChan chan<- error
	// contains filtered or unexported fields
}

Transaction is a component that associates a batch of one or more messages with a mechanism that is able to propagate an acknowledgement of delivery back to the source of the batch.

This allows batches to be routed through complex component networks of buffers, processing pipelines and output brokers without losing the association.

It would not be sufficient to associate acknowledgement to the message (or batch of messages) itself as it would then not be possible to expand and split message batches (grouping, etc) without loosening delivery guarantees.

The proper way to do such things would be to create a new transaction for each resulting batch, and only when all derivative transactions are acknowledged is the source transaction acknowledged in turn.

func NewTransaction

func NewTransaction(payload *Batch, resChan chan<- error) Transaction

NewTransaction creates a new transaction object from a message payload and a response channel.

func NewTransactionFunc

func NewTransactionFunc(payload *Batch, fn func(context.Context, error) error) Transaction

NewTransactionFunc creates a new transaction object that associates a message batch payload with a func used to acknowledge delivery of the message batch.

func (*Transaction) Ack

func (t *Transaction) Ack(ctx context.Context, err error) error

Ack returns a delivery response back through the transaction to the message source. A nil error indicates that delivery has been completed successfully, a non-nil error indicates that the message could not be delivered and should be retried or nacked upstream.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL