batch

package
v3.1.0
Warning

This package is not in the latest version of its module.

Published: Sep 23, 2019 License: MIT Imports: 8 Imported by: 6

Documentation

Overview

Package batch provides tooling for creating and executing Benthos message batch policies.

Constants

This section is empty.

Variables

var PolicyDoc = `
Batches are considered complete and will be flushed downstream when either of
the following conditions are met:

- The ` + "`byte_size`" + ` field is non-zero and the total size of the batch in
  bytes matches or exceeds it (disregarding metadata.)
- The ` + "`count`" + ` field is non-zero and the total number of messages in
  the batch matches or exceeds it.
- A message added to the batch causes the condition to resolve ` + "`true`" + `.
- The ` + "`period`" + ` field is non-empty and the time since the last batch
  exceeds its value.`

PolicyDoc is a markdown document explaining the fields of a Benthos batch policy.
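The size, count, and period rules listed in PolicyDoc can be modelled in a few lines. The following is a minimal sketch, not the package's implementation: `sketchPolicy` and `shouldFlush` are hypothetical names, the period is held as an already parsed `time.Duration`, and the `condition` rule is left out because it depends on types from another package.

```go
package main

import (
	"fmt"
	"time"
)

// sketchPolicy is a hypothetical stand-in for the thresholds described in
// PolicyDoc. It is not the package's Policy type.
type sketchPolicy struct {
	byteSize int           // 0 disables the size rule
	count    int           // 0 disables the count rule
	period   time.Duration // 0 disables the period rule
}

// shouldFlush reports whether a batch of n messages totalling size bytes is
// complete, given the time elapsed since the last batch was flushed.
func (p sketchPolicy) shouldFlush(n, size int, sinceLast time.Duration) bool {
	if p.byteSize > 0 && size >= p.byteSize {
		return true // total size matches or exceeds byte_size
	}
	if p.count > 0 && n >= p.count {
		return true // message count matches or exceeds count
	}
	if p.period > 0 && sinceLast > p.period {
		return true // time since the last batch exceeds period
	}
	return false
}

func main() {
	p := sketchPolicy{byteSize: 1024, count: 10, period: time.Second}
	fmt.Println(p.shouldFlush(3, 200, 500*time.Millisecond))  // false
	fmt.Println(p.shouldFlush(10, 200, 500*time.Millisecond)) // true
	fmt.Println(p.shouldFlush(3, 2048, 500*time.Millisecond)) // true
	fmt.Println(p.shouldFlush(3, 200, 2*time.Second))         // true
}
```

A zero value disables the corresponding rule, which matches the "non-zero" and "non-empty" wording in the document.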

Functions

func SanitisePolicyConfig

func SanitisePolicyConfig(policy PolicyConfig) (interface{}, error)

SanitisePolicyConfig returns a policy config structure ready to be marshalled with irrelevant fields omitted.

Types

type Policy

type Policy struct {
	// contains filtered or unexported fields
}

Policy implements a batching policy by buffering messages until, based on a set of rules, the buffered messages are ready to be sent onwards as a batch.

func NewPolicy

func NewPolicy(
	conf PolicyConfig,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
) (*Policy, error)

NewPolicy creates an empty policy whose rules are taken from conf.

func (*Policy) Add

func (p *Policy) Add(part types.Part) bool

Add adds a new message part to this batch policy. It returns true if adding this part triggers the conditions of the policy, meaning the batch is ready to be flushed.

func (*Policy) Count

func (p *Policy) Count() int

Count returns the number of currently buffered message parts within this policy.

func (*Policy) Flush

func (p *Policy) Flush() types.Message

Flush clears all messages stored by this batch policy and returns them as a single message batch. Returns nil if the policy is currently empty.

func (*Policy) UntilNext

func (p *Policy) UntilNext() time.Duration

UntilNext returns a duration indicating how long until the current batch should be flushed due to a configured period. A negative duration indicates a period has not been set.
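The methods above are typically combined in a loop: add each incoming part, flush when Add reports true, and use UntilNext to arm a timer for period-based flushes. The sketch below illustrates that pattern under stated assumptions: `toyPolicy`, `newToyPolicy`, and `collect` are hypothetical, parts are plain strings rather than `types.Part`, and only the count and period rules are modelled.

```go
package main

import (
	"fmt"
	"time"
)

// toyPolicy is a hypothetical stand-in that mirrors the Add, Flush, and
// UntilNext methods documented above. It is not the package's Policy type.
type toyPolicy struct {
	count     int
	period    time.Duration
	parts     []string
	lastFlush time.Time
}

func newToyPolicy(count int, period time.Duration) *toyPolicy {
	return &toyPolicy{count: count, period: period, lastFlush: time.Now()}
}

// Add buffers a part and reports whether the count rule is now satisfied.
func (p *toyPolicy) Add(part string) bool {
	p.parts = append(p.parts, part)
	return p.count > 0 && len(p.parts) >= p.count
}

// Flush returns the buffered parts and empties the buffer, or nil if empty.
func (p *toyPolicy) Flush() []string {
	p.lastFlush = time.Now()
	if len(p.parts) == 0 {
		return nil
	}
	out := p.parts
	p.parts = nil
	return out
}

// UntilNext returns a negative duration when no period is set, otherwise the
// time remaining until the period elapses (clamped to zero in this sketch).
func (p *toyPolicy) UntilNext() time.Duration {
	if p.period <= 0 {
		return -1
	}
	if d := time.Until(p.lastFlush.Add(p.period)); d > 0 {
		return d
	}
	return 0
}

// collect drains in and returns every batch the policy produced.
func collect(in <-chan string, p *toyPolicy) [][]string {
	var batches [][]string
	flush := func() {
		if b := p.Flush(); b != nil {
			batches = append(batches, b)
		}
	}
	for {
		var timeout <-chan time.Time // a nil channel never fires
		if d := p.UntilNext(); d >= 0 {
			timeout = time.After(d)
		}
		select {
		case part, open := <-in:
			if !open {
				flush() // emit whatever is left on shutdown
				return batches
			}
			if p.Add(part) {
				flush()
			}
		case <-timeout:
			flush()
		}
	}
}

func main() {
	in := make(chan string, 5)
	for _, s := range []string{"a", "b", "c", "d", "e"} {
		in <- s
	}
	close(in)
	fmt.Println(collect(in, newToyPolicy(2, 0))) // [[a b] [c d] [e]]
}
```

Leaving the timer channel nil when UntilNext is negative keeps the select statement unchanged whether or not a period is configured.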

type PolicyConfig

type PolicyConfig struct {
	ByteSize  int              `json:"byte_size" yaml:"byte_size"`
	Count     int              `json:"count" yaml:"count"`
	Condition condition.Config `json:"condition" yaml:"condition"`
	Period    string           `json:"period" yaml:"period"`
}

PolicyConfig contains configuration parameters for a batch policy.
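The struct tags show how a policy appears in JSON or YAML configuration. As a rough illustration, the sketch below decodes the scalar fields from JSON and parses the period. `policyFields` and `parse` are hypothetical names, the `condition` field is omitted because `condition.Config` lives in another package, and treating `period` as a Go duration string handled by `time.ParseDuration` is an assumption rather than something this page states.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// policyFields is a hypothetical mirror of PolicyConfig's scalar fields,
// reusing the JSON keys from the struct tags above.
type policyFields struct {
	ByteSize int    `json:"byte_size"`
	Count    int    `json:"count"`
	Period   string `json:"period"`
}

// parse decodes raw JSON and converts a non-empty period into a duration.
// Assumption: period uses Go duration syntax such as "1s" or "500ms".
func parse(raw string) (policyFields, time.Duration, error) {
	var conf policyFields
	if err := json.Unmarshal([]byte(raw), &conf); err != nil {
		return conf, 0, err
	}
	if conf.Period == "" {
		return conf, 0, nil // an empty period means no timed flush
	}
	d, err := time.ParseDuration(conf.Period)
	return conf, d, err
}

func main() {
	conf, period, err := parse(`{"count": 50, "period": "1s"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(conf.ByteSize, conf.Count, period) // 0 50 1s
}
```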

func NewPolicyConfig

func NewPolicyConfig() PolicyConfig

NewPolicyConfig creates a default PolicyConfig.
