Documentation ¶
Overview ¶
Package batch provides tooling for creating and executing Benthos message batch policies.
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SanitisePolicyConfig ¶
func SanitisePolicyConfig(policy PolicyConfig) (interface{}, error)
SanitisePolicyConfig returns a policy config structure ready to be marshalled with irrelevant fields omitted.
Types ¶
type Policy ¶
type Policy struct {
// contains filtered or unexported fields
}
Policy implements a batching policy by buffering messages until, based on a set of rules, the buffered messages are ready to be sent onwards as a batch.
func NewPolicy ¶
func NewPolicy(
	conf PolicyConfig,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
) (*Policy, error)
NewPolicy creates an empty policy with default rules.
func (*Policy) Add ¶
Add a new message part to this batch policy. Returns true if this part triggers the conditions of the policy.
func (*Policy) CloseAsync ¶ added in v3.10.0
func (p *Policy) CloseAsync()
CloseAsync shuts down the policy resources.
func (*Policy) Count ¶
Count returns the number of currently buffered message parts within this policy.
func (*Policy) Flush ¶
Flush clears all messages stored by this batch policy. Returns nil if the policy is currently empty.
func (*Policy) FlushAny ¶ added in v3.10.0
FlushAny clears all messages stored by this batch policy and returns any number of discrete message batches. Returns nil if the policy is currently empty.
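The Add, Count and Flush descriptions above suggest a buffer-then-flush loop. The following is a minimal, self-contained sketch of that pattern and not the Benthos implementation: `toyPolicy`, its `maxCount` and `maxBytes` fields, and the use of plain strings as message parts are all hypothetical stand-ins, and period, check and processor rules are not modelled.

```go
package main

import "fmt"

// toyPolicy is a hypothetical, simplified stand-in for batch.Policy. It
// buffers string parts and reports when a count or byte-size limit is hit.
type toyPolicy struct {
	maxCount int // trigger once this many parts are buffered (0 disables)
	maxBytes int // trigger once this many bytes are buffered (0 disables)
	parts    []string
	size     int
}

// Add buffers a part and returns true if the batch is now ready to flush.
func (p *toyPolicy) Add(part string) bool {
	p.parts = append(p.parts, part)
	p.size += len(part)
	if p.maxCount > 0 && len(p.parts) >= p.maxCount {
		return true
	}
	return p.maxBytes > 0 && p.size >= p.maxBytes
}

// Count returns the number of currently buffered parts.
func (p *toyPolicy) Count() int { return len(p.parts) }

// Flush returns the buffered parts and resets the policy, or nil if empty.
func (p *toyPolicy) Flush() []string {
	if len(p.parts) == 0 {
		return nil
	}
	batch := p.parts
	p.parts, p.size = nil, 0
	return batch
}

func main() {
	p := &toyPolicy{maxCount: 3}
	for _, part := range []string{"a", "b", "c", "d"} {
		// Flush only when Add reports that a rule has been triggered.
		if p.Add(part) {
			fmt.Println("flushed:", p.Flush())
		}
	}
	fmt.Println("still buffered:", p.Count())
}
```

With a count limit of three, the first three parts are flushed together and the fourth stays buffered until another rule fires or the caller flushes explicitly.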
type PolicyConfig ¶
type PolicyConfig struct {
	ByteSize   int                `json:"byte_size" yaml:"byte_size"`
	Count      int                `json:"count" yaml:"count"`
	Condition  condition.Config   `json:"condition" yaml:"condition"`
	Check      string             `json:"check" yaml:"check"`
	Period     string             `json:"period" yaml:"period"`
	Processors []processor.Config `json:"processors" yaml:"processors"`
}
PolicyConfig contains configuration parameters for a batch policy.
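The struct tags above determine the field names used in JSON and YAML configuration. As a rough illustration, the sketch below decodes a JSON fragment into a local mirror of the scalar fields only. `policyFields` and `parsePolicy` are hypothetical names, the Condition and Processors fields are left out because their types live in other Benthos packages, and treating `period` as a Go duration string is an assumption of this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// policyFields is a hypothetical local mirror of the scalar PolicyConfig
// fields, reusing the same JSON field names.
type policyFields struct {
	ByteSize int    `json:"byte_size"`
	Count    int    `json:"count"`
	Check    string `json:"check"`
	Period   string `json:"period"`
}

// parsePolicy decodes raw JSON and, if a period is set, parses it as a Go
// duration string (ASSUMPTION: the period uses time.ParseDuration syntax).
func parsePolicy(raw []byte) (policyFields, time.Duration, error) {
	var conf policyFields
	if err := json.Unmarshal(raw, &conf); err != nil {
		return conf, 0, err
	}
	var period time.Duration
	if conf.Period != "" {
		var err error
		if period, err = time.ParseDuration(conf.Period); err != nil {
			return conf, 0, fmt.Errorf("invalid period: %w", err)
		}
	}
	return conf, period, nil
}

func main() {
	raw := []byte(`{"count": 10, "byte_size": 1024, "period": "1s"}`)
	conf, period, err := parsePolicy(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(conf.Count, conf.ByteSize, period)
}
```

Fields missing from the input keep their zero values, so a fragment that sets only `count` leaves the byte size at 0 and the period empty.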
func NewPolicyConfig ¶
func NewPolicyConfig() PolicyConfig
NewPolicyConfig creates a default PolicyConfig.
func (PolicyConfig) IsNoop ¶ added in v3.2.0
func (p PolicyConfig) IsNoop() bool
IsNoop returns true if this batch policy configuration does nothing.
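The text does not say which fields IsNoop inspects. One plausible reading, sketched below purely as an assumption, is that a configuration does nothing when none of its triggers are set. `toyConfig` and `isNoop` are hypothetical names, and the Condition and Processors fields are omitted.

```go
package main

import "fmt"

// toyConfig is a hypothetical mirror of the scalar PolicyConfig fields.
type toyConfig struct {
	ByteSize int
	Count    int
	Check    string
	Period   string
}

// isNoop reports whether no batching trigger is configured.
// ASSUMPTION: a count of 1 or less is treated as "no batching", since it
// would release every message individually.
func (c toyConfig) isNoop() bool {
	return c.ByteSize <= 0 && c.Count <= 1 && c.Check == "" && c.Period == ""
}

func main() {
	fmt.Println(toyConfig{}.isNoop())            // no triggers set
	fmt.Println(toyConfig{Count: 10}.isNoop())   // count trigger set
	fmt.Println(toyConfig{Period: "1s"}.isNoop()) // period trigger set
}
```

A check like this lets calling code skip constructing a policy entirely when the configuration would never group messages.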