output

package
v4.21.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 8, 2023 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var InjectTracingSpanMappingDocs = docs.FieldBloblang(
	"inject_tracing_map",
	"EXPERIMENTAL: A [Bloblang mapping](/docs/guides/bloblang/about) used to inject an object containing tracing propagation information into outbound messages. The specification of the injected fields will match the format used by the service wide tracer.",
	`meta = meta().merge(this)`,
	`root.meta.span = this`,
).AtVersion("3.45.0").Advanced()

InjectTracingSpanMappingDocs returns a field spec describing an inject tracing span mapping.

Functions

func Description

func Description(async, batches bool, content string) string

Description appends standard feature descriptions to an output description based on various features of the output.

func IterateBatchedSend added in v4.1.0

func IterateBatchedSend(msg message.Batch, fn func(int, *message.Part) error) error

IterateBatchedSend executes a closure fn on each message of a batch, where the closure is expected to attempt a send and return an error. If an error is returned then it is added to a batch error in order to support index specific error handling.

However, if a fatal error is returned such as a connection loss or shut down then it is returned immediately.

Types

type AsyncSink added in v4.1.0

type AsyncSink interface {
	// Connect attempts to establish a connection to the sink, if
	// unsuccessful returns an error. If the attempt is successful (or not
	// necessary) returns nil.
	Connect(ctx context.Context) error

	// WriteBatch should block until either the message is sent (and
	// acknowledged) to a sink, or a transport specific error has occurred, or
	// the Type is closed.
	WriteBatch(ctx context.Context, msg message.Batch) error

	// Close is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	Close(ctx context.Context) error
}

AsyncSink is a type that writes Benthos messages to a third party sink. If the protocol supports a form of acknowledgement then it will be returned by the call to Write.

type AsyncWriter added in v4.1.0

type AsyncWriter struct {
	// contains filtered or unexported fields
}

AsyncWriter is an output type that writes messages to a writer.Type.

func (*AsyncWriter) Connected added in v4.1.0

func (w *AsyncWriter) Connected() bool

Connected returns a boolean indicating whether this output is currently connected to its target.

func (*AsyncWriter) Consume added in v4.1.0

func (w *AsyncWriter) Consume(ts <-chan message.Transaction) error

Consume assigns a messages channel for the output to read.

func (*AsyncWriter) SetInjectTracingMap added in v4.1.0

func (w *AsyncWriter) SetInjectTracingMap(exec *mapping.Executor)

SetInjectTracingMap sets a mapping to be used for injecting tracing events into messages.

func (*AsyncWriter) TriggerCloseNow added in v4.6.0

func (w *AsyncWriter) TriggerCloseNow()

TriggerCloseNow shuts down the output and stops processing messages.

func (*AsyncWriter) WaitForClose added in v4.1.0

func (w *AsyncWriter) WaitForClose(ctx context.Context) error

WaitForClose blocks until the File output has closed down.

type BrokerConfig added in v4.1.0

type BrokerConfig struct {
	Copies   int                `json:"copies" yaml:"copies"`
	Pattern  string             `json:"pattern" yaml:"pattern"`
	Outputs  []Config           `json:"outputs" yaml:"outputs"`
	Batching batchconfig.Config `json:"batching" yaml:"batching"`
}

BrokerConfig contains configuration fields for the Broker output type.

func NewBrokerConfig added in v4.1.0

func NewBrokerConfig() BrokerConfig

NewBrokerConfig creates a new BrokerConfig with default values.

type CacheConfig added in v4.1.0

type CacheConfig struct {
	Target      string `json:"target" yaml:"target"`
	Key         string `json:"key" yaml:"key"`
	TTL         string `json:"ttl" yaml:"ttl"`
	MaxInFlight int    `json:"max_in_flight" yaml:"max_in_flight"`
}

CacheConfig contains configuration fields for the Cache output type.

func NewCacheConfig added in v4.1.0

func NewCacheConfig() CacheConfig

NewCacheConfig creates a new Config with default values.

type CassandraConfig added in v4.1.0

type CassandraConfig struct {
	Addresses                []string              `json:"addresses" yaml:"addresses"`
	TLS                      btls.Config           `json:"tls" yaml:"tls"`
	PasswordAuthenticator    PasswordAuthenticator `json:"password_authenticator" yaml:"password_authenticator"`
	DisableInitialHostLookup bool                  `json:"disable_initial_host_lookup" yaml:"disable_initial_host_lookup"`
	Query                    string                `json:"query" yaml:"query"`
	ArgsMapping              string                `json:"args_mapping" yaml:"args_mapping"`
	Consistency              string                `json:"consistency" yaml:"consistency"`
	Timeout                  string                `json:"timeout" yaml:"timeout"`
	LoggedBatch              bool                  `json:"logged_batch" yaml:"logged_batch"`
	// TODO: V4 Remove this and replace with explicit values.
	retries.Config `json:",inline" yaml:",inline"`
	MaxInFlight    int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching       batchconfig.Config `json:"batching" yaml:"batching"`
}

CassandraConfig contains configuration fields for the Cassandra output type.

func NewCassandraConfig added in v4.1.0

func NewCassandraConfig() CassandraConfig

NewCassandraConfig creates a new CassandraConfig with default values.

type Config added in v4.1.0

type Config struct {
	Label        string             `json:"label" yaml:"label"`
	Type         string             `json:"type" yaml:"type"`
	Broker       BrokerConfig       `json:"broker" yaml:"broker"`
	Cache        CacheConfig        `json:"cache" yaml:"cache"`
	Cassandra    CassandraConfig    `json:"cassandra" yaml:"cassandra"`
	Drop         DropConfig         `json:"drop" yaml:"drop"`
	DropOn       DropOnConfig       `json:"drop_on" yaml:"drop_on"`
	Dynamic      DynamicConfig      `json:"dynamic" yaml:"dynamic"`
	Fallback     TryConfig          `json:"fallback" yaml:"fallback"`
	Inproc       string             `json:"inproc" yaml:"inproc"`
	Kafka        KafkaConfig        `json:"kafka" yaml:"kafka"`
	MQTT         MQTTConfig         `json:"mqtt" yaml:"mqtt"`
	Nanomsg      NanomsgConfig      `json:"nanomsg" yaml:"nanomsg"`
	NSQ          NSQConfig          `json:"nsq" yaml:"nsq"`
	Plugin       any                `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	Reject       string             `json:"reject" yaml:"reject"`
	Resource     string             `json:"resource" yaml:"resource"`
	Retry        RetryConfig        `json:"retry" yaml:"retry"`
	SFTP         SFTPConfig         `json:"sftp" yaml:"sftp"`
	STDOUT       STDOUTConfig       `json:"stdout" yaml:"stdout"`
	Subprocess   SubprocessConfig   `json:"subprocess" yaml:"subprocess"`
	Switch       SwitchConfig       `json:"switch" yaml:"switch"`
	SyncResponse struct{}           `json:"sync_response" yaml:"sync_response"`
	Socket       SocketConfig       `json:"socket" yaml:"socket"`
	Processors   []processor.Config `json:"processors" yaml:"processors"`
}

Config is the all encompassing configuration struct for all output types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

func NewConfig added in v4.1.0

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

func (*Config) UnmarshalYAML added in v4.1.0

func (conf *Config) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.

type DropConfig added in v4.1.0

type DropConfig struct{}

DropConfig contains configuration fields for the drop output type.

func NewDropConfig added in v4.1.0

func NewDropConfig() DropConfig

NewDropConfig creates a new DropConfig with default values.

type DropOnConditions added in v4.1.0

type DropOnConditions struct {
	Error        bool   `json:"error" yaml:"error"`
	BackPressure string `json:"back_pressure" yaml:"back_pressure"`
}

DropOnConditions is a config struct representing the different circumstances under which messages should be dropped.

type DropOnConfig added in v4.1.0

type DropOnConfig struct {
	DropOnConditions `json:",inline" yaml:",inline"`
	Output           *Config `json:"output" yaml:"output"`
}

DropOnConfig contains configuration values for the DropOn output type.

func NewDropOnConfig added in v4.1.0

func NewDropOnConfig() DropOnConfig

NewDropOnConfig creates a new DropOnConfig with default values.

func (DropOnConfig) MarshalJSON added in v4.1.0

func (d DropOnConfig) MarshalJSON() ([]byte, error)

MarshalJSON prints an empty object instead of nil.

func (DropOnConfig) MarshalYAML added in v4.1.0

func (d DropOnConfig) MarshalYAML() (any, error)

MarshalYAML prints an empty object instead of nil.

type DynamicConfig added in v4.1.0

type DynamicConfig struct {
	Outputs map[string]Config `json:"outputs" yaml:"outputs"`
	Prefix  string            `json:"prefix" yaml:"prefix"`
}

DynamicConfig contains configuration fields for the Dynamic output type.

func NewDynamicConfig added in v4.1.0

func NewDynamicConfig() DynamicConfig

NewDynamicConfig creates a new DynamicConfig with default values.

type KafkaConfig added in v4.1.0

type KafkaConfig struct {
	Addresses        []string    `json:"addresses" yaml:"addresses"`
	ClientID         string      `json:"client_id" yaml:"client_id"`
	RackID           string      `json:"rack_id" yaml:"rack_id"`
	Key              string      `json:"key" yaml:"key"`
	Partitioner      string      `json:"partitioner" yaml:"partitioner"`
	Partition        string      `json:"partition" yaml:"partition"`
	Topic            string      `json:"topic" yaml:"topic"`
	Compression      string      `json:"compression" yaml:"compression"`
	MaxMsgBytes      int         `json:"max_msg_bytes" yaml:"max_msg_bytes"`
	Timeout          string      `json:"timeout" yaml:"timeout"`
	AckReplicas      bool        `json:"ack_replicas" yaml:"ack_replicas"`
	TargetVersion    string      `json:"target_version" yaml:"target_version"`
	TLS              btls.Config `json:"tls" yaml:"tls"`
	SASL             sasl.Config `json:"sasl" yaml:"sasl"`
	MaxInFlight      int         `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config   `json:",inline" yaml:",inline"`
	RetryAsBatch     bool                         `json:"retry_as_batch" yaml:"retry_as_batch"`
	Batching         batchconfig.Config           `json:"batching" yaml:"batching"`
	StaticHeaders    map[string]string            `json:"static_headers" yaml:"static_headers"`
	Metadata         metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	InjectTracingMap string                       `json:"inject_tracing_map" yaml:"inject_tracing_map"`
}

KafkaConfig contains configuration fields for the Kafka output type.

func NewKafkaConfig added in v4.1.0

func NewKafkaConfig() KafkaConfig

NewKafkaConfig creates a new KafkaConfig with default values.

type MQTTConfig added in v4.1.0

type MQTTConfig struct {
	URLs                  []string      `json:"urls" yaml:"urls"`
	QoS                   uint8         `json:"qos" yaml:"qos"`
	Retained              bool          `json:"retained" yaml:"retained"`
	RetainedInterpolated  string        `json:"retained_interpolated" yaml:"retained_interpolated"`
	Topic                 string        `json:"topic" yaml:"topic"`
	ClientID              string        `json:"client_id" yaml:"client_id"`
	DynamicClientIDSuffix string        `json:"dynamic_client_id_suffix" yaml:"dynamic_client_id_suffix"`
	Will                  mqttconf.Will `json:"will" yaml:"will"`
	User                  string        `json:"user" yaml:"user"`
	Password              string        `json:"password" yaml:"password"`
	ConnectTimeout        string        `json:"connect_timeout" yaml:"connect_timeout"`
	WriteTimeout          string        `json:"write_timeout" yaml:"write_timeout"`
	KeepAlive             int64         `json:"keepalive" yaml:"keepalive"`
	MaxInFlight           int           `json:"max_in_flight" yaml:"max_in_flight"`
	TLS                   tls.Config    `json:"tls" yaml:"tls"`
}

MQTTConfig contains configuration fields for the MQTT output type.

func NewMQTTConfig added in v4.1.0

func NewMQTTConfig() MQTTConfig

NewMQTTConfig creates a new MQTTConfig with default values.

type NSQConfig added in v4.1.0

type NSQConfig struct {
	Address     string      `json:"nsqd_tcp_address" yaml:"nsqd_tcp_address"`
	Topic       string      `json:"topic" yaml:"topic"`
	UserAgent   string      `json:"user_agent" yaml:"user_agent"`
	TLS         btls.Config `json:"tls" yaml:"tls"`
	MaxInFlight int         `json:"max_in_flight" yaml:"max_in_flight"`
}

NSQConfig contains configuration fields for the NSQ output type.

func NewNSQConfig added in v4.1.0

func NewNSQConfig() NSQConfig

NewNSQConfig creates a new NSQConfig with default values.

type NanomsgConfig added in v4.1.0

type NanomsgConfig struct {
	URLs        []string `json:"urls" yaml:"urls"`
	Bind        bool     `json:"bind" yaml:"bind"`
	SocketType  string   `json:"socket_type" yaml:"socket_type"`
	PollTimeout string   `json:"poll_timeout" yaml:"poll_timeout"`
	MaxInFlight int      `json:"max_in_flight" yaml:"max_in_flight"`
}

NanomsgConfig contains configuration fields for the Nanomsg output type.

func NewNanomsgConfig added in v4.1.0

func NewNanomsgConfig() NanomsgConfig

NewNanomsgConfig creates a new NanomsgConfig with default values.

type PasswordAuthenticator added in v4.1.0

type PasswordAuthenticator struct {
	Enabled  bool   `json:"enabled" yaml:"enabled"`
	Username string `json:"username" yaml:"username"`
	Password string `json:"password" yaml:"password"`
}

PasswordAuthenticator contains the fields that will be used to authenticate with the Cassandra cluster.

type RetryConfig added in v4.1.0

type RetryConfig struct {
	Output         *Config `json:"output" yaml:"output"`
	retries.Config `json:",inline" yaml:",inline"`
}

RetryConfig contains configuration values for the Retry output type.

func NewRetryConfig added in v4.1.0

func NewRetryConfig() RetryConfig

NewRetryConfig creates a new RetryConfig with default values.

func (RetryConfig) MarshalJSON added in v4.1.0

func (r RetryConfig) MarshalJSON() ([]byte, error)

MarshalJSON prints an empty object instead of nil.

func (RetryConfig) MarshalYAML added in v4.1.0

func (r RetryConfig) MarshalYAML() (any, error)

MarshalYAML prints an empty object instead of nil.

type SFTPConfig added in v4.1.0

type SFTPConfig struct {
	Address     string                `json:"address" yaml:"address"`
	Path        string                `json:"path" yaml:"path"`
	Codec       string                `json:"codec" yaml:"codec"`
	Credentials sftpSetup.Credentials `json:"credentials" yaml:"credentials"`
	MaxInFlight int                   `json:"max_in_flight" yaml:"max_in_flight"`
}

SFTPConfig contains configuration fields for the SFTP output type.

func NewSFTPConfig added in v4.1.0

func NewSFTPConfig() SFTPConfig

NewSFTPConfig creates a new Config with default values.

type STDOUTConfig added in v4.1.0

type STDOUTConfig struct {
	Codec string `json:"codec" yaml:"codec"`
}

STDOUTConfig contains configuration fields for the stdout based output type.

func NewSTDOUTConfig added in v4.1.0

func NewSTDOUTConfig() STDOUTConfig

NewSTDOUTConfig creates a new STDOUTConfig with default values.

type SocketConfig added in v4.1.0

type SocketConfig struct {
	Network string `json:"network" yaml:"network"`
	Address string `json:"address" yaml:"address"`
	Codec   string `json:"codec" yaml:"codec"`
}

SocketConfig contains configuration fields for the Socket output type.

func NewSocketConfig added in v4.1.0

func NewSocketConfig() SocketConfig

NewSocketConfig creates a new SocketConfig with default values.

type Streamed

type Streamed interface {
	// Consume starts the type receiving transactions from a Transactor.
	Consume(<-chan message.Transaction) error

	// Connected returns a boolean indicating whether this output is currently
	// connected to its target.
	Connected() bool

	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}

Streamed is a common interface implemented by outputs and provides channel based streaming APIs.

func NewAsyncWriter added in v4.1.0

func NewAsyncWriter(typeStr string, maxInflight int, w AsyncSink, mgr component.Observability) (Streamed, error)

NewAsyncWriter creates a Streamed implementation around an AsyncSink.

func OnlySinglePayloads added in v4.1.0

func OnlySinglePayloads(out Streamed) Streamed

OnlySinglePayloads expands message batches into individual payloads, respecting the max in flight of the wrapped output. This is a more efficient way of feeding messages into an output that handles its own batching mechanism internally, or does not support batching at all.

func WrapWithPipelines added in v4.1.0

func WrapWithPipelines(out Streamed, pipeConstructors ...iprocessor.PipelineConstructorFunc) (Streamed, error)

WrapWithPipelines wraps an output with a variadic number of pipelines.

type SubprocessConfig added in v4.1.0

type SubprocessConfig struct {
	Name  string   `json:"name" yaml:"name"`
	Args  []string `json:"args" yaml:"args"`
	Codec string   `json:"codec" yaml:"codec"`
}

SubprocessConfig contains configuration for the Subprocess input type.

func NewSubprocessConfig added in v4.1.0

func NewSubprocessConfig() SubprocessConfig

NewSubprocessConfig creates a new SubprocessConfig with default values.

type SwitchConfig added in v4.1.0

type SwitchConfig struct {
	RetryUntilSuccess bool               `json:"retry_until_success" yaml:"retry_until_success"`
	StrictMode        bool               `json:"strict_mode" yaml:"strict_mode"`
	Cases             []SwitchConfigCase `json:"cases" yaml:"cases"`
}

SwitchConfig contains configuration fields for the switchOutput output type.

func NewSwitchConfig added in v4.1.0

func NewSwitchConfig() SwitchConfig

NewSwitchConfig creates a new SwitchConfig with default values.

type SwitchConfigCase added in v4.1.0

type SwitchConfigCase struct {
	Check    string `json:"check" yaml:"check"`
	Continue bool   `json:"continue" yaml:"continue"`
	Output   Config `json:"output" yaml:"output"`
}

SwitchConfigCase contains configuration fields per output of a switch type.

func NewSwitchConfigCase added in v4.1.0

func NewSwitchConfigCase() SwitchConfigCase

NewSwitchConfigCase creates a new switch output config with default values.

type Sync

type Sync interface {
	// WriteTransaction attempts to write a transaction to an output.
	WriteTransaction(context.Context, message.Transaction) error

	// Connected returns a boolean indicating whether this output is currently
	// connected to its target.
	Connected() bool

	// TriggerStopConsuming instructs the output to start shutting down
	// resources once all pending messages are delivered and acknowledged.
	TriggerStopConsuming()

	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}

Sync is a common interface implemented by outputs and provides synchronous based writing APIs.

type TryConfig added in v4.1.0

type TryConfig []Config

TryConfig contains configuration fields for the Try output type.

func NewTryConfig added in v4.1.0

func NewTryConfig() TryConfig

NewTryConfig creates a new BrokerConfig with default values.

type WithPipeline added in v4.1.0

type WithPipeline struct {
	// contains filtered or unexported fields
}

WithPipeline is a type that wraps both an output type and a pipeline type by routing the pipeline through the output, and implements the output.Type interface in order to act like an ordinary output.

func WrapWithPipeline added in v4.1.0

func WrapWithPipeline(out Streamed, pipeConstructor iprocessor.PipelineConstructorFunc) (*WithPipeline, error)

WrapWithPipeline routes a processing pipeline directly into an output and returns a type that manages both and acts like an ordinary output.

func (*WithPipeline) Connected added in v4.1.0

func (i *WithPipeline) Connected() bool

Connected returns a boolean indicating whether this output is currently connected to its target.

func (*WithPipeline) Consume added in v4.1.0

func (i *WithPipeline) Consume(tsChan <-chan message.Transaction) error

Consume starts the type listening to a message channel from a producer.

func (*WithPipeline) TriggerCloseNow added in v4.6.0

func (i *WithPipeline) TriggerCloseNow()

TriggerCloseNow triggers a closure of this object but does not block.

func (*WithPipeline) WaitForClose added in v4.1.0

func (i *WithPipeline) WaitForClose(ctx context.Context) error

WaitForClose is a blocking call to wait until the object has finished closing down and cleaning up resources.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL