Documentation ¶
Index ¶
- func Description(async, batches bool, content string) string
- func IterateBatchedSend(msg message.Batch, fn func(int, *message.Part) error) error
- type AsyncSink
- type AsyncWriter
- type BrokerConfig
- type CacheConfig
- type CassandraConfig
- type Config
- type DropConfig
- type DropOnConditions
- type DropOnConfig
- type DynamicConfig
- type MQTTConfig
- type NSQConfig
- type NanomsgConfig
- type PasswordAuthenticator
- type RetryConfig
- type SFTPConfig
- type STDOUTConfig
- type SocketConfig
- type Streamed
- type SubprocessConfig
- type SwitchConfig
- type SwitchConfigCase
- type Sync
- type TryConfig
- type WithPipeline
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Description ¶
Description appends standard feature descriptions to an output description based on various features of the output.
func IterateBatchedSend ¶
IterateBatchedSend executes a closure fn on each message of a batch, where the closure is expected to attempt a send and return an error. If an error is returned then it is added to a batch error in order to support index specific error handling.
However, if a fatal error is returned such as a connection loss or shut down then it is returned immediately.
Types ¶
type AsyncSink ¶
type AsyncSink interface { // Connect attempts to establish a connection to the sink, if // unsuccessful returns an error. If the attempt is successful (or not // necessary) returns nil. Connect(ctx context.Context) error // WriteBatch should block until either the message is sent (and // acknowledged) to a sink, or a transport specific error has occurred, or // the Type is closed. WriteBatch(ctx context.Context, msg message.Batch) error // Close is a blocking call to wait until the component has finished // shutting down and cleaning up resources. Close(ctx context.Context) error }
AsyncSink is a type that writes Benthos messages to a third party sink. If the protocol supports a form of acknowledgement then it will be returned by the call to Write.
type AsyncWriter ¶
type AsyncWriter struct {
// contains filtered or unexported fields
}
AsyncWriter is an output type that writes messages to a writer.Type.
func (*AsyncWriter) Connected ¶
func (w *AsyncWriter) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*AsyncWriter) Consume ¶
func (w *AsyncWriter) Consume(ts <-chan message.Transaction) error
Consume assigns a messages channel for the output to read.
func (*AsyncWriter) TriggerCloseNow ¶
func (w *AsyncWriter) TriggerCloseNow()
TriggerCloseNow shuts down the output and stops processing messages.
func (*AsyncWriter) WaitForClose ¶
func (w *AsyncWriter) WaitForClose(ctx context.Context) error
WaitForClose blocks until the File output has closed down.
type BrokerConfig ¶
type BrokerConfig struct { Copies int `json:"copies" yaml:"copies"` Pattern string `json:"pattern" yaml:"pattern"` Outputs []Config `json:"outputs" yaml:"outputs"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
BrokerConfig contains configuration fields for the Broker output type.
func NewBrokerConfig ¶
func NewBrokerConfig() BrokerConfig
NewBrokerConfig creates a new BrokerConfig with default values.
type CacheConfig ¶
type CacheConfig struct { Target string `json:"target" yaml:"target"` Key string `json:"key" yaml:"key"` TTL string `json:"ttl" yaml:"ttl"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
CacheConfig contains configuration fields for the Cache output type.
func NewCacheConfig ¶
func NewCacheConfig() CacheConfig
NewCacheConfig creates a new Config with default values.
type CassandraConfig ¶
type CassandraConfig struct { Addresses []string `json:"addresses" yaml:"addresses"` TLS btls.Config `json:"tls" yaml:"tls"` PasswordAuthenticator PasswordAuthenticator `json:"password_authenticator" yaml:"password_authenticator"` DisableInitialHostLookup bool `json:"disable_initial_host_lookup" yaml:"disable_initial_host_lookup"` Query string `json:"query" yaml:"query"` ArgsMapping string `json:"args_mapping" yaml:"args_mapping"` Consistency string `json:"consistency" yaml:"consistency"` Timeout string `json:"timeout" yaml:"timeout"` LoggedBatch bool `json:"logged_batch" yaml:"logged_batch"` // TODO: V4 Remove this and replace with explicit values. retries.Config `json:",inline" yaml:",inline"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
CassandraConfig contains configuration fields for the Cassandra output type.
func NewCassandraConfig ¶
func NewCassandraConfig() CassandraConfig
NewCassandraConfig creates a new CassandraConfig with default values.
type Config ¶
type Config struct { Label string `json:"label" yaml:"label"` Type string `json:"type" yaml:"type"` Broker BrokerConfig `json:"broker" yaml:"broker"` Cache CacheConfig `json:"cache" yaml:"cache"` Cassandra CassandraConfig `json:"cassandra" yaml:"cassandra"` Drop DropConfig `json:"drop" yaml:"drop"` DropOn DropOnConfig `json:"drop_on" yaml:"drop_on"` Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"` Fallback TryConfig `json:"fallback" yaml:"fallback"` Inproc string `json:"inproc" yaml:"inproc"` MQTT MQTTConfig `json:"mqtt" yaml:"mqtt"` Nanomsg NanomsgConfig `json:"nanomsg" yaml:"nanomsg"` NSQ NSQConfig `json:"nsq" yaml:"nsq"` Plugin any `json:"plugin,omitempty" yaml:"plugin,omitempty"` Reject string `json:"reject" yaml:"reject"` Resource string `json:"resource" yaml:"resource"` Retry RetryConfig `json:"retry" yaml:"retry"` SFTP SFTPConfig `json:"sftp" yaml:"sftp"` STDOUT STDOUTConfig `json:"stdout" yaml:"stdout"` Subprocess SubprocessConfig `json:"subprocess" yaml:"subprocess"` Switch SwitchConfig `json:"switch" yaml:"switch"` SyncResponse struct{} `json:"sync_response" yaml:"sync_response"` Socket SocketConfig `json:"socket" yaml:"socket"` Processors []processor.Config `json:"processors" yaml:"processors"` }
Config is the all encompassing configuration struct for all output types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.
func NewConfig ¶
func NewConfig() Config
NewConfig returns a configuration struct fully populated with default values. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.
func (*Config) UnmarshalYAML ¶
UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.
type DropConfig ¶
type DropConfig struct{}
DropConfig contains configuration fields for the drop output type.
func NewDropConfig ¶
func NewDropConfig() DropConfig
NewDropConfig creates a new DropConfig with default values.
type DropOnConditions ¶
type DropOnConditions struct { Error bool `json:"error" yaml:"error"` BackPressure string `json:"back_pressure" yaml:"back_pressure"` }
DropOnConditions is a config struct representing the different circumstances under which messages should be dropped.
type DropOnConfig ¶
type DropOnConfig struct { DropOnConditions `json:",inline" yaml:",inline"` Output *Config `json:"output" yaml:"output"` }
DropOnConfig contains configuration values for the DropOn output type.
func NewDropOnConfig ¶
func NewDropOnConfig() DropOnConfig
NewDropOnConfig creates a new DropOnConfig with default values.
func (DropOnConfig) MarshalJSON ¶
func (d DropOnConfig) MarshalJSON() ([]byte, error)
MarshalJSON prints an empty object instead of nil.
func (DropOnConfig) MarshalYAML ¶
func (d DropOnConfig) MarshalYAML() (any, error)
MarshalYAML prints an empty object instead of nil.
type DynamicConfig ¶
type DynamicConfig struct { Outputs map[string]Config `json:"outputs" yaml:"outputs"` Prefix string `json:"prefix" yaml:"prefix"` }
DynamicConfig contains configuration fields for the Dynamic output type.
func NewDynamicConfig ¶
func NewDynamicConfig() DynamicConfig
NewDynamicConfig creates a new DynamicConfig with default values.
type MQTTConfig ¶
type MQTTConfig struct { URLs []string `json:"urls" yaml:"urls"` QoS uint8 `json:"qos" yaml:"qos"` Retained bool `json:"retained" yaml:"retained"` RetainedInterpolated string `json:"retained_interpolated" yaml:"retained_interpolated"` Topic string `json:"topic" yaml:"topic"` ClientID string `json:"client_id" yaml:"client_id"` DynamicClientIDSuffix string `json:"dynamic_client_id_suffix" yaml:"dynamic_client_id_suffix"` Will mqttconf.Will `json:"will" yaml:"will"` User string `json:"user" yaml:"user"` Password string `json:"password" yaml:"password"` ConnectTimeout string `json:"connect_timeout" yaml:"connect_timeout"` WriteTimeout string `json:"write_timeout" yaml:"write_timeout"` KeepAlive int64 `json:"keepalive" yaml:"keepalive"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` TLS tls.Config `json:"tls" yaml:"tls"` }
MQTTConfig contains configuration fields for the MQTT output type.
func NewMQTTConfig ¶
func NewMQTTConfig() MQTTConfig
NewMQTTConfig creates a new MQTTConfig with default values.
type NSQConfig ¶
type NSQConfig struct { Address string `json:"nsqd_tcp_address" yaml:"nsqd_tcp_address"` Topic string `json:"topic" yaml:"topic"` UserAgent string `json:"user_agent" yaml:"user_agent"` TLS btls.Config `json:"tls" yaml:"tls"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
NSQConfig contains configuration fields for the NSQ output type.
func NewNSQConfig ¶
func NewNSQConfig() NSQConfig
NewNSQConfig creates a new NSQConfig with default values.
type NanomsgConfig ¶
type NanomsgConfig struct { URLs []string `json:"urls" yaml:"urls"` Bind bool `json:"bind" yaml:"bind"` SocketType string `json:"socket_type" yaml:"socket_type"` PollTimeout string `json:"poll_timeout" yaml:"poll_timeout"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
NanomsgConfig contains configuration fields for the Nanomsg output type.
func NewNanomsgConfig ¶
func NewNanomsgConfig() NanomsgConfig
NewNanomsgConfig creates a new NanomsgConfig with default values.
type PasswordAuthenticator ¶
type PasswordAuthenticator struct { Enabled bool `json:"enabled" yaml:"enabled"` Username string `json:"username" yaml:"username"` Password string `json:"password" yaml:"password"` }
PasswordAuthenticator contains the fields that will be used to authenticate with the Cassandra cluster.
type RetryConfig ¶
type RetryConfig struct { Output *Config `json:"output" yaml:"output"` retries.Config `json:",inline" yaml:",inline"` }
RetryConfig contains configuration values for the Retry output type.
func NewRetryConfig ¶
func NewRetryConfig() RetryConfig
NewRetryConfig creates a new RetryConfig with default values.
func (RetryConfig) MarshalJSON ¶
func (r RetryConfig) MarshalJSON() ([]byte, error)
MarshalJSON prints an empty object instead of nil.
func (RetryConfig) MarshalYAML ¶
func (r RetryConfig) MarshalYAML() (any, error)
MarshalYAML prints an empty object instead of nil.
type SFTPConfig ¶
type SFTPConfig struct { Address string `json:"address" yaml:"address"` Path string `json:"path" yaml:"path"` Codec string `json:"codec" yaml:"codec"` Credentials sftpSetup.Credentials `json:"credentials" yaml:"credentials"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
SFTPConfig contains configuration fields for the SFTP output type.
func NewSFTPConfig ¶
func NewSFTPConfig() SFTPConfig
NewSFTPConfig creates a new Config with default values.
type STDOUTConfig ¶
type STDOUTConfig struct {
Codec string `json:"codec" yaml:"codec"`
}
STDOUTConfig contains configuration fields for the stdout based output type.
func NewSTDOUTConfig ¶
func NewSTDOUTConfig() STDOUTConfig
NewSTDOUTConfig creates a new STDOUTConfig with default values.
type SocketConfig ¶
type SocketConfig struct { Network string `json:"network" yaml:"network"` Address string `json:"address" yaml:"address"` Codec string `json:"codec" yaml:"codec"` }
SocketConfig contains configuration fields for the Socket output type.
func NewSocketConfig ¶
func NewSocketConfig() SocketConfig
NewSocketConfig creates a new SocketConfig with default values.
type Streamed ¶
type Streamed interface { // Consume starts the type receiving transactions from a Transactor. Consume(<-chan message.Transaction) error // Connected returns a boolean indicating whether this output is currently // connected to its target. Connected() bool // TriggerCloseNow triggers the shut down of this component but should not // block the calling goroutine. TriggerCloseNow() // WaitForClose is a blocking call to wait until the component has finished // shutting down and cleaning up resources. WaitForClose(ctx context.Context) error }
Streamed is a common interface implemented by outputs and provides channel based streaming APIs.
func NewAsyncWriter ¶
func NewAsyncWriter(typeStr string, maxInflight int, w AsyncSink, mgr component.Observability) (Streamed, error)
NewAsyncWriter creates a Streamed implementation around an AsyncSink.
func OnlySinglePayloads ¶
OnlySinglePayloads expands message batches into individual payloads, respecting the max in flight of the wrapped output. This is a more efficient way of feeding messages into an output that handles its own batching mechanism internally, or does not support batching at all.
func WrapWithPipelines ¶
func WrapWithPipelines(out Streamed, pipeConstructors ...iprocessor.PipelineConstructorFunc) (Streamed, error)
WrapWithPipelines wraps an output with a variadic number of pipelines.
type SubprocessConfig ¶
type SubprocessConfig struct { Name string `json:"name" yaml:"name"` Args []string `json:"args" yaml:"args"` Codec string `json:"codec" yaml:"codec"` }
SubprocessConfig contains configuration for the Subprocess input type.
func NewSubprocessConfig ¶
func NewSubprocessConfig() SubprocessConfig
NewSubprocessConfig creates a new SubprocessConfig with default values.
type SwitchConfig ¶
type SwitchConfig struct { RetryUntilSuccess bool `json:"retry_until_success" yaml:"retry_until_success"` StrictMode bool `json:"strict_mode" yaml:"strict_mode"` Cases []SwitchConfigCase `json:"cases" yaml:"cases"` }
SwitchConfig contains configuration fields for the switchOutput output type.
func NewSwitchConfig ¶
func NewSwitchConfig() SwitchConfig
NewSwitchConfig creates a new SwitchConfig with default values.
type SwitchConfigCase ¶
type SwitchConfigCase struct { Check string `json:"check" yaml:"check"` Continue bool `json:"continue" yaml:"continue"` Output Config `json:"output" yaml:"output"` }
SwitchConfigCase contains configuration fields per output of a switch type.
func NewSwitchConfigCase ¶
func NewSwitchConfigCase() SwitchConfigCase
NewSwitchConfigCase creates a new switch output config with default values.
type Sync ¶
type Sync interface { // WriteTransaction attempts to write a transaction to an output. WriteTransaction(context.Context, message.Transaction) error // Connected returns a boolean indicating whether this output is currently // connected to its target. Connected() bool // TriggerStopConsuming instructs the output to start shutting down // resources once all pending messages are delivered and acknowledged. TriggerStopConsuming() // TriggerCloseNow triggers the shut down of this component but should not // block the calling goroutine. TriggerCloseNow() // WaitForClose is a blocking call to wait until the component has finished // shutting down and cleaning up resources. WaitForClose(ctx context.Context) error }
Sync is a common interface implemented by outputs and provides synchronous based writing APIs.
type TryConfig ¶
type TryConfig []Config
TryConfig contains configuration fields for the Try output type.
func NewTryConfig ¶
func NewTryConfig() TryConfig
NewTryConfig creates a new BrokerConfig with default values.
type WithPipeline ¶
type WithPipeline struct {
// contains filtered or unexported fields
}
WithPipeline is a type that wraps both an output type and a pipeline type by routing the pipeline through the output, and implements the output.Type interface in order to act like an ordinary output.
func WrapWithPipeline ¶
func WrapWithPipeline(out Streamed, pipeConstructor iprocessor.PipelineConstructorFunc) (*WithPipeline, error)
WrapWithPipeline routes a processing pipeline directly into an output and returns a type that manages both and acts like an ordinary output.
func (*WithPipeline) Connected ¶
func (i *WithPipeline) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*WithPipeline) Consume ¶
func (i *WithPipeline) Consume(tsChan <-chan message.Transaction) error
Consume starts the type listening to a message channel from a producer.
func (*WithPipeline) TriggerCloseNow ¶
func (i *WithPipeline) TriggerCloseNow()
TriggerCloseNow triggers a closure of this object but does not block.
func (*WithPipeline) WaitForClose ¶
func (i *WithPipeline) WaitForClose(ctx context.Context) error
WaitForClose is a blocking call to wait until the object has finished closing down and cleaning up resources.
Source Files ¶
- async_writer.go
- batched_send.go
- config.go
- config_broker.go
- config_cache.go
- config_cassandra.go
- config_drop.go
- config_drop_on.go
- config_dynamic.go
- config_fallback.go
- config_mqtt.go
- config_nanomsg.go
- config_nsq.go
- config_retry.go
- config_sftp.go
- config_socket.go
- config_stdout.go
- config_subprocess.go
- config_switch.go
- docs.go
- interface.go
- not_batched.go
- wrap_with_pipeline.go