Documentation ¶
Overview ¶
Package output defines all sinks for sending Benthos messages to a variety of third party destinations. All output types must implement interface output.Type.
If the sink of an output provides a form of acknowledgement of message receipt then the output is responsible for propagating that acknowledgement back to the source of the data by sending it over the transaction response channel. Otherwise a standard acknowledgement is sent.
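The acknowledgement flow described above can be sketched without any Benthos imports. The `transaction` type, the `sink` function type, and the helper names below are hypothetical stand-ins rather than the package's real types; only the idea of returning the sink's result over a response channel comes from the text.

```go
package main

import (
	"errors"
	"fmt"
)

// transaction is a hypothetical stand-in for a Benthos transaction: a payload
// plus a channel on which the output reports the outcome of the write.
type transaction struct {
	payload  string
	response chan<- error
}

// sink is a hypothetical destination that may reject a write.
type sink func(payload string) error

var errEmpty = errors.New("empty payload")

// rejectEmpty is an example sink that refuses empty payloads.
func rejectEmpty(p string) error {
	if p == "" {
		return errEmpty
	}
	return nil
}

// consume reads transactions until the channel is closed, writes each payload
// to the sink, and propagates the sink's result (nil means acknowledged) back
// to the source over the response channel.
func consume(ts <-chan transaction, write sink) {
	for t := range ts {
		t.response <- write(t.payload)
	}
}

// sendAll pushes payloads through consume and collects one result per payload.
func sendAll(payloads []string, write sink) []error {
	ts := make(chan transaction)
	go consume(ts, write)
	results := make([]error, 0, len(payloads))
	for _, p := range payloads {
		resp := make(chan error, 1)
		ts <- transaction{payload: p, response: resp}
		results = append(results, <-resp)
	}
	close(ts)
	return results
}

func main() {
	for i, err := range sendAll([]string{"a", ""}, rejectEmpty) {
		fmt.Println(i, err)
	}
}
```

A sink with no acknowledgement mechanism would simply return nil once the write has been handed off, which corresponds to the standard acknowledgement mentioned above.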
Index ¶
- Constants
- Variables
- func Descriptions() string
- func DocumentPlugin(typeString, description string, configSanitiser PluginConfigSanitiser)
- func PluginCount() int
- func PluginDescriptions() string
- func RegisterPlugin(typeString string, configConstructor PluginConfigConstructor, ...)
- func SanitiseConfig(conf Config) (interface{}, error)
- type AsyncSink
- type AsyncWriter
- type Batcher
- type BrokerConfig
- type CassandraConfig
- type Category
- type Config
- type DropOnConditions
- type DropOnConfig
- type DropOnError
- type DropOnErrorConfig
- type DynamicConfig
- type FileConfig
- type HTTPServer
- type HTTPServerConfig
- type Inproc
- type InprocConfig
- type LineWriter
- type PasswordAuthenticator
- type PluginConfigConstructor
- type PluginConfigSanitiser
- type PluginConstructor
- type RejectConfig
- type Resource
- type Retry
- type RetryConfig
- type SQLConfig
- type STDOUTConfig
- type Subprocess
- type SubprocessConfig
- type Switch
- type SwitchConfig
- type SwitchConfigCase
- type SwitchConfigOutput
- type TryConfig
- type Type
- func New(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ...) (Type, error)
- func NewAMQP(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAMQP09(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAMQP1(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAWSDynamoDB(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAWSKinesis(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAWSKinesisFirehose(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAWSS3(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAWSSNS(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAWSSQS(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAmazonS3(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAmazonSNS(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAmazonSQS(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAsyncWriter(typeStr string, maxInflight int, w AsyncSink, log log.Modular, ...) (Type, error)
- func NewAzureBlobStorage(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAzureQueueStorage(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAzureTableStorage(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewBatcher(batcher *batch.Policy, child Type, log log.Modular, stats metrics.Type) Type
- func NewBroker(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ...) (Type, error)
- func NewCache(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewDrop(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewDropOnError(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewDynamic(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewDynamoDB(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewElasticsearch(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewFile(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewFiles(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewGCPPubSub(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewHDFS(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewHTTPClient(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewHTTPServer(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewInproc(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewKafka(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewKinesis(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewKinesisFirehose(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewLineWriter(handle io.WriteCloser, closeOnExit bool, customDelimiter []byte, ...) (Type, error)
- func NewMQTT(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewNATS(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewNATSStream(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewNSQ(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewNanomsg(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewRedisHash(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewRedisList(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewRedisPubSub(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewRedisStreams(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewResource(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewRetry(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewSTDOUT(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewSocket(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewSwitch(conf Config, mgr types.Manager, logger log.Modular, stats metrics.Type) (Type, error)
- func NewTCP(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewTry(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ...) (Type, error)
- func NewUDP(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewWebsocket(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewWriter(typeStr string, w writer.Type, log log.Modular, stats metrics.Type) (Type, error)
- func WrapWithPipelines(out Type, pipeConstructors ...types.PipelineConstructorFunc) (Type, error)
- type TypeSpec
- type WithPipeline
- type Writer
Constants ¶
const (
	TypeAMQP = "amqp"
	TypeAMQP09 = "amqp_0_9"
	TypeAMQP1 = "amqp_1"
	TypeAWSDynamoDB = "aws_dynamodb"
	TypeAWSKinesis = "aws_kinesis"
	TypeAWSKinesisFirehose = "aws_kinesis_firehose"
	TypeAWSS3 = "aws_s3"
	TypeAWSSNS = "aws_sns"
	TypeAWSSQS = "aws_sqs"
	TypeAzureBlobStorage = "azure_blob_storage"
	TypeAzureQueueStorage = "azure_queue_storage"
	TypeAzureTableStorage = "azure_table_storage"
	TypeBlobStorage = "blob_storage"
	TypeBroker = "broker"
	TypeCache = "cache"
	TypeCassandra = "cassandra"
	TypeDrop = "drop"
	TypeDropOn = "drop_on"
	TypeDropOnError = "drop_on_error"
	TypeDynamic = "dynamic"
	TypeDynamoDB = "dynamodb"
	TypeElasticsearch = "elasticsearch"
	TypeFile = "file"
	TypeFiles = "files"
	TypeGCPPubSub = "gcp_pubsub"
	TypeHDFS = "hdfs"
	TypeHTTPClient = "http_client"
	TypeHTTPServer = "http_server"
	TypeInproc = "inproc"
	TypeKafka = "kafka"
	TypeKinesis = "kinesis"
	TypeKinesisFirehose = "kinesis_firehose"
	TypeMQTT = "mqtt"
	TypeNanomsg = "nanomsg"
	TypeNATS = "nats"
	TypeNATSStream = "nats_stream"
	TypeNSQ = "nsq"
	TypeRedisHash = "redis_hash"
	TypeRedisList = "redis_list"
	TypeRedisPubSub = "redis_pubsub"
	TypeRedisStreams = "redis_streams"
	TypeReject = "reject"
	TypeResource = "resource"
	TypeRetry = "retry"
	TypeS3 = "s3"
	TypeSNS = "sns"
	TypeSQL = "sql"
	TypeSQS = "sqs"
	TypeSTDOUT = "stdout"
	TypeSubprocess = "subprocess"
	TypeSwitch = "switch"
	TypeSyncResponse = "sync_response"
	TypeTableStorage = "table_storage"
	TypeTCP = "tcp"
	TypeTry = "try"
	TypeUDP = "udp"
	TypeSocket = "socket"
	TypeWebsocket = "websocket"
	TypeZMQ4 = "zmq4"
)
String constants representing each output type.
Variables ¶
var (
	// ErrSwitchNoConditionMet is returned when a message does not match any
	// output conditions.
	ErrSwitchNoConditionMet = errors.New("no switch output conditions were met by message")

	// ErrSwitchNoCasesMatched is returned when a message does not match any
	// output cases.
	ErrSwitchNoCasesMatched = errors.New("no switch cases were matched by message")

	// ErrSwitchNoOutputs is returned when creating a Switch type with less than
	// 2 outputs.
	ErrSwitchNoOutputs = errors.New("attempting to create switch with fewer than 2 cases")
)
var Constructors = map[string]TypeSpec{}
Constructors is a map of all output types with their specs.
var DocsAsync = `
This output benefits from sending multiple messages in flight in parallel for
improved performance. You can tune the max number of in flight messages with the
field ` + "`max_in_flight`" + `.`
DocsAsync is a documentation paragraph regarding outputs that support asynchronous sends.
var DocsBatches = `` /* 205-byte string literal not displayed */
DocsBatches is a documentation paragraph regarding outputs that support batching.
var (
	// ErrBrokerNoOutputs is returned when creating a Broker type with zero
	// outputs.
	ErrBrokerNoOutputs = errors.New("attempting to create broker output type with no outputs")
)
Functions ¶
func Descriptions ¶
func Descriptions() string
Descriptions returns a formatted string of collated descriptions of each type.
func DocumentPlugin ¶
func DocumentPlugin( typeString, description string, configSanitiser PluginConfigSanitiser, )
DocumentPlugin adds a description and an optional configuration sanitiser function to the definition of a registered plugin. This improves the documentation generated by PluginDescriptions.
func PluginCount ¶
func PluginCount() int
PluginCount returns the number of registered plugins. This does NOT count the standard set of components.
func PluginDescriptions ¶
func PluginDescriptions() string
PluginDescriptions generates and returns a markdown formatted document listing each registered plugin and an example configuration for it.
func RegisterPlugin ¶
func RegisterPlugin( typeString string, configConstructor PluginConfigConstructor, constructor PluginConstructor, )
RegisterPlugin registers a plugin under a unique name so that it can be constructed in the same way as regular outputs. If the plugin needs no configuration then configConstructor can be nil. A constructor for the plugin itself must always be provided.
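As a rough model of the registration pattern described above, the sketch below keeps a name-keyed map of config constructors and plugin constructors. Everything in it (`registerPlugin`, `construct`, `exampleConfig`, and the string result standing in for an output) is hypothetical and deliberately simplified; it does not reproduce the package's actual signatures.

```go
package main

import "fmt"

// configConstructor and constructor are simplified, hypothetical counterparts
// of PluginConfigConstructor and PluginConstructor.
type configConstructor func() interface{}
type constructor func(conf interface{}) (string, error)

type pluginSpec struct {
	newConfig configConstructor // may be nil when no configuration is needed
	build     constructor
}

var plugins = map[string]pluginSpec{}

// registerPlugin stores the constructors under a unique name.
func registerPlugin(name string, cc configConstructor, c constructor) {
	plugins[name] = pluginSpec{newConfig: cc, build: c}
}

// construct looks a plugin up by name, builds its default config if a config
// constructor was supplied, and then calls the plugin constructor.
func construct(name string) (string, error) {
	spec, ok := plugins[name]
	if !ok {
		return "", fmt.Errorf("plugin %q is not registered", name)
	}
	var conf interface{}
	if spec.newConfig != nil {
		conf = spec.newConfig()
	}
	return spec.build(conf)
}

// exampleConfig is a hypothetical plugin configuration struct.
type exampleConfig struct {
	Prefix string
}

// registerExample registers a hypothetical plugin named "example".
func registerExample() {
	registerPlugin(
		"example",
		func() interface{} { return &exampleConfig{Prefix: "demo"} },
		func(conf interface{}) (string, error) {
			c, ok := conf.(*exampleConfig)
			if !ok {
				return "", fmt.Errorf("unexpected config type %T", conf)
			}
			return "example output with prefix: " + c.Prefix, nil
		},
	)
}

func main() {
	registerExample()
	fmt.Println(construct("example"))
}
```

Returning a pointer from the config constructor mirrors the note under PluginConfigConstructor that it returns a pointer to a fully populated struct, so user configuration can be overlaid before the constructor runs.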
func SanitiseConfig ¶
func SanitiseConfig(conf Config) (interface{}, error)
SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.
Types ¶
type AsyncSink ¶ added in v3.8.0
type AsyncSink interface {
	// ConnectWithContext attempts to establish a connection to the sink, if
	// unsuccessful returns an error. If the attempt is successful (or not
	// necessary) returns nil.
	ConnectWithContext(ctx context.Context) error

	// WriteWithContext should block until either the message is sent (and
	// acknowledged) to a sink, or a transport specific error has occurred, or
	// the Type is closed.
	WriteWithContext(ctx context.Context, msg types.Message) error

	types.Closable
}
AsyncSink is a type that writes Benthos messages to a third party sink. If the protocol supports a form of acknowledgement then it will be returned by the call to WriteWithContext.
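A minimal in-memory sink illustrates the connect-then-write contract. The `asyncSink` interface below is a local mirror that assumes a `[]byte` payload in place of `types.Message` and leaves out the `types.Closable` methods; `memorySink` and `demo` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// asyncSink mirrors the shape of AsyncSink for illustration only: the payload
// is a []byte rather than types.Message, and types.Closable is omitted.
type asyncSink interface {
	ConnectWithContext(ctx context.Context) error
	WriteWithContext(ctx context.Context, msg []byte) error
}

var errNotConnected = errors.New("not connected")

// memorySink stores written messages in memory.
type memorySink struct {
	mu        sync.Mutex
	connected bool
	written   [][]byte
}

// ConnectWithContext marks the sink as connected unless ctx is already done.
func (m *memorySink) ConnectWithContext(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.connected = true
	return nil
}

// WriteWithContext returns nil (the acknowledgement) once the message has
// been stored, or an error if the sink is not connected or ctx is done.
func (m *memorySink) WriteWithContext(ctx context.Context, msg []byte) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	if !m.connected {
		return errNotConnected
	}
	m.written = append(m.written, append([]byte(nil), msg...))
	return nil
}

// demo writes once before connecting and once after, returning the number of
// stored messages and the error from the premature write.
func demo() (int, error) {
	ctx := context.Background()
	sink := &memorySink{}
	var s asyncSink = sink
	firstErr := s.WriteWithContext(ctx, []byte("too early"))
	if err := s.ConnectWithContext(ctx); err != nil {
		return 0, err
	}
	if err := s.WriteWithContext(ctx, []byte("hello")); err != nil {
		return 0, err
	}
	return len(sink.written), firstErr
}

func main() {
	fmt.Println(demo())
}
```

The mutex matters because a sink of this kind may be called from several goroutines at once when multiple messages are in flight.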
type AsyncWriter ¶ added in v3.8.0
type AsyncWriter struct {
// contains filtered or unexported fields
}
AsyncWriter is an output type that writes messages to an AsyncSink.
func (*AsyncWriter) CloseAsync ¶ added in v3.8.0
func (w *AsyncWriter) CloseAsync()
CloseAsync shuts down the AsyncWriter output and stops processing messages.
func (*AsyncWriter) Connected ¶ added in v3.8.0
func (w *AsyncWriter) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*AsyncWriter) Consume ¶ added in v3.8.0
func (w *AsyncWriter) Consume(ts <-chan types.Transaction) error
Consume assigns a messages channel for the output to read.
func (*AsyncWriter) WaitForClose ¶ added in v3.8.0
func (w *AsyncWriter) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the AsyncWriter output has closed down.
type Batcher ¶ added in v3.4.0
type Batcher struct {
// contains filtered or unexported fields
}
Batcher wraps an output with a batching policy.
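To illustrate what wrapping an output with a batching policy can look like, here is a count-based sketch. The real policy is defined by `batch.Policy`, which is not described in this section, so `countBatcher` and its flush callback are purely hypothetical.

```go
package main

import "fmt"

// countBatcher is a hypothetical, count-only batching wrapper: it buffers
// messages and hands them to flush once count messages have accumulated.
type countBatcher struct {
	count   int
	pending []string
	flush   func(batch []string) error
}

// Add buffers a message and flushes when the batch is full.
func (b *countBatcher) Add(msg string) error {
	b.pending = append(b.pending, msg)
	if len(b.pending) < b.count {
		return nil
	}
	return b.Flush()
}

// Flush sends any buffered messages downstream, for example on shutdown.
func (b *countBatcher) Flush() error {
	if len(b.pending) == 0 {
		return nil
	}
	batch := b.pending
	b.pending = nil
	return b.flush(batch)
}

// batchSizes runs msgs through a countBatcher and records each batch size.
func batchSizes(msgs []string, count int) []int {
	var sizes []int
	b := &countBatcher{count: count, flush: func(batch []string) error {
		sizes = append(sizes, len(batch))
		return nil
	}}
	for _, m := range msgs {
		_ = b.Add(m)
	}
	_ = b.Flush()
	return sizes
}

func main() {
	fmt.Println(batchSizes([]string{"a", "b", "c", "d", "e"}, 2))
}
```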
func (*Batcher) CloseAsync ¶ added in v3.4.0
func (m *Batcher) CloseAsync()
CloseAsync shuts down the Batcher and stops processing messages.
func (*Batcher) Connected ¶ added in v3.4.0
Connected returns a boolean indicating whether this output is currently connected to its target.
type BrokerConfig ¶
type BrokerConfig struct {
	Copies int `json:"copies" yaml:"copies"`
	Pattern string `json:"pattern" yaml:"pattern"`
	MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
	Outputs brokerOutputList `json:"outputs" yaml:"outputs"`
	Batching batch.PolicyConfig `json:"batching" yaml:"batching"`
}
BrokerConfig contains configuration fields for the Broker output type.
func NewBrokerConfig ¶
func NewBrokerConfig() BrokerConfig
NewBrokerConfig creates a new BrokerConfig with default values.
type CassandraConfig ¶ added in v3.33.0
type CassandraConfig struct {
	Addresses []string `json:"addresses" yaml:"addresses"`
	TLS btls.Config `json:"tls" yaml:"tls"`
	PasswordAuthenticator PasswordAuthenticator `json:"password_authenticator" yaml:"password_authenticator"`
	DisableInitialHostLookup bool `json:"disable_initial_host_lookup" yaml:"disable_initial_host_lookup"`
	Query string `json:"query" yaml:"query"`
	Args []string `json:"args" yaml:"args"`
	Consistency string `json:"consistency" yaml:"consistency"`
	retries.Config `json:",inline" yaml:",inline"`
	MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
	Batching batch.PolicyConfig `json:"batching" yaml:"batching"`
}
CassandraConfig contains configuration fields for the Cassandra output type.
func NewCassandraConfig ¶ added in v3.33.0
func NewCassandraConfig() CassandraConfig
NewCassandraConfig creates a new CassandraConfig with default values.
type Category ¶ added in v3.27.0
type Category string
Category describes the general category of an output.
type Config ¶
type Config struct {
	Type string `json:"type" yaml:"type"`
	AMQP writer.AMQPConfig `json:"amqp" yaml:"amqp"`
	AMQP09 writer.AMQPConfig `json:"amqp_0_9" yaml:"amqp_0_9"`
	AMQP1 writer.AMQP1Config `json:"amqp_1" yaml:"amqp_1"`
	AWSDynamoDB writer.DynamoDBConfig `json:"aws_dynamodb" yaml:"aws_dynamodb"`
	AWSKinesis writer.KinesisConfig `json:"aws_kinesis" yaml:"aws_kinesis"`
	AWSKinesisFirehose writer.KinesisFirehoseConfig `json:"aws_kinesis_firehose" yaml:"aws_kinesis_firehose"`
	AWSS3 writer.AmazonS3Config `json:"aws_s3" yaml:"aws_s3"`
	AWSSNS writer.SNSConfig `json:"aws_sns" yaml:"aws_sns"`
	AWSSQS writer.AmazonSQSConfig `json:"aws_sqs" yaml:"aws_sqs"`
	AzureBlobStorage writer.AzureBlobStorageConfig `json:"azure_blob_storage" yaml:"azure_blob_storage"`
	AzureQueueStorage writer.AzureQueueStorageConfig `json:"azure_queue_storage" yaml:"azure_queue_storage"`
	AzureTableStorage writer.AzureTableStorageConfig `json:"azure_table_storage" yaml:"azure_table_storage"`
	BlobStorage writer.AzureBlobStorageConfig `json:"blob_storage" yaml:"blob_storage"`
	Broker BrokerConfig `json:"broker" yaml:"broker"`
	Cache writer.CacheConfig `json:"cache" yaml:"cache"`
	Cassandra CassandraConfig `json:"cassandra" yaml:"cassandra"`
	Drop writer.DropConfig `json:"drop" yaml:"drop"`
	DropOn DropOnConfig `json:"drop_on" yaml:"drop_on"`
	DropOnError DropOnErrorConfig `json:"drop_on_error" yaml:"drop_on_error"`
	Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"`
	DynamoDB writer.DynamoDBConfig `json:"dynamodb" yaml:"dynamodb"`
	Elasticsearch writer.ElasticsearchConfig `json:"elasticsearch" yaml:"elasticsearch"`
	File FileConfig `json:"file" yaml:"file"`
	Files writer.FilesConfig `json:"files" yaml:"files"`
	GCPPubSub writer.GCPPubSubConfig `json:"gcp_pubsub" yaml:"gcp_pubsub"`
	HDFS writer.HDFSConfig `json:"hdfs" yaml:"hdfs"`
	HTTPClient writer.HTTPClientConfig `json:"http_client" yaml:"http_client"`
	HTTPServer HTTPServerConfig `json:"http_server" yaml:"http_server"`
	Inproc InprocConfig `json:"inproc" yaml:"inproc"`
	Kafka writer.KafkaConfig `json:"kafka" yaml:"kafka"`
	Kinesis writer.KinesisConfig `json:"kinesis" yaml:"kinesis"`
	KinesisFirehose writer.KinesisFirehoseConfig `json:"kinesis_firehose" yaml:"kinesis_firehose"`
	MQTT writer.MQTTConfig `json:"mqtt" yaml:"mqtt"`
	Nanomsg writer.NanomsgConfig `json:"nanomsg" yaml:"nanomsg"`
	NATS writer.NATSConfig `json:"nats" yaml:"nats"`
	NATSStream writer.NATSStreamConfig `json:"nats_stream" yaml:"nats_stream"`
	NSQ writer.NSQConfig `json:"nsq" yaml:"nsq"`
	Plugin interface{} `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	RedisHash writer.RedisHashConfig `json:"redis_hash" yaml:"redis_hash"`
	RedisList writer.RedisListConfig `json:"redis_list" yaml:"redis_list"`
	RedisPubSub writer.RedisPubSubConfig `json:"redis_pubsub" yaml:"redis_pubsub"`
	RedisStreams writer.RedisStreamsConfig `json:"redis_streams" yaml:"redis_streams"`
	Reject RejectConfig `json:"reject" yaml:"reject"`
	Resource string `json:"resource" yaml:"resource"`
	Retry RetryConfig `json:"retry" yaml:"retry"`
	S3 writer.AmazonS3Config `json:"s3" yaml:"s3"`
	SNS writer.SNSConfig `json:"sns" yaml:"sns"`
	SQL SQLConfig `json:"sql" yaml:"sql"`
	SQS writer.AmazonSQSConfig `json:"sqs" yaml:"sqs"`
	STDOUT STDOUTConfig `json:"stdout" yaml:"stdout"`
	Subprocess SubprocessConfig `json:"subprocess" yaml:"subprocess"`
	Switch SwitchConfig `json:"switch" yaml:"switch"`
	SyncResponse struct{} `json:"sync_response" yaml:"sync_response"`
	TableStorage writer.AzureTableStorageConfig `json:"table_storage" yaml:"table_storage"`
	TCP writer.TCPConfig `json:"tcp" yaml:"tcp"`
	Try TryConfig `json:"try" yaml:"try"`
	UDP writer.UDPConfig `json:"udp" yaml:"udp"`
	Socket writer.SocketConfig `json:"socket" yaml:"socket"`
	Websocket writer.WebsocketConfig `json:"websocket" yaml:"websocket"`
	ZMQ4 *writer.ZMQ4Config `json:"zmq4,omitempty" yaml:"zmq4,omitempty"`
	Processors []processor.Config `json:"processors" yaml:"processors"`
}
Config is the all encompassing configuration struct for all output types.
func NewConfig ¶
func NewConfig() Config
NewConfig returns a configuration struct fully populated with default values.
func (Config) Sanitised ¶ added in v3.24.0
Sanitised returns a sanitised version of the config, meaning sections that aren't relevant to behaviour are removed. Also optionally removes deprecated fields.
func (*Config) UnmarshalYAML ¶
UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.
type DropOnConditions ¶ added in v3.37.0
type DropOnConditions struct {
	Error bool `json:"error" yaml:"error"`
	BackPressure string `json:"back_pressure" yaml:"back_pressure"`
}
DropOnConditions is a config struct representing the different circumstances under which messages should be dropped.
type DropOnConfig ¶ added in v3.37.0
type DropOnConfig struct {
	DropOnConditions `json:",inline" yaml:",inline"`
	Output *Config `json:"output" yaml:"output"`
}
DropOnConfig contains configuration values for the DropOn output type.
func NewDropOnConfig ¶ added in v3.37.0
func NewDropOnConfig() DropOnConfig
NewDropOnConfig creates a new DropOnConfig with default values.
func (DropOnConfig) MarshalJSON ¶ added in v3.37.0
func (d DropOnConfig) MarshalJSON() ([]byte, error)
MarshalJSON prints an empty object instead of nil.
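The effect of such a marshaller can be sketched with the standard library alone. The `wrapperConfig` and `childConfig` types are hypothetical; the sketch assumes, based on the one-line description above, that a config with a nil child output is rendered as `{}`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// childConfig and wrapperConfig are hypothetical stand-ins for a child output
// config and a wrapper config that holds it by pointer.
type childConfig struct {
	Type string `json:"type"`
}

type wrapperConfig struct {
	Output *childConfig `json:"output"`
}

// MarshalJSON renders an empty object when there is no child output. Without
// it, encoding/json would emit {"output":null}.
func (w wrapperConfig) MarshalJSON() ([]byte, error) {
	if w.Output == nil {
		return []byte("{}"), nil
	}
	// plain has the same fields but no methods, which avoids infinite
	// recursion back into this MarshalJSON.
	type plain wrapperConfig
	return json.Marshal(plain(w))
}

// render marshals a config and returns the JSON text, or the error text.
func render(w wrapperConfig) string {
	b, err := json.Marshal(w)
	if err != nil {
		return err.Error()
	}
	return string(b)
}

func main() {
	fmt.Println(render(wrapperConfig{}))
	fmt.Println(render(wrapperConfig{Output: &childConfig{Type: "stdout"}}))
}
```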
func (DropOnConfig) MarshalYAML ¶ added in v3.37.0
func (d DropOnConfig) MarshalYAML() (interface{}, error)
MarshalYAML prints an empty object instead of nil.
type DropOnError ¶
type DropOnError struct {
// contains filtered or unexported fields
}
DropOnError is an output type that writes messages to a child output and drops any message whose send fails instead of propagating the error.
func (*DropOnError) CloseAsync ¶
func (d *DropOnError) CloseAsync()
CloseAsync shuts down the DropOnError output and stops processing requests.
func (*DropOnError) Connected ¶
func (d *DropOnError) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*DropOnError) Consume ¶
func (d *DropOnError) Consume(ts <-chan types.Transaction) error
Consume assigns a messages channel for the output to read.
func (*DropOnError) WaitForClose ¶
func (d *DropOnError) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the DropOnError output has closed down.
type DropOnErrorConfig ¶
type DropOnErrorConfig struct {
*Config `yaml:",inline" json:",inline"`
}
DropOnErrorConfig contains configuration values for the DropOnError output type.
func NewDropOnErrorConfig ¶
func NewDropOnErrorConfig() DropOnErrorConfig
NewDropOnErrorConfig creates a new DropOnErrorConfig with default values.
func (DropOnErrorConfig) MarshalJSON ¶
func (d DropOnErrorConfig) MarshalJSON() ([]byte, error)
MarshalJSON prints an empty object instead of nil.
func (DropOnErrorConfig) MarshalYAML ¶
func (d DropOnErrorConfig) MarshalYAML() (interface{}, error)
MarshalYAML prints an empty object instead of nil.
func (*DropOnErrorConfig) UnmarshalJSON ¶
func (d *DropOnErrorConfig) UnmarshalJSON(bytes []byte) error
UnmarshalJSON ensures that when parsing child config it is initialised.
func (*DropOnErrorConfig) UnmarshalYAML ¶
func (d *DropOnErrorConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML ensures that when parsing child config it is initialised.
type DynamicConfig ¶
type DynamicConfig struct {
	Outputs map[string]Config `json:"outputs" yaml:"outputs"`
	Prefix string `json:"prefix" yaml:"prefix"`
	Timeout string `json:"timeout" yaml:"timeout"`
	MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
}
DynamicConfig contains configuration fields for the Dynamic output type.
func NewDynamicConfig ¶
func NewDynamicConfig() DynamicConfig
NewDynamicConfig creates a new DynamicConfig with default values.
type FileConfig ¶
type FileConfig struct {
	Path string `json:"path" yaml:"path"`
	Codec string `json:"codec" yaml:"codec"`
	Delim string `json:"delimiter" yaml:"delimiter"`
}
FileConfig contains configuration fields for the file based output type.
func NewFileConfig ¶
func NewFileConfig() FileConfig
NewFileConfig creates a new FileConfig with default values.
type HTTPServer ¶
type HTTPServer struct {
// contains filtered or unexported fields
}
HTTPServer is an output type that serves HTTP GET requests.
func (*HTTPServer) CloseAsync ¶
func (h *HTTPServer) CloseAsync()
CloseAsync shuts down the HTTPServer output and stops processing requests.
func (*HTTPServer) Connected ¶
func (h *HTTPServer) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*HTTPServer) Consume ¶
func (h *HTTPServer) Consume(ts <-chan types.Transaction) error
Consume assigns a messages channel for the output to read.
func (*HTTPServer) WaitForClose ¶
func (h *HTTPServer) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the HTTPServer output has closed down.
type HTTPServerConfig ¶
type HTTPServerConfig struct {
	Address string `json:"address" yaml:"address"`
	Path string `json:"path" yaml:"path"`
	StreamPath string `json:"stream_path" yaml:"stream_path"`
	WSPath string `json:"ws_path" yaml:"ws_path"`
	AllowedVerbs []string `json:"allowed_verbs" yaml:"allowed_verbs"`
	Timeout string `json:"timeout" yaml:"timeout"`
	CertFile string `json:"cert_file" yaml:"cert_file"`
	KeyFile string `json:"key_file" yaml:"key_file"`
}
HTTPServerConfig contains configuration fields for the HTTPServer output type.
func NewHTTPServerConfig ¶
func NewHTTPServerConfig() HTTPServerConfig
NewHTTPServerConfig creates a new HTTPServerConfig with default values.
type Inproc ¶
type Inproc struct {
// contains filtered or unexported fields
}
Inproc is an output type that serves Inproc messages.
func (*Inproc) CloseAsync ¶
func (i *Inproc) CloseAsync()
CloseAsync shuts down the Inproc output and stops processing messages.
func (*Inproc) Connected ¶
Connected returns a boolean indicating whether this output is currently connected to its target.
type InprocConfig ¶
type InprocConfig string
InprocConfig contains configuration fields for the Inproc output type.
func NewInprocConfig ¶
func NewInprocConfig() InprocConfig
NewInprocConfig creates a new InprocConfig with default values.
type LineWriter ¶
type LineWriter struct {
// contains filtered or unexported fields
}
LineWriter is an output type that writes messages to an io.WriteCloser type as lines.
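The line-oriented behaviour can be approximated as follows. This is a simplified sketch that assumes each message is a single byte slice and that an empty delimiter falls back to a newline; `writeLines`, `nopCloser`, and `joinWith` are hypothetical helpers, and the handling of multi-part messages is not modelled.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// nopCloser adapts an io.Writer to io.WriteCloser for the example.
type nopCloser struct{ io.Writer }

func (nopCloser) Close() error { return nil }

// writeLines writes each message followed by delim (a newline when delim is
// empty) and closes the handle afterwards when closeOnExit is true.
func writeLines(handle io.WriteCloser, closeOnExit bool, delim []byte, msgs [][]byte) error {
	if len(delim) == 0 {
		delim = []byte("\n")
	}
	if closeOnExit {
		defer handle.Close()
	}
	for _, m := range msgs {
		line := append(append([]byte(nil), m...), delim...)
		if _, err := handle.Write(line); err != nil {
			return err
		}
	}
	return nil
}

// joinWith writes msgs into a buffer via writeLines and returns the result.
func joinWith(delim string, msgs ...string) string {
	var buf bytes.Buffer
	parts := make([][]byte, len(msgs))
	for i, m := range msgs {
		parts[i] = []byte(m)
	}
	if err := writeLines(nopCloser{&buf}, true, []byte(delim), parts); err != nil {
		return ""
	}
	return buf.String()
}

func main() {
	fmt.Printf("%q\n", joinWith("", "a", "b"))
	fmt.Printf("%q\n", joinWith("|", "a", "b"))
}
```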
func (*LineWriter) CloseAsync ¶
func (w *LineWriter) CloseAsync()
CloseAsync shuts down the LineWriter output and stops processing messages.
func (*LineWriter) Connected ¶
func (w *LineWriter) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*LineWriter) Consume ¶
func (w *LineWriter) Consume(ts <-chan types.Transaction) error
Consume assigns a messages channel for the output to read.
func (*LineWriter) WaitForClose ¶
func (w *LineWriter) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the LineWriter output has closed down.
type PasswordAuthenticator ¶ added in v3.33.0
type PasswordAuthenticator struct {
	Enabled bool `json:"enabled" yaml:"enabled"`
	Username string `json:"username" yaml:"username"`
	Password string `json:"password" yaml:"password"`
}
PasswordAuthenticator contains the fields that will be used to authenticate with the Cassandra cluster.
type PluginConfigConstructor ¶
type PluginConfigConstructor func() interface{}
PluginConfigConstructor is a func that returns a pointer to a new and fully populated configuration struct for a plugin type.
type PluginConfigSanitiser ¶
type PluginConfigSanitiser func(conf interface{}) interface{}
PluginConfigSanitiser is a function that takes a configuration object for a plugin and returns a sanitised (minimal) version of it for printing in examples and plugin documentation.
This function is useful when a plugin's configuration struct is very large and complex, but can sometimes be expressed in a more concise way without losing the original intent.
type PluginConstructor ¶
type PluginConstructor func( config interface{}, manager types.Manager, logger log.Modular, metrics metrics.Type, ) (types.Output, error)
PluginConstructor is a func that constructs a Benthos output plugin. These are plugins that are specific to certain use cases, experimental, private or otherwise unfit for widespread general use. Any number of plugins can be specified when using Benthos as a framework.
The configuration object will be the result of the PluginConfigConstructor after overlaying the user configuration.
type RejectConfig ¶ added in v3.36.0
type RejectConfig string
RejectConfig contains configuration fields for the Reject output type.
func NewRejectConfig ¶ added in v3.36.0
func NewRejectConfig() RejectConfig
NewRejectConfig creates a new RejectConfig with default values.
type Resource ¶ added in v3.12.0
type Resource struct {
// contains filtered or unexported fields
}
Resource is an output type that writes messages to an output resource.
func (*Resource) CloseAsync ¶ added in v3.12.0
func (r *Resource) CloseAsync()
CloseAsync shuts down the output and stops processing requests.
func (*Resource) Connected ¶ added in v3.12.0
Connected returns a boolean indicating whether this output is currently connected to its target.
type Retry ¶
type Retry struct {
// contains filtered or unexported fields
}
Retry is an output type that continuously writes a message to a child output until the send is successful.
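The retry-until-success behaviour amounts to a loop like the one below. The `maxRetries` and `backoff` parameters are assumptions made for illustration (the embedded `retries.Config` is not detailed in this section), as are the `retryWrite` and `failNTimes` helpers.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errUnavailable = errors.New("child output unavailable")

// retryWrite calls write until it succeeds. A maxRetries of zero means retry
// without limit; otherwise the last error is returned once the initial
// attempt plus maxRetries retries have all failed. It returns the number of
// attempts made.
func retryWrite(write func() error, maxRetries int, backoff time.Duration) (int, error) {
	attempts := 0
	for {
		attempts++
		err := write()
		if err == nil {
			return attempts, nil
		}
		if maxRetries > 0 && attempts > maxRetries {
			return attempts, err
		}
		time.Sleep(backoff)
	}
}

// failNTimes returns a hypothetical child write that fails n times and then
// succeeds.
func failNTimes(n int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errUnavailable
		}
		return nil
	}
}

func main() {
	fmt.Println(retryWrite(failNTimes(2), 0, time.Millisecond))
	fmt.Println(retryWrite(failNTimes(5), 2, time.Millisecond))
}
```

A real implementation would usually grow the backoff between attempts and abort when the output is closed; both are left out here to keep the loop readable.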
func (*Retry) CloseAsync ¶
func (r *Retry) CloseAsync()
CloseAsync shuts down the Retry output and stops processing requests.
func (*Retry) Connected ¶
Connected returns a boolean indicating whether this output is currently connected to its target.
type RetryConfig ¶
type RetryConfig struct {
	Output *Config `json:"output" yaml:"output"`
	retries.Config `json:",inline" yaml:",inline"`
}
RetryConfig contains configuration values for the Retry output type.
func NewRetryConfig ¶
func NewRetryConfig() RetryConfig
NewRetryConfig creates a new RetryConfig with default values.
func (RetryConfig) MarshalJSON ¶
func (r RetryConfig) MarshalJSON() ([]byte, error)
MarshalJSON prints an empty object instead of nil.
func (RetryConfig) MarshalYAML ¶
func (r RetryConfig) MarshalYAML() (interface{}, error)
MarshalYAML prints an empty object instead of nil.
type SQLConfig ¶ added in v3.33.0
type SQLConfig struct {
	Driver string `json:"driver" yaml:"driver"`
	DataSourceName string `json:"data_source_name" yaml:"data_source_name"`
	Query string `json:"query" yaml:"query"`
	Args []string `json:"args" yaml:"args"`
	MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
	Batching batch.PolicyConfig `json:"batching" yaml:"batching"`
}
SQLConfig contains configuration fields for the SQL output type.
func NewSQLConfig ¶ added in v3.33.0
func NewSQLConfig() SQLConfig
NewSQLConfig returns a SQLConfig with default values.
type STDOUTConfig ¶
type STDOUTConfig struct {
Delim string `json:"delimiter" yaml:"delimiter"`
}
STDOUTConfig contains configuration fields for the stdout based output type.
func NewSTDOUTConfig ¶
func NewSTDOUTConfig() STDOUTConfig
NewSTDOUTConfig creates a new STDOUTConfig with default values.
type Subprocess ¶ added in v3.31.0
type Subprocess struct {
// contains filtered or unexported fields
}
Subprocess is an output type that executes a command as a subprocess and writes messages to it over stdin.
func (*Subprocess) CloseAsync ¶ added in v3.31.0
func (s *Subprocess) CloseAsync()
CloseAsync shuts down the Subprocess output.
func (*Subprocess) ConnectWithContext ¶ added in v3.31.0
func (s *Subprocess) ConnectWithContext(ctx context.Context) error
ConnectWithContext establishes a connection to the subprocess.
func (*Subprocess) WaitForClose ¶ added in v3.31.0
func (s *Subprocess) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the Subprocess output has closed down.
func (*Subprocess) WriteWithContext ¶ added in v3.31.0
WriteWithContext attempts to write message contents to the subprocess.
type SubprocessConfig ¶ added in v3.31.0
type SubprocessConfig struct {
	Name string `json:"name" yaml:"name"`
	Args []string `json:"args" yaml:"args"`
	Codec string `json:"codec" yaml:"codec"`
}
SubprocessConfig contains configuration for the Subprocess output type.
func NewSubprocessConfig ¶ added in v3.31.0
func NewSubprocessConfig() SubprocessConfig
NewSubprocessConfig creates a new SubprocessConfig with default values.
type Switch ¶
type Switch struct {
// contains filtered or unexported fields
}
Switch is a broker that implements types.Consumer and routes each message to the outputs whose conditions or cases it matches.
func (*Switch) CloseAsync ¶
func (o *Switch) CloseAsync()
CloseAsync shuts down the Switch broker and stops processing requests.
func (*Switch) Connected ¶
Connected returns a boolean indicating whether this output is currently connected to its target.
type SwitchConfig ¶
type SwitchConfig struct {
	RetryUntilSuccess bool `json:"retry_until_success" yaml:"retry_until_success"`
	StrictMode bool `json:"strict_mode" yaml:"strict_mode"`
	MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
	Cases []SwitchConfigCase `json:"cases" yaml:"cases"`
	Outputs []SwitchConfigOutput `json:"outputs" yaml:"outputs"`
}
SwitchConfig contains configuration fields for the Switch output type.
func NewSwitchConfig ¶
func NewSwitchConfig() SwitchConfig
NewSwitchConfig creates a new SwitchConfig with default values.
type SwitchConfigCase ¶ added in v3.28.0
type SwitchConfigCase struct {
	Check    string `json:"check" yaml:"check"`
	Continue bool   `json:"continue" yaml:"continue"`
	Output   Config `json:"output" yaml:"output"`
}
SwitchConfigCase contains configuration fields per output of a switch type.
func NewSwitchConfigCase ¶ added in v3.28.0
func NewSwitchConfigCase() SwitchConfigCase
NewSwitchConfigCase creates a new switch output config with default values.
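The Check and Continue fields suggest ordered, first-match routing with optional continuation. The sketch below models that idea with plain Go predicates in place of real checks. The routeCase type and route function are hypothetical, and two rules are assumptions about the semantics rather than statements from this reference: a case with no check always passes, and a matching case with Continue set lets later cases be tested as well.

```go
package main

import (
	"fmt"
	"strings"
)

// routeCase is a hypothetical stand-in for SwitchConfigCase: Check is a Go
// predicate rather than a query string, and Output is just a name.
type routeCase struct {
	Check    func(msg string) bool // nil is treated as "always passes" (assumption)
	Continue bool
	Output   string
}

// route returns the names of the outputs a message would be sent to.
// Cases are tested in order; after a match, later cases are only tested
// when the matching case has Continue set.
func route(cases []routeCase, msg string) []string {
	var targets []string
	for _, c := range cases {
		if c.Check != nil && !c.Check(msg) {
			continue
		}
		targets = append(targets, c.Output)
		if !c.Continue {
			break
		}
	}
	return targets
}

func main() {
	cases := []routeCase{
		{Check: func(m string) bool { return strings.HasPrefix(m, "err:") }, Continue: true, Output: "errors"},
		{Check: func(m string) bool { return strings.Contains(m, "disk") }, Output: "storage"},
		{Output: "default"},
	}
	fmt.Println(route(cases, "err: disk full"))
	fmt.Println(route(cases, "hello"))
}
```

A message that matches no case yields an empty result here; how the real output treats unrouted messages (for example under StrictMode) is not covered by this sketch.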
type SwitchConfigOutput ¶
type SwitchConfigOutput struct {
	Condition   condition.Config `json:"condition" yaml:"condition"`
	Fallthrough bool             `json:"fallthrough" yaml:"fallthrough"`
	Output      Config           `json:"output" yaml:"output"`
}
SwitchConfigOutput contains configuration fields per output of a switch type.
func NewSwitchConfigOutput ¶
func NewSwitchConfigOutput() SwitchConfigOutput
NewSwitchConfigOutput creates a new switch output config with default values.
func (*SwitchConfigOutput) UnmarshalJSON ¶
func (s *SwitchConfigOutput) UnmarshalJSON(bytes []byte) error
UnmarshalJSON ensures that when parsing configs that are in a map or slice the default values are still applied.
func (*SwitchConfigOutput) UnmarshalYAML ¶
func (s *SwitchConfigOutput) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.
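One common way to keep defaults when decoding elements of a map or slice is to seed the value with defaults inside a custom unmarshaller. The sketch below shows that pattern for JSON using a hypothetical caseConfig type with a made-up default; it is not the package's actual implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// caseConfig is a hypothetical config type with a non-zero default.
type caseConfig struct {
	Name    string `json:"name"`
	Retries int    `json:"retries"`
}

// newCaseConfig returns the defaults (values chosen for illustration).
func newCaseConfig() caseConfig {
	return caseConfig{Name: "", Retries: 3}
}

// UnmarshalJSON seeds the value with defaults before decoding, so fields
// missing from the input keep their default instead of the zero value.
// The alias type has no UnmarshalJSON method, which avoids infinite recursion.
func (c *caseConfig) UnmarshalJSON(b []byte) error {
	type confAlias caseConfig
	aliased := confAlias(newCaseConfig())
	if err := json.Unmarshal(b, &aliased); err != nil {
		return err
	}
	*c = caseConfig(aliased)
	return nil
}

// parseCases decodes a JSON array; each element goes through UnmarshalJSON.
func parseCases(raw string) ([]caseConfig, error) {
	var cases []caseConfig
	err := json.Unmarshal([]byte(raw), &cases)
	return cases, err
}

func main() {
	cases, err := parseCases(`[{"name":"a"},{"name":"b","retries":0}]`)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cases)
}
```

Without the custom method, the first element would decode with Retries set to zero, because the decoder allocates new slice elements as zero values.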
type TryConfig ¶ added in v3.7.0
type TryConfig brokerOutputList
TryConfig contains configuration fields for the Try output type.
func NewTryConfig ¶ added in v3.7.0
func NewTryConfig() TryConfig
NewTryConfig creates a new TryConfig with default values.
type Type ¶
type Type interface {
	types.Closable
	types.Consumer

	// Connected returns a boolean indicating whether this output is currently
	// connected to its target.
	Connected() bool
}
Type is the standard interface of an output type.
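To make the shape of the interface more concrete, the following is a minimal sketch of an output-like type that consumes transactions and acknowledges each one over its response channel. It does not import Benthos: the transaction struct is a hypothetical stand-in for types.Transaction, the response is a plain error rather than a response type, and memoryOutput is invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// transaction is a hypothetical stand-in for types.Transaction: a payload
// plus a channel on which the output reports the result of the write.
type transaction struct {
	Payload      string
	ResponseChan chan<- error
}

// memoryOutput records payloads in memory. Its methods mirror the method
// set of Type, but use the stand-in transaction type.
type memoryOutput struct {
	written   []string
	connected int32
	closeOnce sync.Once
	closeChan chan struct{}
	doneChan  chan struct{}
}

func newMemoryOutput() *memoryOutput {
	return &memoryOutput{
		closeChan: make(chan struct{}),
		doneChan:  make(chan struct{}),
	}
}

// Consume starts a goroutine that handles transactions until the channel is
// closed or CloseAsync is called, acknowledging each successful write.
func (m *memoryOutput) Consume(ts <-chan transaction) error {
	atomic.StoreInt32(&m.connected, 1)
	go func() {
		defer close(m.doneChan)
		defer atomic.StoreInt32(&m.connected, 0)
		for {
			select {
			case t, open := <-ts:
				if !open {
					return
				}
				m.written = append(m.written, t.Payload)
				select {
				case t.ResponseChan <- nil: // nil reports success to the source
				case <-m.closeChan:
					return
				}
			case <-m.closeChan:
				return
			}
		}
	}()
	return nil
}

// Connected reports whether the consume loop is currently running.
func (m *memoryOutput) Connected() bool {
	return atomic.LoadInt32(&m.connected) == 1
}

// CloseAsync signals the consume loop to stop without blocking.
func (m *memoryOutput) CloseAsync() {
	m.closeOnce.Do(func() { close(m.closeChan) })
}

// WaitForClose blocks until the consume loop has exited or the timeout elapses.
func (m *memoryOutput) WaitForClose(timeout time.Duration) error {
	select {
	case <-m.doneChan:
		return nil
	case <-time.After(timeout):
		return errors.New("timed out waiting for output to close")
	}
}

func main() {
	out := newMemoryOutput()
	ts := make(chan transaction)
	res := make(chan error)
	if err := out.Consume(ts); err != nil {
		panic(err)
	}
	ts <- transaction{Payload: "hello", ResponseChan: res}
	fmt.Println("ack:", <-res)
	out.CloseAsync()
	fmt.Println("closed:", out.WaitForClose(time.Second) == nil)
}
```

The split between CloseAsync and WaitForClose lets a caller signal many components to stop first and only then wait on each, so shutdowns overlap instead of running one after another.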
func New ¶
func New( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, pipelines ...types.PipelineConstructorFunc, ) (Type, error)
New creates an output type based on an output configuration.
func NewAWSDynamoDB ¶ added in v3.36.0
func NewAWSDynamoDB(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewAWSDynamoDB creates a new DynamoDB output type.
func NewAWSKinesis ¶ added in v3.36.0
func NewAWSKinesis(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewAWSKinesis creates a new Kinesis output type.
func NewAWSKinesisFirehose ¶ added in v3.36.0
func NewAWSKinesisFirehose(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewAWSKinesisFirehose creates a new KinesisFirehose output type.
func NewAmazonS3 ¶
NewAmazonS3 creates a new AmazonS3 output type.
func NewAmazonSNS ¶
func NewAmazonSNS(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewAmazonSNS creates a new AmazonSNS output type.
func NewAmazonSQS ¶
func NewAmazonSQS(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewAmazonSQS creates a new AmazonSQS output type.
func NewAsyncWriter ¶ added in v3.8.0
func NewAsyncWriter( typeStr string, maxInflight int, w AsyncSink, log log.Modular, stats metrics.Type, ) (Type, error)
NewAsyncWriter creates a new AsyncWriter output type.
func NewAzureBlobStorage ¶ added in v3.21.0
func NewAzureBlobStorage(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewAzureBlobStorage creates a new AzureBlobStorage output type.
func NewAzureQueueStorage ¶ added in v3.36.0
func NewAzureQueueStorage(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewAzureQueueStorage creates a new AzureQueueStorage output type.
func NewAzureTableStorage ¶ added in v3.23.0
func NewAzureTableStorage(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewAzureTableStorage creates a new AzureTableStorage output type.
func NewBatcher ¶ added in v3.4.0
NewBatcher creates a new Producer/Consumer around a buffer.
func NewBroker ¶
func NewBroker( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, pipelines ...types.PipelineConstructorFunc, ) (Type, error)
NewBroker creates a new Broker output type. Messages will be sent out to the list of outputs according to the chosen broker pattern.
func NewDropOnError ¶
func NewDropOnError( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewDropOnError creates a new DropOnError output type.
func NewDynamic ¶
func NewDynamic( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewDynamic creates a new Dynamic output type.
func NewDynamoDB ¶
NewDynamoDB creates a new DynamoDB output type.
func NewElasticsearch ¶
func NewElasticsearch(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewElasticsearch creates a new Elasticsearch output type.
func NewGCPPubSub ¶
func NewGCPPubSub(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewGCPPubSub creates a new GCPPubSub output type.
func NewHTTPClient ¶
func NewHTTPClient(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewHTTPClient creates a new HTTPClient output type.
func NewHTTPServer ¶
func NewHTTPServer(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewHTTPServer creates a new HTTPServer output type.
func NewKinesis ¶
NewKinesis creates a new Kinesis output type.
func NewKinesisFirehose ¶ added in v3.1.0
func NewKinesisFirehose(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewKinesisFirehose creates a new KinesisFirehose output type.
func NewLineWriter ¶
func NewLineWriter( handle io.WriteCloser, closeOnExit bool, customDelimiter []byte, typeStr string, log log.Modular, stats metrics.Type, ) (Type, error)
NewLineWriter creates a new LineWriter output type.
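The handle and customDelimiter parameters point at a simple idea: each message is written to the handle followed by a delimiter. The sketch below shows that idea against a plain io.Writer. The writeLines and renderLines helpers are hypothetical, the fallback to a newline when no delimiter is given is an assumption, and closing the handle on exit is left out.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// writeLines writes each message followed by a delimiter. A nil or empty
// delimiter falls back to a newline (an assumption for this sketch).
func writeLines(w io.Writer, delim []byte, msgs ...[]byte) error {
	if len(delim) == 0 {
		delim = []byte("\n")
	}
	for _, m := range msgs {
		if _, err := w.Write(m); err != nil {
			return err
		}
		if _, err := w.Write(delim); err != nil {
			return err
		}
	}
	return nil
}

// renderLines is a convenience wrapper that writes into a buffer and
// returns the result as a string.
func renderLines(delim string, msgs ...string) (string, error) {
	var buf bytes.Buffer
	raw := make([][]byte, len(msgs))
	for i, m := range msgs {
		raw[i] = []byte(m)
	}
	err := writeLines(&buf, []byte(delim), raw...)
	return buf.String(), err
}

func main() {
	out, err := renderLines("", "first", "second")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", out)
}
```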
func NewNATSStream ¶
func NewNATSStream(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewNATSStream creates a new NATSStream output type.
func NewNanomsg ¶
NewNanomsg creates a new Nanomsg output type.
func NewRedisHash ¶
func NewRedisHash(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewRedisHash creates a new RedisHash output type.
func NewRedisList ¶
func NewRedisList(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewRedisList creates a new RedisList output type.
func NewRedisPubSub ¶
func NewRedisPubSub(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewRedisPubSub creates a new RedisPubSub output type.
func NewRedisStreams ¶
func NewRedisStreams(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewRedisStreams creates a new RedisStreams output type.
func NewResource ¶ added in v3.12.0
func NewResource( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (Type, error)
NewResource returns a resource output.
func NewSwitch ¶
func NewSwitch( conf Config, mgr types.Manager, logger log.Modular, stats metrics.Type, ) (Type, error)
NewSwitch creates a new Switch type by providing outputs. Messages will be sent to a subset of outputs according to condition and fallthrough settings.
func NewTry ¶ added in v3.7.0
func NewTry( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, pipelines ...types.PipelineConstructorFunc, ) (Type, error)
NewTry creates a new try broker output type.
func NewWebsocket ¶
func NewWebsocket(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewWebsocket creates a new Websocket output type.
func WrapWithPipelines ¶
func WrapWithPipelines(out Type, pipeConstructors ...types.PipelineConstructorFunc) (Type, error)
WrapWithPipelines wraps an output with a variadic number of pipelines.
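Wrapping with a variadic list can be pictured as folding decorators around the output. The sketch below uses plain functions instead of outputs and pipelines; the sink and stage types are hypothetical, and the choice to wrap in reverse order, so that the first stage listed is the first to see each message, is an assumption about the intended ordering.

```go
package main

import (
	"fmt"
	"strings"
)

// sink is a hypothetical stand-in for an output: it receives a message.
type sink func(msg string)

// stage is a hypothetical stand-in for a pipeline: it transforms a message.
type stage func(msg string) string

// wrapWithStage returns a sink that applies the stage before delegating.
func wrapWithStage(out sink, s stage) sink {
	return func(msg string) { out(s(msg)) }
}

// wrapWithStages wraps out with every stage. Iterating in reverse means the
// first stage in the list is the first one applied to each message.
func wrapWithStages(out sink, stages ...stage) sink {
	for i := len(stages) - 1; i >= 0; i-- {
		out = wrapWithStage(out, stages[i])
	}
	return out
}

func main() {
	printer := func(msg string) { fmt.Println(msg) }
	wrapped := wrapWithStages(printer,
		strings.TrimSpace,
		func(m string) string { return strings.ToUpper(m) + "!" },
	)
	wrapped("  hello  ")
}
```

Because the result has the same type as the thing being wrapped, callers can treat a wrapped output exactly like an unwrapped one.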
type TypeSpec ¶
type TypeSpec struct {
	// Async indicates whether this output benefits from sending multiple
	// messages asynchronously over the protocol.
	Async bool

	// Batches indicates whether this output benefits from batching of messages.
	Batches bool

	Status      docs.Status
	Summary     string
	Description string
	Categories  []Category
	Footnotes   string
	FieldSpecs  docs.FieldSpecs
	Examples    []docs.AnnotatedExample
	Version     string
	// contains filtered or unexported fields
}
TypeSpec is a constructor and a usage description for each output type.
type WithPipeline ¶
type WithPipeline struct {
// contains filtered or unexported fields
}
WithPipeline is a type that wraps both an output type and a pipeline type by routing the pipeline through the output, and implements the output.Type interface in order to act like an ordinary output.
func WrapWithPipeline ¶
func WrapWithPipeline(i *int, out Type, pipeConstructor types.PipelineConstructorFunc) (*WithPipeline, error)
WrapWithPipeline routes a processing pipeline directly into an output and returns a type that manages both and acts like an ordinary output.
func (*WithPipeline) CloseAsync ¶
func (i *WithPipeline) CloseAsync()
CloseAsync triggers a closure of this object but does not block.
func (*WithPipeline) Connected ¶
func (i *WithPipeline) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*WithPipeline) Consume ¶
func (i *WithPipeline) Consume(tsChan <-chan types.Transaction) error
Consume starts the type listening to a message channel from a producer.
func (*WithPipeline) WaitForClose ¶
func (i *WithPipeline) WaitForClose(timeout time.Duration) error
WaitForClose is a blocking call to wait until the object has finished closing down and cleaning up resources.
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer is an output type that writes messages to a writer.Type.
func (*Writer) CloseAsync ¶
func (w *Writer) CloseAsync()
CloseAsync shuts down the Writer output and stops processing messages.
func (*Writer) Connected ¶
Connected returns a boolean indicating whether this output is currently connected to its target.
Source Files ¶
- amqp.go
- amqp_0_9.go
- amqp_1.go
- async_writer.go
- aws_dynamodb.go
- aws_kinesis.go
- aws_kinesis_firehose.go
- aws_s3.go
- aws_sns.go
- aws_sqs.go
- azure_blob_storage.go
- azure_queue_storage.go
- azure_table_storage.go
- batcher.go
- broker.go
- broker_out_common.go
- cache.go
- cassandra.go
- constructor.go
- docs.go
- drop.go
- drop_on.go
- drop_on_error.go
- dynamic.go
- elasticsearch.go
- file.go
- files.go
- gcp_pubsub.go
- hdfs.go
- http_client.go
- http_server.go
- inproc.go
- interface.go
- kafka.go
- line_writer.go
- mqtt.go
- nanomsg.go
- nats.go
- nats_stream.go
- nsq.go
- package.go
- plugin.go
- redis_hash.go
- redis_list.go
- redis_pubsub.go
- redis_streams.go
- reject.go
- resource.go
- retry.go
- socket.go
- sql.go
- sql_extra.go
- stdout.go
- subprocess.go
- switch.go
- switch_deprecated.go
- sync_response.go
- tcp.go
- try.go
- udp.go
- websocket.go
- wrap_with_pipeline.go
- writer.go