Documentation ¶
Overview ¶
Package output defines all sinks for sending Benthos messages to a variety of third party destinations. All output types must implement interface output.Type.
If the sink of an output provides a form of acknowledgement of message receipt then the output is responsible for propagating that acknowledgement back to the source of the data by sending it over the transaction response channel. Otherwise a standard acknowledgement is sent.
Index ¶
- Constants
- Variables
- func AppendProcessorsFromConfig(conf Config, mgr interop.Manager, ...) []iprocessor.PipelineConstructorFunc
- func New(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type, ...) (output.Streamed, error)
- func NewAMQP09(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewAMQP1(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewAsyncWriter(typeStr string, maxInflight int, w AsyncSink, log log.Modular, ...) (output.Streamed, error)
- func NewAzureBlobStorage(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewAzureQueueStorage(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewAzureTableStorage(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewBatcher(batcher *policy.Batcher, child output.Streamed, log log.Modular, ...) output.Streamed
- func NewBatcherFromConfig(conf policy.Config, child output.Streamed, mgr interop.Manager, ...) (output.Streamed, error)
- func NewCache(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewDrop(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewElasticsearch(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewFile(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewGCPPubSub(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewHDFS(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewHTTPClient(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewHTTPServer(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewInproc(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewKafka(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewMQTT(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewNATS(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewNATSStream(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewNSQ(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewNanomsg(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewRedisHash(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewRedisList(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewRedisPubSub(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewRedisStreams(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewResource(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewSTDOUT(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewSocket(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func NewWebsocket(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
- func OnlySinglePayloads(out output.Streamed) output.Streamed
- func WalkConstructors(fn func(ConstructorFunc, docs.ComponentSpec))
- func WrapWithPipelines(out output.Streamed, pipeConstructors ...iprocessor.PipelineConstructorFunc) (output.Streamed, error)
- type AsyncSink
- type AsyncWriter
- func (w *AsyncWriter) CloseAsync()
- func (w *AsyncWriter) Connected() bool
- func (w *AsyncWriter) Consume(ts <-chan message.Transaction) error
- func (w *AsyncWriter) SetInjectTracingMap(mapping string) error
- func (w *AsyncWriter) SetNoCancel()
- func (w *AsyncWriter) WaitForClose(timeout time.Duration) error
- type Batcher
- type BrokerConfig
- type CassandraConfig
- type Category
- type Config
- type ConstructorFunc
- type DropOnConditions
- type DropOnConfig
- type DynamicConfig
- type FileConfig
- type GCPCloudStorageConfig
- type HTTPServer
- type HTTPServerConfig
- type Inproc
- type InprocConfig
- type MongoDBConfig
- type PasswordAuthenticator
- type PulsarConfig
- type RejectConfig
- type Resource
- type RetryConfig
- type SFTPConfig
- type STDOUTConfig
- type Subprocess
- type SubprocessConfig
- type SwitchConfig
- type SwitchConfigCase
- type SyncResponseWriter
- type TryConfig
- type TypeSpec
- type WithPipeline
Constants ¶
const ( TypeAMQP09 = "amqp_0_9" TypeAMQP1 = "amqp_1" TypeAWSDynamoDB = "aws_dynamodb" TypeAWSKinesis = "aws_kinesis" TypeAWSKinesisFirehose = "aws_kinesis_firehose" TypeAWSS3 = "aws_s3" TypeAWSSNS = "aws_sns" TypeAWSSQS = "aws_sqs" TypeAzureBlobStorage = "azure_blob_storage" TypeAzureQueueStorage = "azure_queue_storage" TypeAzureTableStorage = "azure_table_storage" TypeBroker = "broker" TypeCache = "cache" TypeCassandra = "cassandra" TypeDrop = "drop" TypeDropOn = "drop_on" TypeDynamic = "dynamic" TypeDynamoDB = "dynamodb" TypeElasticsearch = "elasticsearch" TypeFallback = "fallback" TypeFile = "file" TypeGCPCloudStorage = "gcp_cloud_storage" TypeGCPPubSub = "gcp_pubsub" TypeHDFS = "hdfs" TypeHTTPClient = "http_client" TypeHTTPServer = "http_server" TypeInproc = "inproc" TypeKafka = "kafka" TypeMongoDB = "mongodb" TypeMQTT = "mqtt" TypeNanomsg = "nanomsg" TypeNATS = "nats" TypeNATSJetStream = "nats_jetstream" TypeNATSStream = "nats_stream" TypeNSQ = "nsq" TypePulsar = "pulsar" TypeRedisHash = "redis_hash" TypeRedisList = "redis_list" TypeRedisPubSub = "redis_pubsub" TypeRedisStreams = "redis_streams" TypeReject = "reject" TypeResource = "resource" TypeRetry = "retry" TypeSFTP = "sftp" TypeSTDOUT = "stdout" TypeSubprocess = "subprocess" TypeSwitch = "switch" TypeSyncResponse = "sync_response" TypeSocket = "socket" TypeWebsocket = "websocket" )
String constants representing each output type. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl
const ( // GCPCloudStorageErrorIfExistsCollisionMode - error-if-exists GCPCloudStorageErrorIfExistsCollisionMode = "error-if-exists" // GCPCloudStorageAppendCollisionMode - append GCPCloudStorageAppendCollisionMode = "append" // GCPCloudStorageIgnoreCollisionMode - ignore GCPCloudStorageIgnoreCollisionMode = "ignore" // GCPCloudStorageOverwriteCollisionMode - overwrite GCPCloudStorageOverwriteCollisionMode = "overwrite" )
Variables ¶
var Constructors = map[string]TypeSpec{}
Constructors is a map of all output types with their specs.
Functions ¶
func AppendProcessorsFromConfig ¶
func AppendProcessorsFromConfig(conf Config, mgr interop.Manager, pipelines ...iprocessor.PipelineConstructorFunc) []iprocessor.PipelineConstructorFunc
AppendProcessorsFromConfig takes a variadic argument of pipeline constructor functions and returns a new slice of them where the processors of the provided output configuration will also be initialized.
func New ¶
func New( conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type, pipelines ...iprocessor.PipelineConstructorFunc, ) (output.Streamed, error)
New creates an output type based on an output configuration.
func NewAMQP09 ¶
func NewAMQP09(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewAMQP09 creates a new AMQP 0.9 output type.
func NewAMQP1 ¶
func NewAMQP1(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewAMQP1 creates a new AMQP 1 output type.
func NewAsyncWriter ¶
func NewAsyncWriter( typeStr string, maxInflight int, w AsyncSink, log log.Modular, stats metrics.Type, ) (output.Streamed, error)
NewAsyncWriter creates a new AsyncWriter output type. Deprecated
func NewAzureBlobStorage ¶
func NewAzureBlobStorage(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewAzureBlobStorage creates a new AzureBlobStorage output type.
func NewAzureQueueStorage ¶
func NewAzureQueueStorage(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewAzureQueueStorage creates a new AzureQueueStorage output type.
func NewAzureTableStorage ¶
func NewAzureTableStorage(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewAzureTableStorage creates a new AzureTableStorage output type.
func NewBatcher ¶
func NewBatcher( batcher *policy.Batcher, child output.Streamed, log log.Modular, stats metrics.Type, ) output.Streamed
NewBatcher creates a new output preceded by a batching mechanism that enforces a given batching policy.
func NewBatcherFromConfig ¶
func NewBatcherFromConfig( conf policy.Config, child output.Streamed, mgr interop.Manager, log log.Modular, stats metrics.Type, ) (output.Streamed, error)
NewBatcherFromConfig creates a new output preceded by a batching mechanism that enforces a given batching policy configuration.
func NewCache ¶
func NewCache(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewCache creates a new Cache output type.
func NewDrop ¶
func NewDrop(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewDrop creates a new Drop output type.
func NewElasticsearch ¶
func NewElasticsearch(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewElasticsearch creates a new Elasticsearch output type.
func NewFile ¶
func NewFile(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewFile creates a new File output type.
func NewGCPPubSub ¶
func NewGCPPubSub(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewGCPPubSub creates a new GCPPubSub output type.
func NewHDFS ¶
func NewHDFS(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewHDFS creates a new HDFS output type.
func NewHTTPClient ¶
func NewHTTPClient(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewHTTPClient creates a new HTTPClient output type.
func NewHTTPServer ¶
func NewHTTPServer(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewHTTPServer creates a new HTTPServer output type.
func NewInproc ¶
func NewInproc(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewInproc creates a new Inproc output type.
func NewKafka ¶
func NewKafka(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewKafka creates a new Kafka output type.
func NewMQTT ¶
func NewMQTT(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewMQTT creates a new MQTT output type.
func NewNATS ¶
func NewNATS(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewNATS creates a new NATS output type.
func NewNATSStream ¶
func NewNATSStream(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewNATSStream creates a new NATSStream output type.
func NewNSQ ¶
func NewNSQ(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewNSQ creates a new NSQ output type.
func NewNanomsg ¶
func NewNanomsg(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewNanomsg creates a new Nanomsg output type.
func NewRedisHash ¶
func NewRedisHash(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewRedisHash creates a new RedisHash output type.
func NewRedisList ¶
func NewRedisList(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewRedisList creates a new RedisList output type.
func NewRedisPubSub ¶
func NewRedisPubSub(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewRedisPubSub creates a new RedisPubSub output type.
func NewRedisStreams ¶
func NewRedisStreams(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewRedisStreams creates a new RedisStreams output type.
func NewResource ¶
func NewResource( conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type, ) (output.Streamed, error)
NewResource returns a resource output.
func NewSTDOUT ¶
func NewSTDOUT(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewSTDOUT creates a new STDOUT output type.
func NewSocket ¶
func NewSocket(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewSocket creates a new Socket output type.
func NewWebsocket ¶
func NewWebsocket(conf Config, mgr interop.Manager, log log.Modular, stats metrics.Type) (output.Streamed, error)
NewWebsocket creates a new Websocket output type.
func OnlySinglePayloads ¶
OnlySinglePayloads expands message batches into individual payloads, respecting the max in flight of the wrapped output. This is a more efficient way of feeding messages into an output that handles its own batching mechanism internally, or does not support batching at all.
func WalkConstructors ¶
func WalkConstructors(fn func(ConstructorFunc, docs.ComponentSpec))
WalkConstructors iterates each component constructor.
func WrapWithPipelines ¶
func WrapWithPipelines(out output.Streamed, pipeConstructors ...iprocessor.PipelineConstructorFunc) (output.Streamed, error)
WrapWithPipelines wraps an output with a variadic number of pipelines.
Types ¶
type AsyncSink ¶
type AsyncSink interface { // ConnectWithContext attempts to establish a connection to the sink, if // unsuccessful returns an error. If the attempt is successful (or not // necessary) returns nil. ConnectWithContext(ctx context.Context) error // WriteWithContext should block until either the message is sent (and // acknowledged) to a sink, or a transport specific error has occurred, or // the Type is closed. WriteWithContext(ctx context.Context, msg *message.Batch) error // CloseAsync triggers the shut down of this component but should not block // the calling goroutine. CloseAsync() // WaitForClose is a blocking call to wait until the component has finished // shutting down and cleaning up resources. WaitForClose(timeout time.Duration) error }
AsyncSink is a type that writes Benthos messages to a third party sink. If the protocol supports a form of acknowledgement then it will be returned by the call to WriteWithContext.
type AsyncWriter ¶
type AsyncWriter struct {
// contains filtered or unexported fields
}
AsyncWriter is an output type that writes messages to an AsyncSink.
func (*AsyncWriter) CloseAsync ¶
func (w *AsyncWriter) CloseAsync()
CloseAsync shuts down the AsyncWriter output and stops processing messages.
func (*AsyncWriter) Connected ¶
func (w *AsyncWriter) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*AsyncWriter) Consume ¶
func (w *AsyncWriter) Consume(ts <-chan message.Transaction) error
Consume assigns a messages channel for the output to read.
func (*AsyncWriter) SetInjectTracingMap ¶
func (w *AsyncWriter) SetInjectTracingMap(mapping string) error
SetInjectTracingMap sets a mapping to be used for injecting tracing events into messages.
func (*AsyncWriter) SetNoCancel ¶
func (w *AsyncWriter) SetNoCancel()
SetNoCancel configures the async writer so that write calls do not use a context that gets cancelled on shutdown. This is much more efficient as it reduces allocations, goroutines and defers for each write call, but also means the write can block graceful termination. Therefore this setting should be reserved for outputs that are exceptionally fast.
func (*AsyncWriter) WaitForClose ¶
func (w *AsyncWriter) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the AsyncWriter output has closed down.
type Batcher ¶
type Batcher struct {
// contains filtered or unexported fields
}
Batcher wraps an output with a batching policy.
func (*Batcher) CloseAsync ¶
func (m *Batcher) CloseAsync()
CloseAsync shuts down the Batcher and stops processing messages.
func (*Batcher) Connected ¶
Connected returns a boolean indicating whether this output is currently connected to its target.
type BrokerConfig ¶
type BrokerConfig struct { Copies int `json:"copies" yaml:"copies"` Pattern string `json:"pattern" yaml:"pattern"` Outputs []Config `json:"outputs" yaml:"outputs"` Batching policy.Config `json:"batching" yaml:"batching"` }
BrokerConfig contains configuration fields for the Broker output type.
func NewBrokerConfig ¶
func NewBrokerConfig() BrokerConfig
NewBrokerConfig creates a new BrokerConfig with default values.
type CassandraConfig ¶
type CassandraConfig struct { Addresses []string `json:"addresses" yaml:"addresses"` TLS btls.Config `json:"tls" yaml:"tls"` PasswordAuthenticator PasswordAuthenticator `json:"password_authenticator" yaml:"password_authenticator"` DisableInitialHostLookup bool `json:"disable_initial_host_lookup" yaml:"disable_initial_host_lookup"` Query string `json:"query" yaml:"query"` ArgsMapping string `json:"args_mapping" yaml:"args_mapping"` Consistency string `json:"consistency" yaml:"consistency"` Timeout string `json:"timeout" yaml:"timeout"` // TODO: V4 Remove this and replace with explicit values. retries.Config `json:",inline" yaml:",inline"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Batching policy.Config `json:"batching" yaml:"batching"` }
CassandraConfig contains configuration fields for the Cassandra output type.
func NewCassandraConfig ¶
func NewCassandraConfig() CassandraConfig
NewCassandraConfig creates a new CassandraConfig with default values.
type Config ¶
type Config struct { Label string `json:"label" yaml:"label"` Type string `json:"type" yaml:"type"` AMQP09 writer.AMQPConfig `json:"amqp_0_9" yaml:"amqp_0_9"` AMQP1 writer.AMQP1Config `json:"amqp_1" yaml:"amqp_1"` AWSDynamoDB writer.DynamoDBConfig `json:"aws_dynamodb" yaml:"aws_dynamodb"` AWSKinesis writer.KinesisConfig `json:"aws_kinesis" yaml:"aws_kinesis"` AWSKinesisFirehose writer.KinesisFirehoseConfig `json:"aws_kinesis_firehose" yaml:"aws_kinesis_firehose"` AWSS3 writer.AmazonS3Config `json:"aws_s3" yaml:"aws_s3"` AWSSNS writer.SNSConfig `json:"aws_sns" yaml:"aws_sns"` AWSSQS writer.AmazonSQSConfig `json:"aws_sqs" yaml:"aws_sqs"` AzureBlobStorage writer.AzureBlobStorageConfig `json:"azure_blob_storage" yaml:"azure_blob_storage"` AzureQueueStorage writer.AzureQueueStorageConfig `json:"azure_queue_storage" yaml:"azure_queue_storage"` AzureTableStorage writer.AzureTableStorageConfig `json:"azure_table_storage" yaml:"azure_table_storage"` Broker BrokerConfig `json:"broker" yaml:"broker"` Cache writer.CacheConfig `json:"cache" yaml:"cache"` Cassandra CassandraConfig `json:"cassandra" yaml:"cassandra"` Drop writer.DropConfig `json:"drop" yaml:"drop"` DropOn DropOnConfig `json:"drop_on" yaml:"drop_on"` Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"` Elasticsearch writer.ElasticsearchConfig `json:"elasticsearch" yaml:"elasticsearch"` Fallback TryConfig `json:"fallback" yaml:"fallback"` File FileConfig `json:"file" yaml:"file"` GCPCloudStorage GCPCloudStorageConfig `json:"gcp_cloud_storage" yaml:"gcp_cloud_storage"` GCPPubSub writer.GCPPubSubConfig `json:"gcp_pubsub" yaml:"gcp_pubsub"` HDFS writer.HDFSConfig `json:"hdfs" yaml:"hdfs"` HTTPClient writer.HTTPClientConfig `json:"http_client" yaml:"http_client"` HTTPServer HTTPServerConfig `json:"http_server" yaml:"http_server"` Inproc InprocConfig `json:"inproc" yaml:"inproc"` Kafka writer.KafkaConfig `json:"kafka" yaml:"kafka"` MongoDB MongoDBConfig `json:"mongodb" yaml:"mongodb"` MQTT writer.MQTTConfig 
`json:"mqtt" yaml:"mqtt"` Nanomsg writer.NanomsgConfig `json:"nanomsg" yaml:"nanomsg"` NATS writer.NATSConfig `json:"nats" yaml:"nats"` NATSStream writer.NATSStreamConfig `json:"nats_stream" yaml:"nats_stream"` NSQ writer.NSQConfig `json:"nsq" yaml:"nsq"` Plugin interface{} `json:"plugin,omitempty" yaml:"plugin,omitempty"` Pulsar PulsarConfig `json:"pulsar" yaml:"pulsar"` RedisHash writer.RedisHashConfig `json:"redis_hash" yaml:"redis_hash"` RedisList writer.RedisListConfig `json:"redis_list" yaml:"redis_list"` RedisPubSub writer.RedisPubSubConfig `json:"redis_pubsub" yaml:"redis_pubsub"` RedisStreams writer.RedisStreamsConfig `json:"redis_streams" yaml:"redis_streams"` Reject RejectConfig `json:"reject" yaml:"reject"` Resource string `json:"resource" yaml:"resource"` Retry RetryConfig `json:"retry" yaml:"retry"` SFTP SFTPConfig `json:"sftp" yaml:"sftp"` STDOUT STDOUTConfig `json:"stdout" yaml:"stdout"` Subprocess SubprocessConfig `json:"subprocess" yaml:"subprocess"` Switch SwitchConfig `json:"switch" yaml:"switch"` SyncResponse struct{} `json:"sync_response" yaml:"sync_response"` Socket writer.SocketConfig `json:"socket" yaml:"socket"` Websocket writer.WebsocketConfig `json:"websocket" yaml:"websocket"` Processors []processor.Config `json:"processors" yaml:"processors"` }
Config is the all encompassing configuration struct for all output types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl
func NewConfig ¶
func NewConfig() Config
NewConfig returns a configuration struct fully populated with default values. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl
func (*Config) UnmarshalYAML ¶
UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.
type ConstructorFunc ¶
type ConstructorFunc func(Config, interop.Manager, log.Modular, metrics.Type, ...iprocessor.PipelineConstructorFunc) (output.Streamed, error)
ConstructorFunc is a func signature able to construct an output.
type DropOnConditions ¶
type DropOnConditions struct { Error bool `json:"error" yaml:"error"` BackPressure string `json:"back_pressure" yaml:"back_pressure"` }
DropOnConditions is a config struct representing the different circumstances under which messages should be dropped.
type DropOnConfig ¶
type DropOnConfig struct { DropOnConditions `json:",inline" yaml:",inline"` Output *Config `json:"output" yaml:"output"` }
DropOnConfig contains configuration values for the DropOn output type.
func NewDropOnConfig ¶
func NewDropOnConfig() DropOnConfig
NewDropOnConfig creates a new DropOnConfig with default values.
func (DropOnConfig) MarshalJSON ¶
func (d DropOnConfig) MarshalJSON() ([]byte, error)
MarshalJSON prints an empty object instead of nil.
func (DropOnConfig) MarshalYAML ¶
func (d DropOnConfig) MarshalYAML() (interface{}, error)
MarshalYAML prints an empty object instead of nil.
type DynamicConfig ¶
type DynamicConfig struct { Outputs map[string]Config `json:"outputs" yaml:"outputs"` Prefix string `json:"prefix" yaml:"prefix"` }
DynamicConfig contains configuration fields for the Dynamic output type.
func NewDynamicConfig ¶
func NewDynamicConfig() DynamicConfig
NewDynamicConfig creates a new DynamicConfig with default values.
type FileConfig ¶
type FileConfig struct { Path string `json:"path" yaml:"path"` Codec string `json:"codec" yaml:"codec"` }
FileConfig contains configuration fields for the file based output type.
func NewFileConfig ¶
func NewFileConfig() FileConfig
NewFileConfig creates a new FileConfig with default values.
type GCPCloudStorageConfig ¶
type GCPCloudStorageConfig struct { Bucket string `json:"bucket" yaml:"bucket"` Path string `json:"path" yaml:"path"` ContentType string `json:"content_type" yaml:"content_type"` ContentEncoding string `json:"content_encoding" yaml:"content_encoding"` ChunkSize int `json:"chunk_size" yaml:"chunk_size"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Batching policy.Config `json:"batching" yaml:"batching"` CollisionMode string `json:"collision_mode" yaml:"collision_mode"` }
GCPCloudStorageConfig contains configuration fields for the GCP Cloud Storage output type.
func NewGCPCloudStorageConfig ¶
func NewGCPCloudStorageConfig() GCPCloudStorageConfig
NewGCPCloudStorageConfig creates a new GCPCloudStorageConfig with default values.
type HTTPServer ¶
type HTTPServer struct {
// contains filtered or unexported fields
}
HTTPServer is an output type that serves HTTP GET requests.
func (*HTTPServer) CloseAsync ¶
func (h *HTTPServer) CloseAsync()
CloseAsync shuts down the HTTPServer output and stops processing requests.
func (*HTTPServer) Connected ¶
func (h *HTTPServer) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*HTTPServer) Consume ¶
func (h *HTTPServer) Consume(ts <-chan message.Transaction) error
Consume assigns a messages channel for the output to read.
func (*HTTPServer) WaitForClose ¶
func (h *HTTPServer) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the HTTPServer output has closed down.
type HTTPServerConfig ¶
type HTTPServerConfig struct { Address string `json:"address" yaml:"address"` Path string `json:"path" yaml:"path"` StreamPath string `json:"stream_path" yaml:"stream_path"` WSPath string `json:"ws_path" yaml:"ws_path"` AllowedVerbs []string `json:"allowed_verbs" yaml:"allowed_verbs"` Timeout string `json:"timeout" yaml:"timeout"` CertFile string `json:"cert_file" yaml:"cert_file"` KeyFile string `json:"key_file" yaml:"key_file"` CORS httpdocs.ServerCORS `json:"cors" yaml:"cors"` }
HTTPServerConfig contains configuration fields for the HTTPServer output type.
func NewHTTPServerConfig ¶
func NewHTTPServerConfig() HTTPServerConfig
NewHTTPServerConfig creates a new HTTPServerConfig with default values.
type Inproc ¶
type Inproc struct {
// contains filtered or unexported fields
}
Inproc is an output type that serves Inproc messages.
func (*Inproc) CloseAsync ¶
func (i *Inproc) CloseAsync()
CloseAsync shuts down the Inproc output and stops processing messages.
func (*Inproc) Connected ¶
Connected returns a boolean indicating whether this output is currently connected to its target.
type InprocConfig ¶
type InprocConfig string
InprocConfig contains configuration fields for the Inproc output type.
func NewInprocConfig ¶
func NewInprocConfig() InprocConfig
NewInprocConfig creates a new InprocConfig with default values.
type MongoDBConfig ¶
type MongoDBConfig struct { MongoConfig client.Config `json:",inline" yaml:",inline"` Operation string `json:"operation" yaml:"operation"` WriteConcern client.WriteConcern `json:"write_concern" yaml:"write_concern"` FilterMap string `json:"filter_map" yaml:"filter_map"` DocumentMap string `json:"document_map" yaml:"document_map"` HintMap string `json:"hint_map" yaml:"hint_map"` // DeleteEmptyValue bool `json:"delete_empty_value" yaml:"delete_empty_value"` Upsert bool `json:"upsert" yaml:"upsert"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` RetryConfig retries.Config `json:",inline" yaml:",inline"` Batching policy.Config `json:"batching" yaml:"batching"` }
MongoDBConfig contains config fields for the MongoDB output type.
func NewMongoDBConfig ¶
func NewMongoDBConfig() MongoDBConfig
NewMongoDBConfig creates a MongoDBConfig populated with default values.
type PasswordAuthenticator ¶
type PasswordAuthenticator struct { Enabled bool `json:"enabled" yaml:"enabled"` Username string `json:"username" yaml:"username"` Password string `json:"password" yaml:"password"` }
PasswordAuthenticator contains the fields that will be used to authenticate with the Cassandra cluster.
type PulsarConfig ¶
type PulsarConfig struct { URL string `json:"url" yaml:"url"` Topic string `json:"topic" yaml:"topic"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Key string `json:"key" yaml:"key"` OrderingKey string `json:"ordering_key" yaml:"ordering_key"` Auth auth.Config `json:"auth" yaml:"auth"` }
PulsarConfig contains configuration for the Pulsar output type.
func NewPulsarConfig ¶
func NewPulsarConfig() PulsarConfig
NewPulsarConfig creates a new PulsarConfig with default values.
type RejectConfig ¶
type RejectConfig string
RejectConfig contains configuration fields for the Reject output type.
func NewRejectConfig ¶
func NewRejectConfig() RejectConfig
NewRejectConfig creates a new RejectConfig with default values.
type Resource ¶
type Resource struct {
// contains filtered or unexported fields
}
Resource is an output type that wraps an output resource.
func (*Resource) CloseAsync ¶
func (r *Resource) CloseAsync()
CloseAsync shuts down the output and stops processing requests.
func (*Resource) Connected ¶
Connected returns a boolean indicating whether this output is currently connected to its target.
type RetryConfig ¶
type RetryConfig struct { Output *Config `json:"output" yaml:"output"` retries.Config `json:",inline" yaml:",inline"` }
RetryConfig contains configuration values for the Retry output type.
func NewRetryConfig ¶
func NewRetryConfig() RetryConfig
NewRetryConfig creates a new RetryConfig with default values.
func (RetryConfig) MarshalJSON ¶
func (r RetryConfig) MarshalJSON() ([]byte, error)
MarshalJSON prints an empty object instead of nil.
func (RetryConfig) MarshalYAML ¶
func (r RetryConfig) MarshalYAML() (interface{}, error)
MarshalYAML prints an empty object instead of nil.
type SFTPConfig ¶
type SFTPConfig struct { Address string `json:"address" yaml:"address"` Path string `json:"path" yaml:"path"` Codec string `json:"codec" yaml:"codec"` Credentials sftpSetup.Credentials `json:"credentials" yaml:"credentials"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
SFTPConfig contains configuration fields for the SFTP output type.
func NewSFTPConfig ¶
func NewSFTPConfig() SFTPConfig
NewSFTPConfig creates a new SFTPConfig with default values.
type STDOUTConfig ¶
type STDOUTConfig struct {
Codec string `json:"codec" yaml:"codec"`
}
STDOUTConfig contains configuration fields for the stdout based output type.
func NewSTDOUTConfig ¶
func NewSTDOUTConfig() STDOUTConfig
NewSTDOUTConfig creates a new STDOUTConfig with default values.
type Subprocess ¶
type Subprocess struct {
// contains filtered or unexported fields
}
Subprocess is an AsyncSink implementation that writes messages to a subprocess.
func (*Subprocess) CloseAsync ¶
func (s *Subprocess) CloseAsync()
CloseAsync shuts down the Subprocess output.
func (*Subprocess) ConnectWithContext ¶
func (s *Subprocess) ConnectWithContext(ctx context.Context) error
ConnectWithContext establishes a Subprocess writer.
func (*Subprocess) WaitForClose ¶
func (s *Subprocess) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the Subprocess output has closed down.
func (*Subprocess) WriteWithContext ¶
WriteWithContext attempts to write message contents to the subprocess.
type SubprocessConfig ¶
type SubprocessConfig struct { Name string `json:"name" yaml:"name"` Args []string `json:"args" yaml:"args"` Codec string `json:"codec" yaml:"codec"` }
SubprocessConfig contains configuration for the Subprocess output type.
func NewSubprocessConfig ¶
func NewSubprocessConfig() SubprocessConfig
NewSubprocessConfig creates a new SubprocessConfig with default values.
type SwitchConfig ¶
type SwitchConfig struct { RetryUntilSuccess bool `json:"retry_until_success" yaml:"retry_until_success"` StrictMode bool `json:"strict_mode" yaml:"strict_mode"` Cases []SwitchConfigCase `json:"cases" yaml:"cases"` }
SwitchConfig contains configuration fields for the Switch output type.
func NewSwitchConfig ¶
func NewSwitchConfig() SwitchConfig
NewSwitchConfig creates a new SwitchConfig with default values.
type SwitchConfigCase ¶
type SwitchConfigCase struct { Check string `json:"check" yaml:"check"` Continue bool `json:"continue" yaml:"continue"` Output Config `json:"output" yaml:"output"` }
SwitchConfigCase contains configuration fields per output of a switch type.
func NewSwitchConfigCase ¶
func NewSwitchConfigCase() SwitchConfigCase
NewSwitchConfigCase creates a new switch output config with default values.
type SyncResponseWriter ¶
type SyncResponseWriter struct{}
SyncResponseWriter is a writer implementation that adds messages to a ResultStore located in the context of the first message part of each batch. This is essentially a mechanism that returns the result of a pipeline directly back to the origin of the message.
func (SyncResponseWriter) CloseAsync ¶
func (s SyncResponseWriter) CloseAsync()
CloseAsync is a noop.
func (SyncResponseWriter) ConnectWithContext ¶
func (s SyncResponseWriter) ConnectWithContext(ctx context.Context) error
ConnectWithContext is a noop.
func (SyncResponseWriter) WaitForClose ¶
func (s SyncResponseWriter) WaitForClose(time.Duration) error
WaitForClose is a noop.
func (SyncResponseWriter) WriteWithContext ¶
WriteWithContext writes a message batch to a ResultStore located in the first message of the batch.
type TryConfig ¶
type TryConfig []Config
TryConfig contains configuration fields for the Try output type.
func NewTryConfig ¶
func NewTryConfig() TryConfig
NewTryConfig creates a new TryConfig with default values.
type TypeSpec ¶
type TypeSpec struct {
// Async indicates whether this output benefits from sending multiple
// messages asynchronously over the protocol.
Async bool
// Batches indicates whether this output benefits from batching of messages.
Batches bool
Status docs.Status
Summary string
Description string
Categories []Category
Footnotes string
FieldSpecs docs.FieldSpecs
Examples []docs.AnnotatedExample
Version string
// contains filtered or unexported fields
}
TypeSpec is a constructor and a usage description for each output type.
type WithPipeline ¶
type WithPipeline struct {
// contains filtered or unexported fields
}
WithPipeline is a type that wraps both an output type and a pipeline type by routing the pipeline through the output, and implements the output.Type interface in order to act like an ordinary output.
func WrapWithPipeline ¶
func WrapWithPipeline(out output.Streamed, pipeConstructor iprocessor.PipelineConstructorFunc) (*WithPipeline, error)
WrapWithPipeline routes a processing pipeline directly into an output and returns a type that manages both and acts like an ordinary output.
func (*WithPipeline) CloseAsync ¶
func (i *WithPipeline) CloseAsync()
CloseAsync triggers a closure of this object but does not block.
func (*WithPipeline) Connected ¶
func (i *WithPipeline) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*WithPipeline) Consume ¶
func (i *WithPipeline) Consume(tsChan <-chan message.Transaction) error
Consume starts the type listening to a message channel from a producer.
func (*WithPipeline) WaitForClose ¶
func (i *WithPipeline) WaitForClose(timeout time.Duration) error
WaitForClose is a blocking call to wait until the object has finished closing down and cleaning up resources.
Source Files ¶
- amqp_0_9.go
- amqp_1.go
- async_writer.go
- aws_dynamodb.go
- aws_kinesis.go
- aws_kinesis_firehose.go
- aws_s3.go
- aws_sns.go
- aws_sqs.go
- azure_blob_storage.go
- azure_queue_storage.go
- azure_table_storage.go
- batcher.go
- cache.go
- cassandra.go
- config_broker.go
- config_dynamic.go
- config_fallback.go
- config_retry.go
- config_switch.go
- constructor.go
- drop.go
- drop_on.go
- elasticsearch.go
- file.go
- gcp_cloud_storage.go
- gcp_pubsub.go
- hdfs.go
- http_client.go
- http_server.go
- inproc.go
- kafka.go
- mongodb.go
- mqtt.go
- nanomsg.go
- nats.go
- nats_stream.go
- not_batched.go
- nsq.go
- package.go
- pulsar.go
- redis_hash.go
- redis_list.go
- redis_pubsub.go
- redis_streams.go
- reject.go
- resource.go
- sftp.go
- socket.go
- stdout.go
- subprocess.go
- sync_response.go
- websocket.go
- wrap_with_pipeline.go