batch

package
v3.65.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 26, 2022 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package batch provides tooling for creating and executing Benthos message batch policies.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func FieldSpec

func FieldSpec() docs.FieldSpec

FieldSpec returns a spec for a common batching field.

func SanitisePolicyConfig

func SanitisePolicyConfig(policy PolicyConfig) (interface{}, error)

SanitisePolicyConfig returns a policy config structure ready to be marshalled with irrelevant fields omitted.

Types

type Policy

type Policy struct {
	// contains filtered or unexported fields
}

Policy implements a batching policy by buffering messages until, based on a set of rules, the buffered messages are ready to be sent onwards as a batch.

func NewPolicy

func NewPolicy(
	conf PolicyConfig,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
) (*Policy, error)

NewPolicy creates an empty policy with default rules.

func (*Policy) Add

func (p *Policy) Add(part types.Part) bool

Add a new message part to this batch policy. Returns true if this part triggers the conditions of the policy.

func (*Policy) CloseAsync

func (p *Policy) CloseAsync()

CloseAsync shuts down the policy resources.

func (*Policy) Count

func (p *Policy) Count() int

Count returns the number of currently buffered message parts within this policy.

func (*Policy) Flush

func (p *Policy) Flush() types.Message

Flush clears all messages stored by this batch policy. Returns nil if the policy is currently empty.

func (*Policy) FlushAny

func (p *Policy) FlushAny() []types.Message

FlushAny clears all messages stored by this batch policy and returns any number of discrete message batches. Returns nil if the policy is currently empty.

func (*Policy) UntilNext

func (p *Policy) UntilNext() time.Duration

UntilNext returns a duration indicating how long until the current batch should be flushed due to a configured period. A negative duration indicates a period has not been set.

func (*Policy) WaitForClose

func (p *Policy) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the processor has closed down.

type PolicyConfig

type PolicyConfig struct {
	ByteSize   int                `json:"byte_size" yaml:"byte_size"`
	Count      int                `json:"count" yaml:"count"`
	Condition  condition.Config   `json:"condition" yaml:"condition"`
	Check      string             `json:"check" yaml:"check"`
	Period     string             `json:"period" yaml:"period"`
	Processors []processor.Config `json:"processors" yaml:"processors"`
}

PolicyConfig contains configuration parameters for a batch policy.

func NewPolicyConfig

func NewPolicyConfig() PolicyConfig

NewPolicyConfig creates a default PolicyConfig.

func (PolicyConfig) IsNoop

func (p PolicyConfig) IsNoop() bool

IsNoop returns true if this batch policy configuration does nothing.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL