Documentation ¶
Overview ¶
Package output defines all sinks for sending Benthos messages to a variety of third-party destinations. All output types must implement the output.Type interface.
If the sink of an output provides a form of acknowledgement of message receipt then the output is responsible for propagating that acknowledgement back to the source of the data by sending it over the transaction response channel. Otherwise a standard acknowledgement is sent.
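The acknowledgement flow described above can be sketched roughly as follows. This is a minimal illustration, not the package's implementation: `transaction`, `consume`, `send`, `writeOK` and `writeFail` are hypothetical stand-ins for types.Transaction and a real sink, and only the idea of sending the write result back over a response channel is taken from the text.

```go
package main

import (
	"errors"
	"fmt"
)

// transaction is a hypothetical stand-in for types.Transaction: a payload
// plus a channel on which the output reports the outcome of the write.
type transaction struct {
	payload  string
	response chan<- error
}

// writeOK and writeFail are hypothetical sinks: one always acknowledges,
// the other always rejects.
func writeOK(string) error   { return nil }
func writeFail(string) error { return errors.New("sink rejected message") }

// consume writes each payload and propagates the result (nil meaning
// acknowledged) back to the source over the response channel.
func consume(ts <-chan transaction, write func(string) error) {
	for t := range ts {
		t.response <- write(t.payload)
	}
}

// send pushes one payload through a consume loop and returns the response.
func send(payload string, write func(string) error) error {
	ts := make(chan transaction)
	res := make(chan error, 1)
	go consume(ts, write)
	ts <- transaction{payload: payload, response: res}
	close(ts)
	return <-res
}

func main() {
	fmt.Println("acknowledging sink:", send("hello", writeOK))
	fmt.Println("rejecting sink:", send("hello", writeFail))
}
```

The source of the data only learns the outcome through the response channel, which is why an output that never replies would stall the pipeline in this model.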
Index ¶
- Constants
- Variables
- func AppendProcessorsFromConfig(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ...) []types.PipelineConstructorFunc
- func Descriptions() string
- func DocumentPlugin(typeString, description string, configSanitiser PluginConfigSanitiser)
- func PluginCount() int
- func PluginDescriptions() string
- func RegisterPlugin(typeString string, configConstructor PluginConfigConstructor, ...)
- func SanitiseConfig(conf Config) (interface{}, error)
- func WalkConstructors(fn func(ConstructorFunc, docs.ComponentSpec))
- type AsyncSink
- type AsyncWriter
- func (w *AsyncWriter) CloseAsync()
- func (w *AsyncWriter) Connected() bool
- func (w *AsyncWriter) Consume(ts <-chan types.Transaction) error
- func (w *AsyncWriter) MaxInFlight() (int, bool)
- func (w *AsyncWriter) SetInjectTracingMap(mapping string) error
- func (w *AsyncWriter) SetNoCancel()
- func (w *AsyncWriter) WaitForClose(timeout time.Duration) error
- type Batcher
- type BrokerConfig
- type CassandraConfig
- type Category
- type Config
- type ConstructorFunc
- type DropOnConditions
- type DropOnConfig
- type DropOnError
- type DropOnErrorConfig
- type DynamicConfig
- type FileConfig
- type GCPCloudStorageConfig
- type HTTPServer
- type HTTPServerConfig
- type Inproc
- type InprocConfig
- type LineWriter
- type MongoDBConfig
- type NATSJetStreamConfig
- type PasswordAuthenticator
- type PluginConfigConstructor
- type PluginConfigSanitiser
- type PluginConstructor
- type PulsarConfig
- type RejectConfig
- type Resource
- type Retry
- type RetryConfig
- type SFTPConfig
- type SQLConfig
- type STDOUTConfig
- type Subprocess
- type SubprocessConfig
- type Switch
- type SwitchConfig
- type SwitchConfigCase
- type SwitchConfigOutput
- type TryConfig
- type Type
- func New(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ...) (Type, error)
- func NewAMQP(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAMQP09(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAMQP1(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAWSDynamoDB(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAWSKinesis(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAWSKinesisFirehose(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAWSS3(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAWSSNS(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAWSSQS(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAmazonS3(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAmazonSNS(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAmazonSQS(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAsyncWriter(typeStr string, maxInflight int, w AsyncSink, log log.Modular, ...) (Type, error)
- func NewAzureBlobStorage(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAzureQueueStorage(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewAzureTableStorage(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewBatcher(batcher *batch.Policy, child Type, log log.Modular, stats metrics.Type) Type
- func NewBatcherFromConfig(conf batch.PolicyConfig, child Type, mgr types.Manager, log log.Modular, ...) (Type, error)
- func NewBroker(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ...) (Type, error)
- func NewCache(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewDrop(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewDropOnError(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewDynamic(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewDynamoDB(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewElasticsearch(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewFile(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewFiles(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewGCPPubSub(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewHDFS(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewHTTPClient(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewHTTPServer(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewInproc(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewKafka(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewKinesis(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewKinesisFirehose(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewLineWriter(handle io.WriteCloser, closeOnExit bool, customDelimiter []byte, ...) (Type, error)
- func NewMQTT(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewNATS(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewNATSStream(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewNSQ(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewNanomsg(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewRedisHash(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewRedisList(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewRedisPubSub(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewRedisStreams(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewResource(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewRetry(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewSTDOUT(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewSocket(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewSwitch(conf Config, mgr types.Manager, logger log.Modular, stats metrics.Type) (Type, error)
- func NewTCP(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewTry(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, ...) (Type, error)
- func NewUDP(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewWebsocket(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
- func NewWriter(typeStr string, w writer.Type, log log.Modular, stats metrics.Type) (Type, error)
- func OnlySinglePayloads(out Type) Type
- func WrapWithPipelines(out Type, pipeConstructors ...types.PipelineConstructorFunc) (Type, error)
- type TypeSpec
- type WithPipeline
- type Writer
Constants ¶
const (
    TypeAMQP = "amqp"
    TypeAMQP09 = "amqp_0_9"
    TypeAMQP1 = "amqp_1"
    TypeAWSDynamoDB = "aws_dynamodb"
    TypeAWSKinesis = "aws_kinesis"
    TypeAWSKinesisFirehose = "aws_kinesis_firehose"
    TypeAWSS3 = "aws_s3"
    TypeAWSSNS = "aws_sns"
    TypeAWSSQS = "aws_sqs"
    TypeAzureBlobStorage = "azure_blob_storage"
    TypeAzureQueueStorage = "azure_queue_storage"
    TypeAzureTableStorage = "azure_table_storage"
    TypeBlobStorage = "blob_storage"
    TypeBroker = "broker"
    TypeCache = "cache"
    TypeCassandra = "cassandra"
    TypeDrop = "drop"
    TypeDropOn = "drop_on"
    TypeDropOnError = "drop_on_error"
    TypeDynamic = "dynamic"
    TypeDynamoDB = "dynamodb"
    TypeElasticsearch = "elasticsearch"
    TypeFallback = "fallback"
    TypeFile = "file"
    TypeFiles = "files"
    TypeGCPCloudStorage = "gcp_cloud_storage"
    TypeGCPPubSub = "gcp_pubsub"
    TypeHDFS = "hdfs"
    TypeHTTPClient = "http_client"
    TypeHTTPServer = "http_server"
    TypeInproc = "inproc"
    TypeKafka = "kafka"
    TypeKinesis = "kinesis"
    TypeKinesisFirehose = "kinesis_firehose"
    TypeMongoDB = "mongodb"
    TypeMQTT = "mqtt"
    TypeNanomsg = "nanomsg"
    TypeNATS = "nats"
    TypeNATSJetStream = "nats_jetstream"
    TypeNATSStream = "nats_stream"
    TypeNSQ = "nsq"
    TypePulsar = "pulsar"
    TypeRedisHash = "redis_hash"
    TypeRedisList = "redis_list"
    TypeRedisPubSub = "redis_pubsub"
    TypeRedisStreams = "redis_streams"
    TypeReject = "reject"
    TypeResource = "resource"
    TypeRetry = "retry"
    TypeS3 = "s3"
    TypeSFTP = "sftp"
    TypeSNS = "sns"
    TypeSQL = "sql"
    TypeSQS = "sqs"
    TypeSTDOUT = "stdout"
    TypeSubprocess = "subprocess"
    TypeSwitch = "switch"
    TypeSyncResponse = "sync_response"
    TypeTableStorage = "table_storage"
    TypeTCP = "tcp"
    TypeTry = "try"
    TypeUDP = "udp"
    TypeSocket = "socket"
    TypeWebsocket = "websocket"
    TypeZMQ4 = "zmq4"
)
String constants representing each output type. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl
const (
    // GCPCloudStorageErrorIfExistsCollisionMode - error-if-exists
    GCPCloudStorageErrorIfExistsCollisionMode = "error-if-exists"
    // GCPCloudStorageAppendCollisionMode - append
    GCPCloudStorageAppendCollisionMode = "append"
    // GCPCloudStorageIgnoreCollisionMode - ignore
    GCPCloudStorageIgnoreCollisionMode = "ignore"
    // GCPCloudStorageOverwriteCollisionMode - overwrite
    GCPCloudStorageOverwriteCollisionMode = "overwrite"
)
Variables ¶
var (
    // ErrSwitchNoConditionMet is returned when a message does not match any
    // output conditions.
    ErrSwitchNoConditionMet = errors.New("no switch output conditions were met by message")
    // ErrSwitchNoCasesMatched is returned when a message does not match any
    // output cases.
    ErrSwitchNoCasesMatched = errors.New("no switch cases were matched by message")
    // ErrSwitchNoOutputs is returned when creating a Switch type with less than
    // 2 outputs.
    ErrSwitchNoOutputs = errors.New("attempting to create switch with fewer than 2 cases")
)
var Constructors = map[string]TypeSpec{}
Constructors is a map of all output types with their specs.
var DocsAsync = `
This output benefits from sending multiple messages in flight in parallel for
improved performance. You can tune the max number of in flight messages with the
field ` + "`max_in_flight`" + `.`
DocsAsync is a documentation paragraph regarding outputs that support asynchronous sends.
var DocsBatches = `` /* 205-byte string literal not displayed */
DocsBatches is a documentation paragraph regarding outputs that support batching.
var (
    // ErrBrokerNoOutputs is returned when creating a Broker type with zero
    // outputs.
    ErrBrokerNoOutputs = errors.New("attempting to create broker output type with no outputs")
)
Functions ¶
func AppendProcessorsFromConfig ¶
func AppendProcessorsFromConfig( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, pipelines ...types.PipelineConstructorFunc, ) []types.PipelineConstructorFunc
AppendProcessorsFromConfig takes a variadic argument of pipeline constructor functions and returns a new slice of them in which the processors of the provided output configuration will also be initialized.
func Descriptions ¶
func Descriptions() string
Descriptions returns a formatted string of collated descriptions of each type.
func DocumentPlugin ¶
func DocumentPlugin( typeString, description string, configSanitiser PluginConfigSanitiser, )
DocumentPlugin adds a description and an optional configuration sanitiser function to the definition of a registered plugin. This improves the documentation generated by PluginDescriptions.
func PluginCount ¶
func PluginCount() int
PluginCount returns the number of registered plugins. This does NOT count the standard set of components.
func PluginDescriptions ¶
func PluginDescriptions() string
PluginDescriptions generates and returns a markdown formatted document listing each registered plugin and an example configuration for it.
func RegisterPlugin ¶
func RegisterPlugin( typeString string, configConstructor PluginConfigConstructor, constructor PluginConstructor, )
RegisterPlugin registers a plugin by a unique name so that it can be constructed in the same way as regular outputs. If configuration is not needed for this plugin then configConstructor can be nil. A constructor for the plugin itself must be provided.
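The registration pattern described here can be sketched as a name-keyed registry. This is an illustrative model only, not the package's code: `output`, `pluginSpec`, `plugins`, `registerPlugin` and `construct` are hypothetical names, and the constructor signature is simplified (the real PluginConstructor also receives a manager, logger and metrics).

```go
package main

import "fmt"

// output is a hypothetical stand-in for a constructed output.
type output struct{ name string }

// pluginSpec holds what a registration supplies. configConstructor may be
// nil when the plugin needs no configuration.
type pluginSpec struct {
	configConstructor func() interface{}
	constructor       func(conf interface{}) (*output, error)
}

// plugins maps each unique plugin name to its spec.
var plugins = map[string]pluginSpec{}

// registerPlugin stores a plugin under a unique name.
func registerPlugin(name string, cc func() interface{}, c func(conf interface{}) (*output, error)) {
	plugins[name] = pluginSpec{configConstructor: cc, constructor: c}
}

// construct looks a plugin up by name, builds its default config when a
// config constructor was supplied, and calls the plugin constructor.
func construct(name string) (*output, error) {
	spec, ok := plugins[name]
	if !ok {
		return nil, fmt.Errorf("output plugin %q is not registered", name)
	}
	var conf interface{}
	if spec.configConstructor != nil {
		conf = spec.configConstructor()
	}
	return spec.constructor(conf)
}

func main() {
	registerPlugin("example", nil, func(interface{}) (*output, error) {
		return &output{name: "example"}, nil
	})
	out, err := construct("example")
	fmt.Println(out.name, err)
	_, err = construct("missing")
	fmt.Println(err)
}
```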
func SanitiseConfig ¶
func SanitiseConfig(conf Config) (interface{}, error)
SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.
func WalkConstructors ¶
func WalkConstructors(fn func(ConstructorFunc, docs.ComponentSpec))
WalkConstructors iterates each component constructor.
Types ¶
type AsyncSink ¶
type AsyncSink interface {
    // ConnectWithContext attempts to establish a connection to the sink, if
    // unsuccessful returns an error. If the attempt is successful (or not
    // necessary) returns nil.
    ConnectWithContext(ctx context.Context) error

    // WriteWithContext should block until either the message is sent (and
    // acknowledged) to a sink, or a transport specific error has occurred, or
    // the Type is closed.
    WriteWithContext(ctx context.Context, msg types.Message) error

    types.Closable
}
AsyncSink is a type that writes Benthos messages to a third-party sink. If the protocol supports a form of acknowledgement then it will be returned by the call to WriteWithContext.
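A rough sketch of a type with the same connect-then-write shape follows. It is an assumption-laden illustration: the local `sink` interface mirrors only the two context methods (the types.Closable part is left out), messages are plain byte slices rather than types.Message, and `memorySink`, `connect` and `writeAll` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// sink is a hypothetical, reduced version of the AsyncSink method set.
type sink interface {
	ConnectWithContext(ctx context.Context) error
	WriteWithContext(ctx context.Context, msg []byte) error
}

var errNotConnected = errors.New("not connected to sink")

// memorySink stores written messages in memory.
type memorySink struct {
	mu        sync.Mutex
	connected bool
	written   [][]byte
}

// ConnectWithContext marks the sink as connected; there is nothing to dial.
func (m *memorySink) ConnectWithContext(ctx context.Context) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.connected = true
	return nil
}

// WriteWithContext returns nil once the message is stored, or an error if
// the context is already done or the sink has not been connected.
func (m *memorySink) WriteWithContext(ctx context.Context, msg []byte) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	if !m.connected {
		return errNotConnected
	}
	m.written = append(m.written, msg)
	return nil
}

// connect and writeAll drive any sink with a background context.
func connect(s sink) error { return s.ConnectWithContext(context.Background()) }

func writeAll(s sink, msgs ...string) error {
	for _, msg := range msgs {
		if err := s.WriteWithContext(context.Background(), []byte(msg)); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	s := &memorySink{}
	fmt.Println("write before connect:", writeAll(s, "a"))
	fmt.Println("connect:", connect(s))
	fmt.Println("write after connect:", writeAll(s, "a", "b"))
}
```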
type AsyncWriter ¶
type AsyncWriter struct {
// contains filtered or unexported fields
}
AsyncWriter is an output type that writes messages to an AsyncSink.
func (*AsyncWriter) CloseAsync ¶
func (w *AsyncWriter) CloseAsync()
CloseAsync shuts down the AsyncWriter output and stops processing messages.
func (*AsyncWriter) Connected ¶
func (w *AsyncWriter) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*AsyncWriter) Consume ¶
func (w *AsyncWriter) Consume(ts <-chan types.Transaction) error
Consume assigns a messages channel for the output to read.
func (*AsyncWriter) MaxInFlight ¶
func (w *AsyncWriter) MaxInFlight() (int, bool)
MaxInFlight returns the maximum number of in flight messages permitted by the output. This value can be used to determine a sensible value for parent outputs, but should not be relied upon as part of dispatcher logic.
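One common way to enforce a cap on in-flight writes is a buffered channel used as a semaphore. The sketch below shows that general technique only; it is not taken from AsyncWriter, and `writeBounded` is a hypothetical helper that also reports the peak concurrency it observed.

```go
package main

import (
	"fmt"
	"sync"
)

// writeBounded calls write once per message with at most maxInFlight calls
// running concurrently, and returns the highest concurrency it observed.
func writeBounded(msgs []string, maxInFlight int, write func(string)) int {
	sem := make(chan struct{}, maxInFlight) // one slot per in-flight write
	var wg sync.WaitGroup
	var mu sync.Mutex
	inFlight, peak := 0, 0

	for _, m := range msgs {
		sem <- struct{}{} // blocks while maxInFlight writes are running
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the write ends

			mu.Lock()
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			write(m)

			mu.Lock()
			inFlight--
			mu.Unlock()
		}(m)
	}
	wg.Wait()
	return peak
}

func main() {
	msgs := []string{"a", "b", "c", "d", "e", "f", "g", "h"}
	peak := writeBounded(msgs, 3, func(string) {})
	fmt.Println("peak within limit:", peak >= 1 && peak <= 3)
}
```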
func (*AsyncWriter) SetInjectTracingMap ¶
func (w *AsyncWriter) SetInjectTracingMap(mapping string) error
SetInjectTracingMap sets a mapping to be used for injecting tracing events into messages.
func (*AsyncWriter) SetNoCancel ¶
func (w *AsyncWriter) SetNoCancel()
SetNoCancel configures the async writer so that write calls do not use a context that gets cancelled on shutdown. This is much more efficient as it reduces allocations, goroutines and defers for each write call, but also means the write can block graceful termination. Therefore this setting should be reserved for outputs that are exceptionally fast.
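The trade-off can be illustrated with a small sketch. It assumes, for illustration only, that the cancellable mode derives a per-write context that is cancelled when a shutdown signal fires; `writeContext` and `cancelledOnShutdown` are hypothetical helpers, not functions of this package.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// writeContext returns the context used for one write call. With noCancel
// set it is a plain background context: no extra goroutine or allocation,
// but a shutdown cannot interrupt the write.
func writeContext(shutdown <-chan struct{}, noCancel bool) (context.Context, context.CancelFunc) {
	if noCancel {
		return context.Background(), func() {}
	}
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		select {
		case <-shutdown:
			cancel() // abort the in-progress write on shutdown
		case <-ctx.Done():
			// the write finished first; let the goroutine exit
		}
	}()
	return ctx, cancel
}

// cancelledOnShutdown reports whether a write context is cancelled shortly
// after shutdown is signalled.
func cancelledOnShutdown(noCancel bool) bool {
	shutdown := make(chan struct{})
	ctx, cancel := writeContext(shutdown, noCancel)
	defer cancel()
	close(shutdown)
	select {
	case <-ctx.Done():
		return true
	case <-time.After(200 * time.Millisecond):
		return false
	}
}

func main() {
	fmt.Println("default mode cancelled:", cancelledOnShutdown(false))
	fmt.Println("no-cancel mode cancelled:", cancelledOnShutdown(true))
}
```

In the no-cancel branch a slow write would keep running after shutdown is requested, which is the blocking of graceful termination the description warns about.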
func (*AsyncWriter) WaitForClose ¶
func (w *AsyncWriter) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the AsyncWriter output has closed down.
type Batcher ¶
type Batcher struct {
// contains filtered or unexported fields
}
Batcher wraps an output with a batching policy.
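As a rough illustration of what a batching policy in front of an output does, the sketch below groups messages by a fixed count and flushes any remainder at the end. Only a count trigger is modelled, which is an assumption made for brevity; `batchByCount` is a hypothetical helper and not part of this package or of batch.Policy.

```go
package main

import "fmt"

// batchByCount groups msgs into batches of at most count messages. A count
// below 1 is treated as 1. The final batch may be smaller than count.
func batchByCount(msgs []string, count int) [][]string {
	if count < 1 {
		count = 1
	}
	var batches [][]string
	var pending []string
	for _, m := range msgs {
		pending = append(pending, m)
		if len(pending) == count {
			batches = append(batches, pending) // flush a full batch
			pending = nil
		}
	}
	if len(pending) > 0 {
		batches = append(batches, pending) // flush the remainder
	}
	return batches
}

func main() {
	for _, b := range batchByCount([]string{"a", "b", "c", "d", "e"}, 2) {
		fmt.Println(b)
	}
}
```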
func (*Batcher) CloseAsync ¶
func (m *Batcher) CloseAsync()
CloseAsync shuts down the Batcher and stops processing messages.
func (*Batcher) Connected ¶
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*Batcher) Consume ¶
func (m *Batcher) Consume(msgs <-chan types.Transaction) error
Consume assigns a messages channel for the output to read.
func (*Batcher) MaxInFlight ¶
MaxInFlight returns the maximum number of in flight messages permitted by the output. This value can be used to determine a sensible value for parent outputs, but should not be relied upon as part of dispatcher logic.
type BrokerConfig ¶
type BrokerConfig struct {
    Copies int `json:"copies" yaml:"copies"`
    Pattern string `json:"pattern" yaml:"pattern"`
    MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
    Outputs brokerOutputList `json:"outputs" yaml:"outputs"`
    Batching batch.PolicyConfig `json:"batching" yaml:"batching"`
}
BrokerConfig contains configuration fields for the Broker output type.
func NewBrokerConfig ¶
func NewBrokerConfig() BrokerConfig
NewBrokerConfig creates a new BrokerConfig with default values.
type CassandraConfig ¶
type CassandraConfig struct {
    Addresses []string `json:"addresses" yaml:"addresses"`
    TLS btls.Config `json:"tls" yaml:"tls"`
    PasswordAuthenticator PasswordAuthenticator `json:"password_authenticator" yaml:"password_authenticator"`
    DisableInitialHostLookup bool `json:"disable_initial_host_lookup" yaml:"disable_initial_host_lookup"`
    Query string `json:"query" yaml:"query"`
    Args []string `json:"args" yaml:"args"`
    ArgsMapping string `json:"args_mapping" yaml:"args_mapping"`
    Consistency string `json:"consistency" yaml:"consistency"`
    Timeout string `json:"timeout" yaml:"timeout"`
    // TODO: V4 Remove this and replace with explicit values.
    retries.Config `json:",inline" yaml:",inline"`
    MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
    Batching batch.PolicyConfig `json:"batching" yaml:"batching"`
}
CassandraConfig contains configuration fields for the Cassandra output type.
func NewCassandraConfig ¶
func NewCassandraConfig() CassandraConfig
NewCassandraConfig creates a new CassandraConfig with default values.
type Config ¶
type Config struct {
    Label string `json:"label" yaml:"label"`
    Type string `json:"type" yaml:"type"`
    AMQP writer.AMQPConfig `json:"amqp" yaml:"amqp"`
    AMQP09 writer.AMQPConfig `json:"amqp_0_9" yaml:"amqp_0_9"`
    AMQP1 writer.AMQP1Config `json:"amqp_1" yaml:"amqp_1"`
    AWSDynamoDB writer.DynamoDBConfig `json:"aws_dynamodb" yaml:"aws_dynamodb"`
    AWSKinesis writer.KinesisConfig `json:"aws_kinesis" yaml:"aws_kinesis"`
    AWSKinesisFirehose writer.KinesisFirehoseConfig `json:"aws_kinesis_firehose" yaml:"aws_kinesis_firehose"`
    AWSS3 writer.AmazonS3Config `json:"aws_s3" yaml:"aws_s3"`
    AWSSNS writer.SNSConfig `json:"aws_sns" yaml:"aws_sns"`
    AWSSQS writer.AmazonSQSConfig `json:"aws_sqs" yaml:"aws_sqs"`
    AzureBlobStorage writer.AzureBlobStorageConfig `json:"azure_blob_storage" yaml:"azure_blob_storage"`
    AzureQueueStorage writer.AzureQueueStorageConfig `json:"azure_queue_storage" yaml:"azure_queue_storage"`
    AzureTableStorage writer.AzureTableStorageConfig `json:"azure_table_storage" yaml:"azure_table_storage"`
    BlobStorage writer.AzureBlobStorageConfig `json:"blob_storage" yaml:"blob_storage"`
    Broker BrokerConfig `json:"broker" yaml:"broker"`
    Cache writer.CacheConfig `json:"cache" yaml:"cache"`
    Cassandra CassandraConfig `json:"cassandra" yaml:"cassandra"`
    Drop writer.DropConfig `json:"drop" yaml:"drop"`
    DropOn DropOnConfig `json:"drop_on" yaml:"drop_on"`
    DropOnError DropOnErrorConfig `json:"drop_on_error" yaml:"drop_on_error"`
    Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"`
    DynamoDB writer.DynamoDBConfig `json:"dynamodb" yaml:"dynamodb"`
    Elasticsearch writer.ElasticsearchConfig `json:"elasticsearch" yaml:"elasticsearch"`
    Fallback TryConfig `json:"fallback" yaml:"fallback"`
    File FileConfig `json:"file" yaml:"file"`
    Files writer.FilesConfig `json:"files" yaml:"files"`
    GCPCloudStorage GCPCloudStorageConfig `json:"gcp_cloud_storage" yaml:"gcp_cloud_storage"`
    GCPPubSub writer.GCPPubSubConfig `json:"gcp_pubsub" yaml:"gcp_pubsub"`
    HDFS writer.HDFSConfig `json:"hdfs" yaml:"hdfs"`
    HTTPClient writer.HTTPClientConfig `json:"http_client" yaml:"http_client"`
    HTTPServer HTTPServerConfig `json:"http_server" yaml:"http_server"`
    Inproc InprocConfig `json:"inproc" yaml:"inproc"`
    Kafka writer.KafkaConfig `json:"kafka" yaml:"kafka"`
    Kinesis writer.KinesisConfig `json:"kinesis" yaml:"kinesis"`
    KinesisFirehose writer.KinesisFirehoseConfig `json:"kinesis_firehose" yaml:"kinesis_firehose"`
    MongoDB MongoDBConfig `json:"mongodb" yaml:"mongodb"`
    MQTT writer.MQTTConfig `json:"mqtt" yaml:"mqtt"`
    Nanomsg writer.NanomsgConfig `json:"nanomsg" yaml:"nanomsg"`
    NATS writer.NATSConfig `json:"nats" yaml:"nats"`
    NATSJetStream NATSJetStreamConfig `json:"nats_jetstream" yaml:"nats_jetstream"`
    NATSStream writer.NATSStreamConfig `json:"nats_stream" yaml:"nats_stream"`
    NSQ writer.NSQConfig `json:"nsq" yaml:"nsq"`
    Plugin interface{} `json:"plugin,omitempty" yaml:"plugin,omitempty"`
    Pulsar PulsarConfig `json:"pulsar" yaml:"pulsar"`
    RedisHash writer.RedisHashConfig `json:"redis_hash" yaml:"redis_hash"`
    RedisList writer.RedisListConfig `json:"redis_list" yaml:"redis_list"`
    RedisPubSub writer.RedisPubSubConfig `json:"redis_pubsub" yaml:"redis_pubsub"`
    RedisStreams writer.RedisStreamsConfig `json:"redis_streams" yaml:"redis_streams"`
    Reject RejectConfig `json:"reject" yaml:"reject"`
    Resource string `json:"resource" yaml:"resource"`
    Retry RetryConfig `json:"retry" yaml:"retry"`
    S3 writer.AmazonS3Config `json:"s3" yaml:"s3"`
    SFTP SFTPConfig `json:"sftp" yaml:"sftp"`
    SNS writer.SNSConfig `json:"sns" yaml:"sns"`
    SQL SQLConfig `json:"sql" yaml:"sql"`
    SQS writer.AmazonSQSConfig `json:"sqs" yaml:"sqs"`
    STDOUT STDOUTConfig `json:"stdout" yaml:"stdout"`
    Subprocess SubprocessConfig `json:"subprocess" yaml:"subprocess"`
    Switch SwitchConfig `json:"switch" yaml:"switch"`
    SyncResponse struct{} `json:"sync_response" yaml:"sync_response"`
    TableStorage writer.AzureTableStorageConfig `json:"table_storage" yaml:"table_storage"`
    TCP writer.TCPConfig `json:"tcp" yaml:"tcp"`
    Try TryConfig `json:"try" yaml:"try"`
    UDP writer.UDPConfig `json:"udp" yaml:"udp"`
    Socket writer.SocketConfig `json:"socket" yaml:"socket"`
    Websocket writer.WebsocketConfig `json:"websocket" yaml:"websocket"`
    ZMQ4 *writer.ZMQ4Config `json:"zmq4,omitempty" yaml:"zmq4,omitempty"`
    Processors []processor.Config `json:"processors" yaml:"processors"`
}
Config is the all encompassing configuration struct for all output types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl
func NewConfig ¶
func NewConfig() Config
NewConfig returns a configuration struct fully populated with default values. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl
func (Config) Sanitised ¶
Sanitised returns a sanitised version of the config, meaning sections that aren't relevant to behaviour are removed. Also optionally removes deprecated fields.
func (*Config) UnmarshalYAML ¶
UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.
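The reason a custom unmarshaller is needed is that elements of a map or slice start out as zero values, so defaults cannot be set on them beforehand. The sketch below shows the usual remedy using encoding/json from the standard library as a stand-in for YAML: decode into a copy that is already populated with defaults. `fileConfig`, `newFileConfig`, `parseList` and the default codec value are hypothetical and chosen only for the illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fileConfig is a hypothetical config type with one defaulted field.
type fileConfig struct {
	Path  string `json:"path"`
	Codec string `json:"codec"`
}

// newFileConfig returns a config populated with (assumed) default values.
func newFileConfig() fileConfig {
	return fileConfig{Codec: "lines"}
}

// UnmarshalJSON decodes over a default-populated copy, so fields absent
// from the input keep their defaults. The alias type has no methods, which
// prevents infinite recursion into this function.
func (c *fileConfig) UnmarshalJSON(b []byte) error {
	type alias fileConfig
	a := alias(newFileConfig())
	if err := json.Unmarshal(b, &a); err != nil {
		return err
	}
	*c = fileConfig(a)
	return nil
}

// parseList decodes a JSON array of configs.
func parseList(s string) ([]fileConfig, error) {
	var confs []fileConfig
	err := json.Unmarshal([]byte(s), &confs)
	return confs, err
}

func main() {
	confs, err := parseList(`[{"path":"a.txt"},{"path":"b.txt","codec":"all-bytes"}]`)
	fmt.Println(confs, err)
}
```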
type ConstructorFunc ¶
type ConstructorFunc func(Config, types.Manager, log.Modular, metrics.Type, ...types.PipelineConstructorFunc) (Type, error)
ConstructorFunc is a func signature able to construct an output.
func GetDeprecatedPlugin ¶
func GetDeprecatedPlugin(name string) (ConstructorFunc, bool)
GetDeprecatedPlugin returns a constructor for an old plugin if it exists.
type DropOnConditions ¶
type DropOnConditions struct {
    Error bool `json:"error" yaml:"error"`
    BackPressure string `json:"back_pressure" yaml:"back_pressure"`
}
DropOnConditions is a config struct representing the different circumstances under which messages should be dropped.
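A rough sketch of the error condition follows: a wrapper that swallows write failures from a child output when configured to do so. It is illustrative only and models just the `error` flag; the back-pressure condition is not modelled, and `dropper`, `alwaysFail` and the dropped counter are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// alwaysFail is a hypothetical child output that rejects every message.
func alwaysFail(string) error { return errors.New("child output failed") }

// dropper wraps a child write function. When dropOnError is true, failed
// writes are counted and reported upstream as successful, so the message
// is dropped rather than retried.
type dropper struct {
	child       func(string) error
	dropOnError bool
	dropped     int
}

// Write forwards msg to the child and applies the drop condition.
func (d *dropper) Write(msg string) error {
	err := d.child(msg)
	if err != nil && d.dropOnError {
		d.dropped++
		return nil
	}
	return err
}

func main() {
	d := &dropper{child: alwaysFail, dropOnError: true}
	fmt.Println("drop enabled:", d.Write("a"), "dropped:", d.dropped)

	p := &dropper{child: alwaysFail}
	fmt.Println("drop disabled:", p.Write("a"), "dropped:", p.dropped)
}
```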
type DropOnConfig ¶
type DropOnConfig struct {
    DropOnConditions `json:",inline" yaml:",inline"`
    Output *Config `json:"output" yaml:"output"`
}
DropOnConfig contains configuration values for the DropOn output type.
func NewDropOnConfig ¶
func NewDropOnConfig() DropOnConfig
NewDropOnConfig creates a new DropOnConfig with default values.
func (DropOnConfig) MarshalJSON ¶
func (d DropOnConfig) MarshalJSON() ([]byte, error)
MarshalJSON prints an empty object instead of nil.
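A minimal sketch of how a marshaller can print an empty object for a nil child follows. It assumes the nil value in question is the optional child output pointer; `childConfig`, `dropConfig` and `render` are hypothetical names rather than this package's types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// childConfig is a hypothetical stand-in for the wrapped output config.
type childConfig struct {
	Type string `json:"type"`
}

// dropConfig is a hypothetical config with an optional child output.
type dropConfig struct {
	Error  bool
	Output *childConfig
}

// MarshalJSON substitutes an empty struct for a nil child so the field is
// rendered as {} rather than null.
func (d dropConfig) MarshalJSON() ([]byte, error) {
	dummy := struct {
		Error  bool        `json:"error"`
		Output interface{} `json:"output"`
	}{Error: d.Error, Output: d.Output}
	if d.Output == nil {
		dummy.Output = struct{}{}
	}
	return json.Marshal(dummy)
}

// render returns the JSON form of d, or the error text on failure.
func render(d dropConfig) string {
	b, err := json.Marshal(d)
	if err != nil {
		return err.Error()
	}
	return string(b)
}

func main() {
	fmt.Println(render(dropConfig{}))
	fmt.Println(render(dropConfig{Error: true, Output: &childConfig{Type: "stdout"}}))
}
```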
func (DropOnConfig) MarshalYAML ¶
func (d DropOnConfig) MarshalYAML() (interface{}, error)
MarshalYAML prints an empty object instead of nil.
type DropOnError ¶
type DropOnError struct {
// contains filtered or unexported fields
}
DropOnError is an output type that wraps a child output and drops a message, rather than retrying it, when the child fails to send it.
func (*DropOnError) CloseAsync ¶
func (d *DropOnError) CloseAsync()
CloseAsync shuts down the DropOnError output and stops processing requests.
func (*DropOnError) Connected ¶
func (d *DropOnError) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*DropOnError) Consume ¶
func (d *DropOnError) Consume(ts <-chan types.Transaction) error
Consume assigns a messages channel for the output to read.
func (*DropOnError) MaxInFlight ¶
func (d *DropOnError) MaxInFlight() (int, bool)
MaxInFlight returns the maximum number of in flight messages permitted by the output. This value can be used to determine a sensible value for parent outputs, but should not be relied upon as part of dispatcher logic.
func (*DropOnError) WaitForClose ¶
func (d *DropOnError) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the DropOnError output has closed down.
type DropOnErrorConfig ¶
type DropOnErrorConfig struct {
*Config `yaml:",inline" json:",inline"`
}
DropOnErrorConfig contains configuration values for the DropOnError output type.
func NewDropOnErrorConfig ¶
func NewDropOnErrorConfig() DropOnErrorConfig
NewDropOnErrorConfig creates a new DropOnErrorConfig with default values.
func (DropOnErrorConfig) MarshalJSON ¶
func (d DropOnErrorConfig) MarshalJSON() ([]byte, error)
MarshalJSON prints an empty object instead of nil.
func (DropOnErrorConfig) MarshalYAML ¶
func (d DropOnErrorConfig) MarshalYAML() (interface{}, error)
MarshalYAML prints an empty object instead of nil.
func (*DropOnErrorConfig) UnmarshalJSON ¶
func (d *DropOnErrorConfig) UnmarshalJSON(bytes []byte) error
UnmarshalJSON ensures that when parsing child config it is initialised.
func (*DropOnErrorConfig) UnmarshalYAML ¶
func (d *DropOnErrorConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML ensures that when parsing child config it is initialised.
type DynamicConfig ¶
type DynamicConfig struct {
    Outputs map[string]Config `json:"outputs" yaml:"outputs"`
    Prefix string `json:"prefix" yaml:"prefix"`
    Timeout string `json:"timeout" yaml:"timeout"`
    MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
}
DynamicConfig contains configuration fields for the Dynamic output type.
func NewDynamicConfig ¶
func NewDynamicConfig() DynamicConfig
NewDynamicConfig creates a new DynamicConfig with default values.
type FileConfig ¶
type FileConfig struct {
    Path string `json:"path" yaml:"path"`
    Codec string `json:"codec" yaml:"codec"`
    Delim string `json:"delimiter" yaml:"delimiter"`
}
FileConfig contains configuration fields for the file based output type.
func NewFileConfig ¶
func NewFileConfig() FileConfig
NewFileConfig creates a new FileConfig with default values.
type GCPCloudStorageConfig ¶
type GCPCloudStorageConfig struct {
    Bucket string `json:"bucket" yaml:"bucket"`
    Path string `json:"path" yaml:"path"`
    ContentType string `json:"content_type" yaml:"content_type"`
    ContentEncoding string `json:"content_encoding" yaml:"content_encoding"`
    ChunkSize int `json:"chunk_size" yaml:"chunk_size"`
    MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
    Batching batch.PolicyConfig `json:"batching" yaml:"batching"`
    CollisionMode string `json:"collision_mode" yaml:"collision_mode"`
}
GCPCloudStorageConfig contains configuration fields for the GCP Cloud Storage output type.
func NewGCPCloudStorageConfig ¶
func NewGCPCloudStorageConfig() GCPCloudStorageConfig
NewGCPCloudStorageConfig creates a new GCPCloudStorageConfig with default values.
type HTTPServer ¶
type HTTPServer struct {
// contains filtered or unexported fields
}
HTTPServer is an output type that serves HTTP GET requests.
func (*HTTPServer) CloseAsync ¶
func (h *HTTPServer) CloseAsync()
CloseAsync shuts down the HTTPServer output and stops processing requests.
func (*HTTPServer) Connected ¶
func (h *HTTPServer) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*HTTPServer) Consume ¶
func (h *HTTPServer) Consume(ts <-chan types.Transaction) error
Consume assigns a messages channel for the output to read.
func (*HTTPServer) WaitForClose ¶
func (h *HTTPServer) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the HTTPServer output has closed down.
type HTTPServerConfig ¶
type HTTPServerConfig struct {
    Address string `json:"address" yaml:"address"`
    Path string `json:"path" yaml:"path"`
    StreamPath string `json:"stream_path" yaml:"stream_path"`
    WSPath string `json:"ws_path" yaml:"ws_path"`
    AllowedVerbs []string `json:"allowed_verbs" yaml:"allowed_verbs"`
    Timeout string `json:"timeout" yaml:"timeout"`
    CertFile string `json:"cert_file" yaml:"cert_file"`
    KeyFile string `json:"key_file" yaml:"key_file"`
    CORS httpdocs.ServerCORS `json:"cors" yaml:"cors"`
}
HTTPServerConfig contains configuration fields for the HTTPServer output type.
func NewHTTPServerConfig ¶
func NewHTTPServerConfig() HTTPServerConfig
NewHTTPServerConfig creates a new HTTPServerConfig with default values.
type Inproc ¶
type Inproc struct {
// contains filtered or unexported fields
}
Inproc is an output type that serves Inproc messages.
func (*Inproc) CloseAsync ¶
func (i *Inproc) CloseAsync()
CloseAsync shuts down the Inproc output and stops processing messages.
func (*Inproc) Connected ¶
Connected returns a boolean indicating whether this output is currently connected to its target.
type InprocConfig ¶
type InprocConfig string
InprocConfig contains configuration fields for the Inproc output type.
func NewInprocConfig ¶
func NewInprocConfig() InprocConfig
NewInprocConfig creates a new InprocConfig with default values.
type LineWriter ¶
type LineWriter struct {
// contains filtered or unexported fields
}
LineWriter is an output type that writes messages to an io.WriteCloser type as lines.
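Writing messages as delimited lines to an io.WriteCloser can be sketched as below. The helper mirrors the `closeOnExit` and `customDelimiter` parameters listed for NewLineWriter in the index, but the behaviour shown (newline as the default delimiter, closing the handle after the last message) is an assumption, and `writeLines`, `bufCloser` and `render` are hypothetical names.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// writeLines writes each message followed by the delimiter. An empty
// delimiter falls back to a newline (an assumed default). When closeOnExit
// is true the handle is closed after the last message.
func writeLines(w io.WriteCloser, delim []byte, closeOnExit bool, msgs ...[]byte) error {
	if len(delim) == 0 {
		delim = []byte("\n")
	}
	for _, m := range msgs {
		line := append(append([]byte{}, m...), delim...)
		if _, err := w.Write(line); err != nil {
			return err
		}
	}
	if closeOnExit {
		return w.Close()
	}
	return nil
}

// bufCloser is an in-memory io.WriteCloser used for the demonstration.
type bufCloser struct {
	bytes.Buffer
	closed bool
}

func (b *bufCloser) Close() error {
	b.closed = true
	return nil
}

// render writes msgs with the given delimiter and returns the raw output.
func render(delim string, msgs ...string) string {
	w := &bufCloser{}
	parts := make([][]byte, len(msgs))
	for i, m := range msgs {
		parts[i] = []byte(m)
	}
	if err := writeLines(w, []byte(delim), true, parts...); err != nil {
		return ""
	}
	return w.String()
}

func main() {
	fmt.Printf("%q\n", render("", "a", "b"))
	fmt.Printf("%q\n", render("|", "a", "b"))
}
```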
func (*LineWriter) CloseAsync ¶
func (w *LineWriter) CloseAsync()
CloseAsync shuts down the LineWriter output and stops processing messages.
func (*LineWriter) Connected ¶
func (w *LineWriter) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*LineWriter) Consume ¶
func (w *LineWriter) Consume(ts <-chan types.Transaction) error
Consume assigns a messages channel for the output to read.
func (*LineWriter) WaitForClose ¶
func (w *LineWriter) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the LineWriter output has closed down.
type MongoDBConfig ¶
type MongoDBConfig struct {
    MongoConfig client.Config `json:",inline" yaml:",inline"`
    Operation string `json:"operation" yaml:"operation"`
    WriteConcern client.WriteConcern `json:"write_concern" yaml:"write_concern"`
    FilterMap string `json:"filter_map" yaml:"filter_map"`
    DocumentMap string `json:"document_map" yaml:"document_map"`
    HintMap string `json:"hint_map" yaml:"hint_map"`
    // DeleteEmptyValue bool `json:"delete_empty_value" yaml:"delete_empty_value"`
    Upsert bool `json:"upsert" yaml:"upsert"`
    MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
    RetryConfig retries.Config `json:",inline" yaml:",inline"`
    Batching batch.PolicyConfig `json:"batching" yaml:"batching"`
}
MongoDBConfig contains config fields for the MongoDB output type.
func NewMongoDBConfig ¶
func NewMongoDBConfig() MongoDBConfig
NewMongoDBConfig creates a MongoDBConfig populated with default values.
type NATSJetStreamConfig ¶
type NATSJetStreamConfig struct {
    URLs        []string    `json:"urls" yaml:"urls"`
    Subject     string      `json:"subject" yaml:"subject"`
    MaxInFlight int         `json:"max_in_flight" yaml:"max_in_flight"`
    TLS         tls.Config  `json:"tls" yaml:"tls"`
    Auth        auth.Config `json:"auth" yaml:"auth"`
}
NATSJetStreamConfig contains configuration fields for the NATS JetStream output type.
func NewNATSJetStreamConfig ¶
func NewNATSJetStreamConfig() NATSJetStreamConfig
NewNATSJetStreamConfig creates a new NATSJetStreamConfig with default values.
type PasswordAuthenticator ¶
type PasswordAuthenticator struct {
    Enabled  bool   `json:"enabled" yaml:"enabled"`
    Username string `json:"username" yaml:"username"`
    Password string `json:"password" yaml:"password"`
}
PasswordAuthenticator contains the fields that will be used to authenticate with the Cassandra cluster.
type PluginConfigConstructor ¶
type PluginConfigConstructor func() interface{}
PluginConfigConstructor is a func that returns a pointer to a new and fully populated configuration struct for a plugin type.
type PluginConfigSanitiser ¶
type PluginConfigSanitiser func(conf interface{}) interface{}
PluginConfigSanitiser is a function that takes a configuration object for a plugin and returns a sanitised (minimal) version of it for printing in examples and plugin documentation.
This function is useful when a plugin's configuration struct is very large and complex, but can sometimes be expressed in a more concise way without losing the original intent.
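A sanitiser with this signature might look like the following sketch. The exampleConfig struct and its fields are hypothetical, and the choice to keep only the URL is purely illustrative; the function type is redeclared locally so the example compiles on its own.

```go
package main

import "fmt"

// PluginConfigSanitiser mirrors the signature documented above so that this
// example is self-contained.
type PluginConfigSanitiser func(conf interface{}) interface{}

// exampleConfig is a hypothetical plugin configuration.
type exampleConfig struct {
	URL     string
	Timeout string
	Retries int
}

// sanitiseExample keeps only the fields worth showing in documentation.
func sanitiseExample(conf interface{}) interface{} {
	c, ok := conf.(*exampleConfig)
	if !ok {
		// Unknown shapes are returned untouched rather than guessed at.
		return conf
	}
	return map[string]interface{}{"url": c.URL}
}

func main() {
	var s PluginConfigSanitiser = sanitiseExample
	fmt.Println(s(&exampleConfig{URL: "http://localhost:4195", Timeout: "5s", Retries: 3}))
}
```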
type PluginConstructor ¶
type PluginConstructor func(
    config interface{},
    manager types.Manager,
    logger log.Modular,
    metrics metrics.Type,
) (types.Output, error)
PluginConstructor is a func that constructs a Benthos output plugin. These are plugins that are specific to certain use cases, experimental, private or otherwise unfit for widespread general use. Any number of plugins can be specified when using Benthos as a framework.
The configuration object will be the result of the PluginConfigConstructor after overlaying the user configuration.
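The overlay of user configuration onto the constructor's defaults can be sketched as follows. This is only an illustration under stated assumptions: exampleConfig, newExampleConfig and overlay are hypothetical names, and encoding/json stands in for whatever parser actually reads the user configuration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// exampleConfig is a hypothetical plugin configuration.
type exampleConfig struct {
	Address string `json:"address"`
	Retries int    `json:"retries"`
}

// newExampleConfig has the PluginConfigConstructor shape: it returns a
// pointer to a fully populated configuration struct.
func newExampleConfig() interface{} {
	return &exampleConfig{Address: "localhost:1234", Retries: 3}
}

// overlay decodes user-supplied JSON on top of the defaults. Fields absent
// from the user config keep their default values.
func overlay(userJSON string) (*exampleConfig, error) {
	conf := newExampleConfig()
	if err := json.Unmarshal([]byte(userJSON), conf); err != nil {
		return nil, err
	}
	return conf.(*exampleConfig), nil
}

func main() {
	conf, err := overlay(`{"retries": 5}`)
	if err != nil {
		fmt.Println("bad config:", err)
		return
	}
	fmt.Printf("%+v\n", *conf)
}
```

Returning a pointer from the constructor is what allows the decoder to write into the defaults in place, so the plugin constructor receives a single merged object.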
type PulsarConfig ¶
type PulsarConfig struct {
    URL         string      `json:"url" yaml:"url"`
    Topic       string      `json:"topic" yaml:"topic"`
    MaxInFlight int         `json:"max_in_flight" yaml:"max_in_flight"`
    Key         string      `json:"key" yaml:"key"`
    OrderingKey string      `json:"ordering_key" yaml:"ordering_key"`
    Auth        auth.Config `json:"auth" yaml:"auth"`
}
PulsarConfig contains configuration for the Pulsar output type.
func NewPulsarConfig ¶
func NewPulsarConfig() PulsarConfig
NewPulsarConfig creates a new PulsarConfig with default values.
type RejectConfig ¶
type RejectConfig string
RejectConfig contains configuration fields for the Reject output type.
func NewRejectConfig ¶
func NewRejectConfig() RejectConfig
NewRejectConfig creates a new RejectConfig with default values.
type Resource ¶
type Resource struct {
// contains filtered or unexported fields
}
Resource is an output type that writes messages through an output resource.
func (*Resource) CloseAsync ¶
func (r *Resource) CloseAsync()
CloseAsync shuts down the output and stops processing requests.
func (*Resource) Connected ¶
Connected returns a boolean indicating whether this output is currently connected to its target.
type Retry ¶
type Retry struct {
// contains filtered or unexported fields
}
Retry is an output type that continuously writes a message to a child output until the send is successful.
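The retry-until-success behaviour can be pictured with a small sketch. The names writeFunc, retryUntilSuccess and flaky are hypothetical; the loop deliberately omits the backoff and shutdown handling that a real output would need.

```go
package main

import (
	"errors"
	"fmt"
)

// writeFunc is a hypothetical stand-in for a child output's write call.
type writeFunc func(msg string) error

// retryUntilSuccess keeps calling write until it returns nil and reports how
// many attempts were needed. A real implementation would back off between
// attempts and stop when the output is closed.
func retryUntilSuccess(write writeFunc, msg string) int {
	attempts := 0
	for {
		attempts++
		if err := write(msg); err == nil {
			return attempts
		}
	}
}

// flaky returns a writeFunc that fails the first n calls and then succeeds.
func flaky(n int) writeFunc {
	return func(string) error {
		if n > 0 {
			n--
			return errors.New("temporary failure")
		}
		return nil
	}
}

func main() {
	fmt.Println("attempts:", retryUntilSuccess(flaky(2), "hello"))
}
```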
func (*Retry) CloseAsync ¶
func (r *Retry) CloseAsync()
CloseAsync shuts down the Retry output and stops processing requests.
func (*Retry) Connected ¶
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*Retry) Consume ¶
func (r *Retry) Consume(ts <-chan types.Transaction) error
Consume assigns a messages channel for the output to read.
func (*Retry) MaxInFlight ¶
MaxInFlight returns the maximum number of in flight messages permitted by the output. This value can be used to determine a sensible value for parent outputs, but should not be relied upon as part of dispatcher logic.
type RetryConfig ¶
type RetryConfig struct {
    Output         *Config `json:"output" yaml:"output"`
    retries.Config `json:",inline" yaml:",inline"`
}
RetryConfig contains configuration values for the Retry output type.
func NewRetryConfig ¶
func NewRetryConfig() RetryConfig
NewRetryConfig creates a new RetryConfig with default values.
func (RetryConfig) MarshalJSON ¶
func (r RetryConfig) MarshalJSON() ([]byte, error)
MarshalJSON prints an empty object instead of nil.
func (RetryConfig) MarshalYAML ¶
func (r RetryConfig) MarshalYAML() (interface{}, error)
MarshalYAML prints an empty object instead of nil.
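One way to print an empty object instead of nil is to marshal through a shadow struct whose field is swapped for an empty struct when the pointer is unset. The sketch below uses hypothetical stand-in types (childConfig, retryConfig) and is not taken from the package source.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// childConfig and retryConfig are hypothetical stand-ins, not this
// package's types.
type childConfig struct {
	Type string `json:"type"`
}

type retryConfig struct {
	Output *childConfig `json:"output"`
}

// MarshalJSON swaps a nil Output for an empty struct so that the field is
// rendered as {} rather than null.
func (r retryConfig) MarshalJSON() ([]byte, error) {
	dummy := struct {
		Output interface{} `json:"output"`
	}{Output: r.Output}
	if r.Output == nil {
		dummy.Output = struct{}{}
	}
	return json.Marshal(dummy)
}

// render marshals r and returns the JSON text, or the error text on failure.
func render(r retryConfig) string {
	b, err := json.Marshal(r)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	fmt.Println(render(retryConfig{}))
	fmt.Println(render(retryConfig{Output: &childConfig{Type: "stdout"}}))
}
```

Marshalling the anonymous shadow struct rather than retryConfig itself avoids calling MarshalJSON recursively.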
type SFTPConfig ¶
type SFTPConfig struct {
    Address     string                `json:"address" yaml:"address"`
    Path        string                `json:"path" yaml:"path"`
    Codec       string                `json:"codec" yaml:"codec"`
    Credentials sftpSetup.Credentials `json:"credentials" yaml:"credentials"`
    MaxInFlight int                   `json:"max_in_flight" yaml:"max_in_flight"`
}
SFTPConfig contains configuration fields for the SFTP output type.
func NewSFTPConfig ¶
func NewSFTPConfig() SFTPConfig
NewSFTPConfig creates a new SFTPConfig with default values.
type SQLConfig ¶
type SQLConfig struct {
    Driver         string             `json:"driver" yaml:"driver"`
    DataSourceName string             `json:"data_source_name" yaml:"data_source_name"`
    Query          string             `json:"query" yaml:"query"`
    Args           []string           `json:"args" yaml:"args"`
    ArgsMapping    string             `json:"args_mapping" yaml:"args_mapping"`
    MaxInFlight    int                `json:"max_in_flight" yaml:"max_in_flight"`
    Batching       batch.PolicyConfig `json:"batching" yaml:"batching"`
}
SQLConfig contains configuration fields for the SQL output type.
func NewSQLConfig ¶
func NewSQLConfig() SQLConfig
NewSQLConfig returns a SQLConfig with default values.
type STDOUTConfig ¶
type STDOUTConfig struct {
    Codec string `json:"codec" yaml:"codec"`
    Delim string `json:"delimiter" yaml:"delimiter"`
}
STDOUTConfig contains configuration fields for the stdout based output type.
func NewSTDOUTConfig ¶
func NewSTDOUTConfig() STDOUTConfig
NewSTDOUTConfig creates a new STDOUTConfig with default values.
type Subprocess ¶
type Subprocess struct {
// contains filtered or unexported fields
}
Subprocess is an output type that runs a command as a subprocess and writes messages to it.
func (*Subprocess) CloseAsync ¶
func (s *Subprocess) CloseAsync()
CloseAsync shuts down the Subprocess output.
func (*Subprocess) ConnectWithContext ¶
func (s *Subprocess) ConnectWithContext(ctx context.Context) error
ConnectWithContext establishes a Subprocess writer.
func (*Subprocess) WaitForClose ¶
func (s *Subprocess) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the Subprocess output has closed down.
func (*Subprocess) WriteWithContext ¶
WriteWithContext attempts to write message contents to the subprocess.
type SubprocessConfig ¶
type SubprocessConfig struct {
    Name  string   `json:"name" yaml:"name"`
    Args  []string `json:"args" yaml:"args"`
    Codec string   `json:"codec" yaml:"codec"`
}
SubprocessConfig contains configuration for the Subprocess output type.
func NewSubprocessConfig ¶
func NewSubprocessConfig() SubprocessConfig
NewSubprocessConfig creates a new SubprocessConfig with default values.
type Switch ¶
type Switch struct {
// contains filtered or unexported fields
}
Switch is a broker that implements types.Consumer and routes each message to a subset of its outputs according to their check or condition settings.
func (*Switch) CloseAsync ¶
func (o *Switch) CloseAsync()
CloseAsync shuts down the Switch broker and stops processing requests.
func (*Switch) Connected ¶
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*Switch) Consume ¶
func (o *Switch) Consume(transactions <-chan types.Transaction) error
Consume assigns a new transactions channel for the broker to read.
func (*Switch) MaxInFlight ¶
MaxInFlight returns the maximum number of in flight messages permitted by the output. This value can be used to determine a sensible value for parent outputs, but should not be relied upon as part of dispatcher logic.
type SwitchConfig ¶
type SwitchConfig struct {
    RetryUntilSuccess bool                 `json:"retry_until_success" yaml:"retry_until_success"`
    StrictMode        bool                 `json:"strict_mode" yaml:"strict_mode"`
    MaxInFlight       int                  `json:"max_in_flight" yaml:"max_in_flight"`
    Cases             []SwitchConfigCase   `json:"cases" yaml:"cases"`
    Outputs           []SwitchConfigOutput `json:"outputs" yaml:"outputs"`
}
SwitchConfig contains configuration fields for the Switch output type.
func NewSwitchConfig ¶
func NewSwitchConfig() SwitchConfig
NewSwitchConfig creates a new SwitchConfig with default values.
type SwitchConfigCase ¶
type SwitchConfigCase struct {
    Check    string `json:"check" yaml:"check"`
    Continue bool   `json:"continue" yaml:"continue"`
    Output   Config `json:"output" yaml:"output"`
}
SwitchConfigCase contains configuration fields per output of a switch type.
func NewSwitchConfigCase ¶
func NewSwitchConfigCase() SwitchConfigCase
NewSwitchConfigCase creates a new switch output config with default values.
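How the Check and Continue fields might interact can be sketched as below. This is an assumption-laden illustration rather than the package's routing code: switchCase, route and exampleCases are hypothetical, the check is a Go predicate instead of a mapping string, and an empty check is assumed to always pass.

```go
package main

import (
	"fmt"
	"strings"
)

// switchCase is a hypothetical stand-in for SwitchConfigCase, with the check
// expressed as a Go predicate instead of a mapping string.
type switchCase struct {
	check func(msg string) bool // nil is assumed to mean the case always passes
	cont  bool                  // test the next case even after a match
	name  string                // label standing in for the child output
}

// route returns the names of the cases that would receive msg, in order.
func route(cases []switchCase, msg string) []string {
	var matched []string
	for _, c := range cases {
		if c.check != nil && !c.check(msg) {
			continue
		}
		matched = append(matched, c.name)
		if !c.cont {
			break
		}
	}
	return matched
}

// exampleCases builds three illustrative cases ending in a catch-all.
func exampleCases() []switchCase {
	return []switchCase{
		{check: func(m string) bool { return strings.HasPrefix(m, "err") }, cont: true, name: "alerts"},
		{check: func(m string) bool { return strings.Contains(m, "disk") }, name: "storage"},
		{name: "default"},
	}
}

func main() {
	fmt.Println(route(exampleCases(), "error: disk full"))
	fmt.Println(route(exampleCases(), "hello"))
}
```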
type SwitchConfigOutput ¶
type SwitchConfigOutput struct {
    Condition   condition.Config `json:"condition" yaml:"condition"`
    Fallthrough bool             `json:"fallthrough" yaml:"fallthrough"`
    Output      Config           `json:"output" yaml:"output"`
}
SwitchConfigOutput contains configuration fields per output of a switch type.
func NewSwitchConfigOutput ¶
func NewSwitchConfigOutput() SwitchConfigOutput
NewSwitchConfigOutput creates a new switch output config with default values.
func (*SwitchConfigOutput) UnmarshalJSON ¶
func (s *SwitchConfigOutput) UnmarshalJSON(bytes []byte) error
UnmarshalJSON ensures that when parsing configs that are in a map or slice the default values are still applied.
func (*SwitchConfigOutput) UnmarshalYAML ¶
func (s *SwitchConfigOutput) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.
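A common way to keep defaults when a config is decoded as an element of a map or slice is to start from the default value and decode on top of it through a method-less alias type. The sketch below shows the JSON case only; outputConfig, newOutputConfig and parseOutputs are hypothetical stand-ins, and the default of "stdout" is invented for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// outputConfig is a hypothetical stand-in for a per-output config.
type outputConfig struct {
	Fallthrough bool   `json:"fallthrough"`
	Type        string `json:"type"`
}

// newOutputConfig returns the (invented) default values.
func newOutputConfig() outputConfig {
	return outputConfig{Fallthrough: false, Type: "stdout"}
}

// UnmarshalJSON starts from the defaults and decodes on top of them. The
// alias type has no methods, which avoids infinite recursion.
func (o *outputConfig) UnmarshalJSON(b []byte) error {
	type alias outputConfig
	a := alias(newOutputConfig())
	if err := json.Unmarshal(b, &a); err != nil {
		return err
	}
	*o = outputConfig(a)
	return nil
}

// parseOutputs decodes a JSON array of output configs.
func parseOutputs(data string) ([]outputConfig, error) {
	var outs []outputConfig
	err := json.Unmarshal([]byte(data), &outs)
	return outs, err
}

func main() {
	outs, err := parseOutputs(`[{"fallthrough": true}, {"type": "file"}]`)
	if err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	fmt.Printf("%+v\n", outs)
}
```

Without the custom method, each slice element would be decoded into a zero value, so any field missing from the input would be empty rather than set to its default.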
type TryConfig ¶
type TryConfig brokerOutputList
TryConfig contains configuration fields for the Try output type.
func NewTryConfig ¶
func NewTryConfig() TryConfig
NewTryConfig creates a new TryConfig with default values.
type Type ¶
type Type interface {
    types.Closable
    types.Consumer

    // Connected returns a boolean indicating whether this output is currently
    // connected to its target.
    Connected() bool
}
Type is the standard interface of an output type.
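The package overview states that outputs propagate acknowledgements back to the source over the transaction response channel. The following sketch shows that flow with simplified, hypothetical stand-ins: transaction, consume, send and rejectEmpty are not the real types.Transaction or types.Consumer definitions, and a plain error value stands in for the response.

```go
package main

import (
	"errors"
	"fmt"
)

// transaction is a simplified, hypothetical stand-in for types.Transaction:
// a payload plus a channel on which the result of the write is reported.
type transaction struct {
	payload string
	resChan chan<- error
}

// consume reads transactions until the channel is closed, writes each
// payload with write, and sends the result back to the source. A nil result
// acts as the acknowledgement.
func consume(ts <-chan transaction, write func(string) error) {
	for t := range ts {
		t.resChan <- write(t.payload)
	}
}

// send pushes one payload through a running consumer and waits for its
// result.
func send(ts chan<- transaction, payload string) error {
	res := make(chan error, 1)
	ts <- transaction{payload: payload, resChan: res}
	return <-res
}

// rejectEmpty is an illustrative sink that refuses empty payloads.
func rejectEmpty(p string) error {
	if p == "" {
		return errors.New("empty payload")
	}
	return nil
}

func main() {
	ts := make(chan transaction)
	go consume(ts, rejectEmpty)
	fmt.Println(send(ts, "hello"))
	fmt.Println(send(ts, ""))
	close(ts)
}
```

Giving each transaction its own response channel lets the source match every result to the message that produced it, even when several are in flight.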
func New ¶
func New(
    conf Config,
    mgr types.Manager,
    log log.Modular,
    stats metrics.Type,
    pipelines ...types.PipelineConstructorFunc,
) (Type, error)
New creates an output type based on an output configuration.
func NewAWSDynamoDB ¶
func NewAWSDynamoDB(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewAWSDynamoDB creates a new DynamoDB output type.
func NewAWSKinesis ¶
func NewAWSKinesis(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewAWSKinesis creates a new Kinesis output type.
func NewAWSKinesisFirehose ¶
func NewAWSKinesisFirehose(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewAWSKinesisFirehose creates a new KinesisFirehose output type.
func NewAmazonS3 ¶
NewAmazonS3 creates a new AmazonS3 output type.
func NewAmazonSNS ¶
func NewAmazonSNS(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewAmazonSNS creates a new AmazonSNS output type.
func NewAmazonSQS ¶
func NewAmazonSQS(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewAmazonSQS creates a new AmazonSQS output type.
func NewAsyncWriter ¶
func NewAsyncWriter(
    typeStr string,
    maxInflight int,
    w AsyncSink,
    log log.Modular,
    stats metrics.Type,
) (Type, error)
NewAsyncWriter creates a new AsyncWriter output type. Deprecated
func NewAzureBlobStorage ¶
func NewAzureBlobStorage(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewAzureBlobStorage creates a new AzureBlobStorage output type.
func NewAzureQueueStorage ¶
func NewAzureQueueStorage(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewAzureQueueStorage creates a new AzureQueueStorage output type.
func NewAzureTableStorage ¶
func NewAzureTableStorage(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewAzureTableStorage creates a new AzureTableStorage output type.
func NewBatcher ¶
NewBatcher creates a new output preceded by a batching mechanism that enforces a given batching policy.
func NewBatcherFromConfig ¶
func NewBatcherFromConfig(
    conf batch.PolicyConfig,
    child Type,
    mgr types.Manager,
    log log.Modular,
    stats metrics.Type,
) (Type, error)
NewBatcherFromConfig creates a new output preceded by a batching mechanism that enforces a given batching policy configuration.
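As a rough picture of what a batching mechanism in front of an output does, the sketch below groups messages by a count threshold. It is a simplification under stated assumptions: batchByCount is hypothetical, only a count-based policy is modelled, and the trailing partial batch is flushed when the input ends.

```go
package main

import "fmt"

// batchByCount is a hypothetical helper that groups msgs into batches of at
// most count messages. The final batch may be smaller than count.
func batchByCount(msgs []string, count int) [][]string {
	if count < 1 {
		count = 1
	}
	var batches [][]string
	for len(msgs) > 0 {
		n := count
		if len(msgs) < n {
			n = len(msgs)
		}
		batches = append(batches, msgs[:n])
		msgs = msgs[n:]
	}
	return batches
}

func main() {
	fmt.Println(batchByCount([]string{"a", "b", "c", "d", "e"}, 2))
}
```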
func NewBroker ¶
func NewBroker(
    conf Config,
    mgr types.Manager,
    log log.Modular,
    stats metrics.Type,
    pipelines ...types.PipelineConstructorFunc,
) (Type, error)
NewBroker creates a new Broker output type. Messages will be sent out to the list of outputs according to the chosen broker pattern.
func NewDropOnError ¶
func NewDropOnError(
    conf Config,
    mgr types.Manager,
    log log.Modular,
    stats metrics.Type,
) (Type, error)
NewDropOnError creates a new DropOnError output type.
func NewDynamic ¶
func NewDynamic(
    conf Config,
    mgr types.Manager,
    log log.Modular,
    stats metrics.Type,
) (Type, error)
NewDynamic creates a new Dynamic output type.
func NewDynamoDB ¶
NewDynamoDB creates a new DynamoDB output type.
func NewElasticsearch ¶
func NewElasticsearch(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewElasticsearch creates a new Elasticsearch output type.
func NewGCPPubSub ¶
func NewGCPPubSub(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewGCPPubSub creates a new GCPPubSub output type.
func NewHTTPClient ¶
func NewHTTPClient(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewHTTPClient creates a new HTTPClient output type.
func NewHTTPServer ¶
func NewHTTPServer(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewHTTPServer creates a new HTTPServer output type.
func NewKinesis ¶
NewKinesis creates a new Kinesis output type.
func NewKinesisFirehose ¶
func NewKinesisFirehose(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewKinesisFirehose creates a new KinesisFirehose output type.
func NewLineWriter ¶
func NewLineWriter(
    handle io.WriteCloser,
    closeOnExit bool,
    customDelimiter []byte,
    typeStr string,
    log log.Modular,
    stats metrics.Type,
) (Type, error)
NewLineWriter creates a new LineWriter output type.
func NewNATSStream ¶
func NewNATSStream(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewNATSStream creates a new NATSStream output type.
func NewNanomsg ¶
NewNanomsg creates a new Nanomsg output type.
func NewRedisHash ¶
func NewRedisHash(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewRedisHash creates a new RedisHash output type.
func NewRedisList ¶
func NewRedisList(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewRedisList creates a new RedisList output type.
func NewRedisPubSub ¶
func NewRedisPubSub(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewRedisPubSub creates a new RedisPubSub output type.
func NewRedisStreams ¶
func NewRedisStreams(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewRedisStreams creates a new RedisStreams output type.
func NewResource ¶
func NewResource(
    conf Config,
    mgr types.Manager,
    log log.Modular,
    stats metrics.Type,
) (Type, error)
NewResource returns a resource output.
func NewSwitch ¶
func NewSwitch(
    conf Config,
    mgr types.Manager,
    logger log.Modular,
    stats metrics.Type,
) (Type, error)
NewSwitch creates a new Switch type by providing outputs. Messages will be sent to a subset of outputs according to condition and fallthrough settings.
func NewTry ¶
func NewTry(
    conf Config,
    mgr types.Manager,
    log log.Modular,
    stats metrics.Type,
    pipelines ...types.PipelineConstructorFunc,
) (Type, error)
NewTry creates a new try broker output type.
func NewWebsocket ¶
func NewWebsocket(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error)
NewWebsocket creates a new Websocket output type.
func OnlySinglePayloads ¶
OnlySinglePayloads expands message batches into individual payloads, respecting the max in flight of the wrapped output. This is a more efficient way of feeding messages into an output that handles its own batching mechanism internally, or does not support batching at all.
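The expansion step on its own can be pictured as follows. This sketch covers only the splitting of batches into single payloads; it does not model the max-in-flight limit, and the batch type and expand helper are hypothetical.

```go
package main

import "fmt"

// batch is a hypothetical stand-in for a message batch.
type batch []string

// expand turns each incoming batch into single-payload batches, preserving
// the original order.
func expand(batches ...batch) []batch {
	var out []batch
	for _, b := range batches {
		for _, p := range b {
			out = append(out, batch{p})
		}
	}
	return out
}

func main() {
	fmt.Println(expand(batch{"a", "b"}, batch{"c"}))
}
```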
func WrapWithPipelines ¶
func WrapWithPipelines(out Type, pipeConstructors ...types.PipelineConstructorFunc) (Type, error)
WrapWithPipelines wraps an output with a variadic number of pipelines.
type TypeSpec ¶
type TypeSpec struct {
    // Async indicates whether this output benefits from sending multiple
    // messages asynchronously over the protocol.
    Async bool

    // Batches indicates whether this output benefits from batching of messages.
    Batches bool

    Status      docs.Status
    Summary     string
    Description string
    Categories  []Category
    Footnotes   string
    FieldSpecs  docs.FieldSpecs
    Examples    []docs.AnnotatedExample
    Version     string
    // contains filtered or unexported fields
}
TypeSpec is a constructor and a usage description for each output type.
type WithPipeline ¶
type WithPipeline struct {
// contains filtered or unexported fields
}
WithPipeline is a type that wraps both an output type and a pipeline type by routing the pipeline through the output, and implements the output.Type interface in order to act like an ordinary output.
func WrapWithPipeline ¶
func WrapWithPipeline(i *int, out Type, pipeConstructor types.PipelineConstructorFunc) (*WithPipeline, error)
WrapWithPipeline routes a processing pipeline directly into an output and returns a type that manages both and acts like an ordinary output.
func (*WithPipeline) CloseAsync ¶
func (i *WithPipeline) CloseAsync()
CloseAsync triggers a closure of this object but does not block.
func (*WithPipeline) Connected ¶
func (i *WithPipeline) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*WithPipeline) Consume ¶
func (i *WithPipeline) Consume(tsChan <-chan types.Transaction) error
Consume starts the type listening to a message channel from a producer.
func (*WithPipeline) MaxInFlight ¶
func (i *WithPipeline) MaxInFlight() (int, bool)
MaxInFlight returns the maximum number of in flight messages permitted by the output. This value can be used to determine a sensible value for parent outputs, but should not be relied upon as part of dispatcher logic.
func (*WithPipeline) WaitForClose ¶
func (i *WithPipeline) WaitForClose(timeout time.Duration) error
WaitForClose is a blocking call to wait until the object has finished closing down and cleaning up resources.
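The split between a non-blocking CloseAsync and a blocking WaitForClose is commonly built on two channels, one to request shutdown and one to report completion. The sketch below illustrates that pattern with hypothetical names (worker, newWorker, errTimeout, demo); it is not the package's implementation, and the timeout error is a local stand-in.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errTimeout is a local stand-in for whatever timeout error a real output
// would return.
var errTimeout = errors.New("action timed out")

// worker is a hypothetical component with the two-phase close pattern.
type worker struct {
	closeOnce  sync.Once
	closeChan  chan struct{} // closed to request shutdown
	closedChan chan struct{} // closed once shutdown has finished
}

func newWorker() *worker {
	w := &worker{closeChan: make(chan struct{}), closedChan: make(chan struct{})}
	go func() {
		defer close(w.closedChan)
		<-w.closeChan // a real output would also process messages here
	}()
	return w
}

// CloseAsync requests shutdown without blocking. It is safe to call twice.
func (w *worker) CloseAsync() {
	w.closeOnce.Do(func() { close(w.closeChan) })
}

// WaitForClose blocks until shutdown has finished or the timeout elapses.
func (w *worker) WaitForClose(timeout time.Duration) error {
	select {
	case <-w.closedChan:
		return nil
	case <-time.After(timeout):
		return errTimeout
	}
}

// demo waits once before requesting a close and once after.
func demo() (before, after error) {
	w := newWorker()
	before = w.WaitForClose(10 * time.Millisecond) // not closed yet, so this times out
	w.CloseAsync()
	after = w.WaitForClose(time.Second)
	return before, after
}

func main() {
	before, after := demo()
	fmt.Println("before close:", before)
	fmt.Println("after close:", after)
}
```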
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer is an output type that writes messages to a writer.Type.
func (*Writer) CloseAsync ¶
func (w *Writer) CloseAsync()
CloseAsync shuts down the Writer output and stops processing messages.
func (*Writer) Connected ¶
Connected returns a boolean indicating whether this output is currently connected to its target.
Source Files ¶
- amqp.go
- amqp_0_9.go
- amqp_1.go
- async_writer.go
- aws_dynamodb.go
- aws_kinesis.go
- aws_kinesis_firehose.go
- aws_s3.go
- aws_sns.go
- aws_sqs.go
- azure_blob_storage.go
- azure_queue_storage.go
- azure_table_storage.go
- batcher.go
- broker.go
- broker_out_common.go
- cache.go
- cassandra.go
- constructor.go
- docs.go
- drop.go
- drop_on.go
- drop_on_error.go
- dynamic.go
- elasticsearch.go
- fallback.go
- file.go
- files.go
- gcp_cloud_storage.go
- gcp_pubsub.go
- hdfs.go
- http_client.go
- http_server.go
- inproc.go
- interface.go
- kafka.go
- line_writer.go
- mongodb.go
- mqtt.go
- nanomsg.go
- nats.go
- nats_jetstream.go
- nats_stream.go
- not_batched.go
- nsq.go
- package.go
- plugin.go
- pulsar.go
- redis_hash.go
- redis_list.go
- redis_pubsub.go
- redis_streams.go
- reject.go
- resource.go
- retry.go
- sftp.go
- socket.go
- sql.go
- sql_extra.go
- stdout.go
- subprocess.go
- switch.go
- switch_deprecated.go
- sync_response.go
- tcp.go
- try.go
- udp.go
- websocket.go
- wrap_with_pipeline.go
- writer.go