Documentation ¶
Index ¶
- Variables
- func CopyJSON(root any) any
- func DeserializeBytes(b []byte) ([][]byte, error)
- func GetAllBytes(m Batch) [][]byte
- func GetContext(p *Part) context.Context
- func NewSortGroup(parts Batch) (*SortGroup, Batch)
- func SerializeBytes(parts [][]byte) []byte
- type Batch
- type Part
- func (p *Part) AsBytes() []byte
- func (p *Part) AsStructured() (any, error)
- func (p *Part) AsStructuredMut() (any, error)
- func (p *Part) DeepCopy() *Part
- func (p *Part) ErrorGet() error
- func (p *Part) ErrorSet(err error)
- func (p *Part) GetContext() context.Context
- func (p *Part) HasBytes() bool
- func (p *Part) HasStructured() bool
- func (p *Part) IsEmpty() bool
- func (p *Part) MetaDelete(key string)
- func (p *Part) MetaGetMut(key string) (any, bool)
- func (p *Part) MetaGetStr(key string) string
- func (p *Part) MetaIterMut(f func(string, any) error) error
- func (p *Part) MetaIterStr(f func(string, string) error) error
- func (p *Part) MetaSetMut(key string, value any)
- func (p *Part) SetBytes(data []byte) *Part
- func (p *Part) SetStructured(jObj any)
- func (p *Part) SetStructuredMut(jObj any)
- func (p *Part) ShallowCopy() *Part
- func (p *Part) WithContext(ctx context.Context) *Part
- type SortGroup
- type Transaction
Constants ¶
This section is empty.
Variables ¶
var (
	ErrMessagePartNotExist = errors.New("target message part does not exist")
	ErrBadMessageBytes     = errors.New("serialised message bytes were in unexpected format")
)
Errors returned by the message type.
Functions ¶
func CopyJSON ¶
CopyJSON recursively creates a deep copy of a JSON structure extracted from a message part.
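The recursive copy can be sketched as follows. This is a minimal illustration rather than the package's implementation: copyJSON is a hypothetical stand-in, and it assumes the structure holds only the container types that encoding/json produces (map[string]any and []any) plus scalar values.

```go
package main

import "fmt"

// copyJSON is a hypothetical stand-in for CopyJSON. It deep copies
// map[string]any and []any containers and returns any other value
// (strings, numbers, bools, nil) unchanged.
func copyJSON(root any) any {
	switch t := root.(type) {
	case map[string]any:
		out := make(map[string]any, len(t))
		for k, v := range t {
			out[k] = copyJSON(v)
		}
		return out
	case []any:
		out := make([]any, len(t))
		for i, v := range t {
			out[i] = copyJSON(v)
		}
		return out
	}
	return root
}

func main() {
	orig := map[string]any{"tags": []any{"a", "b"}}
	dup := copyJSON(orig).(map[string]any)

	// Mutating the copy leaves the original untouched.
	dup["tags"].([]any)[0] = "changed"
	fmt.Println(orig["tags"], dup["tags"]) // prints: [a b] [changed b]
}
```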
func DeserializeBytes ¶
DeserializeBytes rebuilds a 2D byte array from a binary serialized blob.
func GetAllBytes ¶
GetAllBytes returns a 2D byte slice representing the raw byte content of the parts of a message.
func GetContext ¶
GetContext either returns a context attached to the message part, or context.Background() if one hasn't been previously attached.
func NewSortGroup ¶
NewSortGroup creates a sort group associated with a slice of parts.
func SerializeBytes ¶
SerializeBytes serializes a 2D byte slice into a single binary blob.
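A round trip between the two functions can be sketched with a length-prefixed layout. The layout here (a big-endian uint32 part count, then a uint32 length before each part) is an assumption for illustration and may differ from the package's wire format. The names serialize, deserialize and errBadBytes are hypothetical.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// errBadBytes mirrors the role of ErrBadMessageBytes in this sketch.
var errBadBytes = errors.New("serialised message bytes were in unexpected format")

// serialize writes the part count, then each part prefixed by its length.
// The layout is an assumption made for this example.
func serialize(parts [][]byte) []byte {
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(parts)))
	out := append([]byte{}, hdr[:]...)
	for _, p := range parts {
		binary.BigEndian.PutUint32(hdr[:], uint32(len(p)))
		out = append(out, hdr[:]...)
		out = append(out, p...)
	}
	return out
}

// deserialize reverses serialize and rejects truncated input.
func deserialize(b []byte) ([][]byte, error) {
	if len(b) < 4 {
		return nil, errBadBytes
	}
	n := binary.BigEndian.Uint32(b)
	b = b[4:]
	parts := [][]byte{}
	for i := uint32(0); i < n; i++ {
		if len(b) < 4 {
			return nil, errBadBytes
		}
		l := binary.BigEndian.Uint32(b)
		b = b[4:]
		if uint64(len(b)) < uint64(l) {
			return nil, errBadBytes
		}
		parts = append(parts, b[:l])
		b = b[l:]
	}
	return parts, nil
}

func main() {
	blob := serialize([][]byte{[]byte("foo"), []byte("bar")})
	parts, err := deserialize(blob)
	fmt.Println(len(parts), string(parts[0]), string(parts[1]), err)
}
```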
Types ¶
type Batch ¶
type Batch []*Part
Batch represents zero or more messages.
func QuickBatch ¶
QuickBatch initializes a new message batch from a 2D byte slice. The slice can be nil, in which case the batch starts empty.
func (Batch) DeepCopy ¶
DeepCopy creates a new deep copy of the message. This can be considered an entirely new object that is safe to use anywhere.
func (Batch) ShallowCopy ¶
ShallowCopy creates a new shallow copy of the message. Parts can be re-arranged in the new copy, and JSON parts can be read and set without impacting other message copies.
type Part ¶
type Part struct {
	// contains filtered or unexported fields
}
Part represents a single Benthos message.
func WithContext ¶
WithContext returns the same message part wrapped with a context. This context can subsequently be retrieved with GetContext.
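The relationship between WithContext and GetContext, including the context.Background() fallback, can be sketched with a stand-in type. Everything below (part, withContext, getContext, traceKey, withTraceID, traceID) is hypothetical and only models the documented behaviour. It is not the package's code.

```go
package main

import (
	"context"
	"fmt"
)

// part is a hypothetical stand-in for Part with an optional context.
type part struct {
	data []byte
	ctx  context.Context
}

// withContext returns a shallow copy of p carrying ctx.
func withContext(ctx context.Context, p *part) *part {
	cp := *p
	cp.ctx = ctx
	return &cp
}

// getContext returns the attached context, or context.Background()
// when none has been attached.
func getContext(p *part) context.Context {
	if p.ctx == nil {
		return context.Background()
	}
	return p.ctx
}

// traceKey is an unexported key type, which avoids collisions with
// context values set by other packages.
type traceKey struct{}

// withTraceID attaches a trace ID to the part through its context.
func withTraceID(p *part, id string) *part {
	return withContext(context.WithValue(getContext(p), traceKey{}, id), p)
}

// traceID reads the trace ID back, or returns "" if none is attached.
func traceID(p *part) string {
	id, _ := getContext(p).Value(traceKey{}).(string)
	return id
}

func main() {
	p := &part{data: []byte("hello")}
	tagged := withTraceID(p, "abc123")
	fmt.Printf("%q %q\n", traceID(p), traceID(tagged))
}
```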
func (*Part) AsStructured ¶
AsStructured returns the structured format of the message if already set, or attempts to parse the raw bytes as a JSON document if not. The returned structure should be considered read-only and therefore not be mutated.
func (*Part) AsStructuredMut ¶
AsStructuredMut returns the structured format of the message if already set, or attempts to parse the raw bytes as a JSON document if not. The returned structure is mutable and therefore safe to mutate directly.
func (*Part) ErrorGet ¶
ErrorGet returns an error associated with the message, or nil if none exists.
func (*Part) ErrorSet ¶
ErrorSet modifies the error associated with a message. Errors attached to messages are used to indicate that processing has failed at some point in the processing pipeline.
func (*Part) GetContext ¶
GetContext returns the underlying context attached to this message part.
func (*Part) HasBytes ¶ added in v4.40.0
HasBytes returns true if the raw message bytes are readily available and cached.
func (*Part) HasStructured ¶ added in v4.40.0
HasStructured returns true if the structured message data is readily available and cached.
func (*Part) MetaDelete ¶
MetaDelete removes the value of a metadata key.
func (*Part) MetaGetMut ¶
MetaGetMut returns a metadata value if a key exists.
func (*Part) MetaGetStr ¶
MetaGetStr returns a metadata value as a string if the key exists, otherwise an empty string.
func (*Part) MetaIterMut ¶
MetaIterMut iterates each metadata key/value pair.
func (*Part) MetaIterStr ¶
MetaIterStr iterates each metadata key/value pair with the value serialised as a string.
func (*Part) MetaSetMut ¶
MetaSetMut sets the value of a metadata key to any value.
func (*Part) SetStructured ¶
SetStructured sets the value of the message to a structured value. This value is read-only, and subsequent mutations will require cloning of the entire data structure.
func (*Part) SetStructuredMut ¶
SetStructuredMut sets the value of the message to a structured value. This value is mutable, and subsequent mutations will be performed directly on the provided data.
func (*Part) ShallowCopy ¶
ShallowCopy creates a shallow copy of the message part.
type SortGroup ¶
type SortGroup struct {
	Len int
}
SortGroup associates a tag of a part with the original group.
func TopLevelSortGroup ¶
TopLevelSortGroup returns the newest sort group to be associated with the given message part, or nil if there is none.
type Transaction ¶
type Transaction struct {
	// Payload is the message payload of this transaction.
	Payload Batch
	// contains filtered or unexported fields
}
Transaction is a component that associates a batch of one or more messages with a mechanism that is able to propagate an acknowledgement of delivery back to the source of the batch.
This allows batches to be routed through complex component networks of buffers, processing pipelines and output brokers without losing the association.
It would not be sufficient to associate acknowledgement with the message (or batch of messages) itself, as it would then not be possible to expand and split message batches (grouping, etc.) without loosening delivery guarantees.
The proper way to do such things would be to create a new transaction for each resulting batch, and only when all derivative transactions are acknowledged is the source transaction acknowledged in turn.
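The derivative-transaction rule above can be sketched with plain acknowledgement functions. This is an illustration under stated assumptions, not the package's mechanism: ackFunc and splitAck are hypothetical names, n is assumed to be greater than zero, and the sketch forwards the first non-nil error to the source.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ackFunc is a hypothetical acknowledgement callback.
type ackFunc func(err error)

// splitAck returns n derivative ack functions. The source ack is called
// exactly once, after every derivative has been acknowledged, with the
// first non-nil error seen (or nil). Repeated calls to the same
// derivative are ignored. Assumes n > 0.
func splitAck(n int, source ackFunc) []ackFunc {
	var (
		mu       sync.Mutex
		pending  = n
		firstErr error
	)
	acks := make([]ackFunc, n)
	for i := range acks {
		var once sync.Once
		acks[i] = func(err error) {
			once.Do(func() {
				mu.Lock()
				if err != nil && firstErr == nil {
					firstErr = err
				}
				pending--
				done, res := pending == 0, firstErr
				mu.Unlock()
				if done {
					source(res)
				}
			})
		}
	}
	return acks
}

func main() {
	acks := splitAck(3, func(err error) {
		fmt.Println("source acked, err =", err)
	})
	acks[0](nil)
	acks[1](errors.New("output unavailable"))
	acks[2](nil) // prints: source acked, err = output unavailable
}
```

Forwarding a single error keeps the sketch short. A real component might instead need to report which derivative batches failed.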
func NewTransaction ¶
func NewTransaction(payload Batch, resChan chan<- error) Transaction
NewTransaction creates a new transaction object from a message payload and a response channel.
func NewTransactionFunc ¶
NewTransactionFunc creates a new transaction object that associates a message batch payload with a func used to acknowledge delivery of the message batch.
func (*Transaction) Ack ¶
func (t *Transaction) Ack(ctx context.Context, err error) error
Ack returns a delivery response back through the transaction to the message source. A nil error indicates that delivery has been completed successfully; a non-nil error indicates that the message could not be delivered and should be retried or nacked upstream.
func (*Transaction) Context ¶
func (t *Transaction) Context() context.Context
Context returns a context that indicates the cancellation of a transaction. Receivers of a transaction are not required to honour this context, but doing so is worthwhile in cases where the transaction is blocked (on reconnect loops, etc.), as it is often used as a fail-fast mechanism.
When a transaction is aborted due to cancellation, an acknowledgement is still required and should be made with t.Context().Err().
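A receiver that honours the context might look like the sketch below. The transaction type here is a hypothetical stand-in with a context and an ack callback, and deliver, deliverWithTimeout, isCancelled and errSend are illustrative names. The retry loop and backoff are assumptions made for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// transaction is a hypothetical stand-in holding a cancellation
// context and an acknowledgement callback.
type transaction struct {
	ctx context.Context
	ack func(error)
}

var errSend = errors.New("send failed")

// deliver retries send until it succeeds or the transaction context is
// cancelled. In both cases ack is called exactly once; on cancellation
// it is called with the context's error.
func deliver(t transaction, send func() error, backoff time.Duration) {
	for {
		if err := send(); err == nil {
			t.ack(nil)
			return
		}
		select {
		case <-t.ctx.Done():
			t.ack(t.ctx.Err())
			return
		case <-time.After(backoff):
		}
	}
}

// deliverWithTimeout runs deliver under a deadline and returns the
// error that was acknowledged.
func deliverWithTimeout(timeout time.Duration, send func() error) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	var res error
	deliver(transaction{ctx: ctx, ack: func(err error) { res = err }}, send, time.Millisecond)
	return res
}

// isCancelled reports whether err came from context cancellation.
func isCancelled(err error) bool {
	return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}

func main() {
	err := deliverWithTimeout(20*time.Millisecond, func() error { return errSend })
	fmt.Println(isCancelled(err), err)
}
```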
func (*Transaction) WithContext ¶
func (t *Transaction) WithContext(ctx context.Context) *Transaction
WithContext returns a copy of the transaction associated with a context used for cancellation. When cancelled it is up to the receiver of this transaction to abort any attempt to deliver the transaction message.