Documentation ¶
Index ¶
- Variables
- func CopyJSON(root interface{}) (interface{}, error)
- func GetAllBytes(m *Batch) [][]byte
- func GetAllBytesLen(m *Batch) int
- func GetContext(p *Part) context.Context
- func NewSortGroup(m *Batch) (*SortGroup, *Batch)
- func NewSortGroupParts(parts []*Part) (*SortGroup, []*Part)
- func SetAllMetadata(m *Batch, meta map[string]string)
- func ToBytes(m *Batch) []byte
- type Batch
- type Part
- func (p *Part) Copy() *Part
- func (p *Part) DeepCopy() *Part
- func (p *Part) ErrorGet() error
- func (p *Part) ErrorSet(err error)
- func (p *Part) Get() []byte
- func (p *Part) GetContext() context.Context
- func (p *Part) IsEmpty() bool
- func (p *Part) JSON() (interface{}, error)
- func (p *Part) MetaDelete(key string)
- func (p *Part) MetaGet(key string) string
- func (p *Part) MetaIter(f func(k, v string) error) error
- func (p *Part) MetaSet(key, value string)
- func (p *Part) Set(data []byte) *Part
- func (p *Part) SetJSON(jObj interface{})
- func (p *Part) WithContext(ctx context.Context) *Part
- type SortGroup
- type Transaction
Constants ¶
This section is empty.
Variables ¶
var ( ErrMessagePartNotExist = errors.New("target message part does not exist") ErrBadMessageBytes = errors.New("serialised message bytes were in unexpected format") ErrBlockCorrupted = errors.New("serialised messages block was in unexpected format") )
Errors returned by the message type.
Functions ¶
func CopyJSON ¶
func CopyJSON(root interface{}) (interface{}, error)
CopyJSON recursively creates a deep copy of a JSON structure extracted from a message part.
func GetAllBytes ¶
GetAllBytes returns a 2D byte slice representing the raw byte content of the parts of a message.
func GetAllBytesLen ¶
GetAllBytesLen returns total length of message content in bytes
func GetContext ¶
GetContext either returns a context attached to the message part, or context.Background() if one hasn't been previously attached.
func NewSortGroup ¶
NewSortGroup creates a new sort group to be associated with a
func NewSortGroupParts ¶
NewSortGroupParts creates a sort group associated with a slice of parts.
func SetAllMetadata ¶
SetAllMetadata sets the metadata of all message parts to match a provided metadata implementation.
Types ¶
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
Batch represents zero or more messages.
func QuickBatch ¶
QuickBatch initializes a new message batch from a 2D byte slice, the slice can be nil, in which case the batch will start empty.
func (*Batch) Copy ¶
Copy creates a new shallow copy of the message. Parts can be re-arranged in the new copy and JSON parts can be get/set without impacting other message copies. However, it is still unsafe to edit the raw content of message parts.
func (*Batch) DeepCopy ¶
DeepCopy creates a new deep copy of the message. This can be considered an entirely new object that is safe to use anywhere.
type Part ¶
type Part struct {
// contains filtered or unexported fields
}
Part represents a single Benthos message.
func WithContext ¶
WithContext returns the same message part wrapped with a context, this context can subsequently be received with GetContext.
func (*Part) ErrorGet ¶
ErrorGet returns an error associated with the message, or nil if none exists.
func (*Part) ErrorSet ¶
ErrorSet modifies the error associated with a message. Errors attached to messages are used to indicate that processing has failed at some point in the processing pipeline.
func (*Part) GetContext ¶
GetContext returns the underlying context attached to this message part.
func (*Part) JSON ¶
JSON attempts to parse the message part as a JSON document and returns the result.
func (*Part) MetaDelete ¶
MetaDelete removes the value of a metadata key.
type SortGroup ¶
type SortGroup struct {
Len int
}
SortGroup associates a tag of a part with the original group.
type Transaction ¶
type Transaction struct { // Payload is the message payload of this transaction. Payload *Batch // ResponseChan should receive a response at the end of a transaction (once // the message is no longer owned by the receiver.) The response itself // indicates whether the message has been propagated successfully. ResponseChan chan<- error // contains filtered or unexported fields }
Transaction is a component that associates a batch of one or more messages with a mechanism that is able to propagate an acknowledgement of delivery back to the source of the batch.
This allows batches to be routed through complex component networks of buffers, processing pipelines and output brokers without losing the association.
It would not be sufficient to associate acknowledgement to the message (or batch of messages) itself as it would then not be possible to expand and split message batches (grouping, etc) without loosening delivery guarantees.
The proper way to do such things would be to create a new transaction for each resulting batch, and only when all derivative transactions are acknowledged is the source transaction acknowledged in turn.
func NewTransaction ¶
func NewTransaction(payload *Batch, resChan chan<- error) Transaction
NewTransaction creates a new transaction object from a message payload and a response channel.
func NewTransactionFunc ¶
NewTransactionFunc creates a new transaction object that associates a message batch payload with a func used to acknowledge delivery of the message batch.
func (*Transaction) Ack ¶
func (t *Transaction) Ack(ctx context.Context, err error) error
Ack returns a delivery response back through the transaction to the message source. A nil error indicates that delivery has been completed successfully, a non-nil error indicates that the message could not be delivered and should be retried or nacked upstream.