Documentation ¶
Index ¶
- Constants
- Variables
- func Description(async, batches bool, content string) string
- func IterateBatchedSend(msg message.Batch, fn func(int, *message.Part) error) error
- type AMQP1Config
- type AMQPConfig
- type AMQPExchangeDeclareConfig
- type AmazonS3Config
- type AmazonSQSConfig
- type AsyncSink
- type AsyncWriter
- type AzureBlobStorageConfig
- type AzureQueueStorageConfig
- type AzureTableStorageConfig
- type BrokerConfig
- type CacheConfig
- type CassandraConfig
- type Config
- type DropConfig
- type DropOnConditions
- type DropOnConfig
- type DynamicConfig
- type DynamoDBConfig
- type ElasticsearchAuthConfig
- type ElasticsearchConfig
- type FileConfig
- type GCPCloudStorageConfig
- type GCPPubSubConfig
- type HDFSConfig
- type HTTPServerConfig
- type KafkaConfig
- type KinesisConfig
- type KinesisFirehoseConfig
- type MQTTConfig
- type MongoDBConfig
- type NATSConfig
- type NATSStreamConfig
- type NSQConfig
- type NanomsgConfig
- type OptionalAWSConfig
- type PasswordAuthenticator
- type RedisHashConfig
- type RedisListConfig
- type RedisPubSubConfig
- type RedisStreamsConfig
- type RetryConfig
- type SFTPConfig
- type SNSConfig
- type STDOUTConfig
- type SessionConfig
- type SocketConfig
- type Streamed
- type SubprocessConfig
- type SwitchConfig
- type SwitchConfigCase
- type Sync
- type TryConfig
- type WithPipeline
Constants ¶
const ( // GCPCloudStorageErrorIfExistsCollisionMode - error-if-exists. GCPCloudStorageErrorIfExistsCollisionMode = "error-if-exists" // GCPCloudStorageAppendCollisionMode - append. GCPCloudStorageAppendCollisionMode = "append" // GCPCloudStorageIgnoreCollisionMode - ignore. GCPCloudStorageIgnoreCollisionMode = "ignore" // GCPCloudStorageOverwriteCollisionMode - overwrite. GCPCloudStorageOverwriteCollisionMode = "overwrite" )
Variables ¶
var InjectTracingSpanMappingDocs = docs.FieldBloblang(
"inject_tracing_map",
"EXPERIMENTAL: A [Bloblang mapping](/docs/guides/bloblang/about) used to inject an object containing tracing propagation information into outbound messages. The specification of the injected fields will match the format used by the service wide tracer.",
`meta = meta().merge(this)`,
`root.meta.span = this`,
).AtVersion("3.45.0").Advanced()
InjectTracingSpanMappingDocs returns a field spec describing an inject tracing span mapping.
Functions ¶
func Description ¶
Description appends standard feature descriptions to an output description based on various features of the output.
func IterateBatchedSend ¶
IterateBatchedSend executes a closure fn on each message of a batch, where the closure is expected to attempt a send and return an error. If an error is returned then it is added to a batch error in order to support index specific error handling.
However, if a fatal error is returned such as a connection loss or shut down then it is returned immediately.
Types ¶
type AMQP1Config ¶
type AMQP1Config struct { URL string `json:"url" yaml:"url"` TargetAddress string `json:"target_address" yaml:"target_address"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` TLS btls.Config `json:"tls" yaml:"tls"` SASL shared.SASLConfig `json:"sasl" yaml:"sasl"` Metadata metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"` ApplicationPropertiesMapping string `json:"application_properties_map" yaml:"application_properties_map"` }
AMQP1Config contains configuration fields for the AMQP1 output type.
func NewAMQP1Config ¶
func NewAMQP1Config() AMQP1Config
NewAMQP1Config creates a new AMQP1Config with default values.
type AMQPConfig ¶
type AMQPConfig struct { URLs []string `json:"urls" yaml:"urls"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Exchange string `json:"exchange" yaml:"exchange"` ExchangeDeclare AMQPExchangeDeclareConfig `json:"exchange_declare" yaml:"exchange_declare"` BindingKey string `json:"key" yaml:"key"` Type string `json:"type" yaml:"type"` ContentType string `json:"content_type" yaml:"content_type"` ContentEncoding string `json:"content_encoding" yaml:"content_encoding"` Metadata metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"` Priority string `json:"priority" yaml:"priority"` Persistent bool `json:"persistent" yaml:"persistent"` Mandatory bool `json:"mandatory" yaml:"mandatory"` Immediate bool `json:"immediate" yaml:"immediate"` TLS btls.Config `json:"tls" yaml:"tls"` Timeout string `json:"timeout" yaml:"timeout"` }
AMQPConfig contains configuration fields for the AMQP output type.
func NewAMQPConfig ¶
func NewAMQPConfig() AMQPConfig
NewAMQPConfig creates a new AMQPConfig with default values.
type AMQPExchangeDeclareConfig ¶
type AMQPExchangeDeclareConfig struct { Enabled bool `json:"enabled" yaml:"enabled"` Type string `json:"type" yaml:"type"` Durable bool `json:"durable" yaml:"durable"` }
AMQPExchangeDeclareConfig contains fields indicating whether the target AMQP exchange needs to be declared, as well as any fields specifying how to accomplish that.
type AmazonS3Config ¶
type AmazonS3Config struct { sess.Config `json:",inline" yaml:",inline"` Bucket string `json:"bucket" yaml:"bucket"` ForcePathStyleURLs bool `json:"force_path_style_urls" yaml:"force_path_style_urls"` Path string `json:"path" yaml:"path"` Tags map[string]string `json:"tags" yaml:"tags"` ContentType string `json:"content_type" yaml:"content_type"` ContentEncoding string `json:"content_encoding" yaml:"content_encoding"` CacheControl string `json:"cache_control" yaml:"cache_control"` ContentDisposition string `json:"content_disposition" yaml:"content_disposition"` ContentLanguage string `json:"content_language" yaml:"content_language"` WebsiteRedirectLocation string `json:"website_redirect_location" yaml:"website_redirect_location"` Metadata metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"` StorageClass string `json:"storage_class" yaml:"storage_class"` Timeout string `json:"timeout" yaml:"timeout"` KMSKeyID string `json:"kms_key_id" yaml:"kms_key_id"` ServerSideEncryption string `json:"server_side_encryption" yaml:"server_side_encryption"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
AmazonS3Config contains configuration fields for the AmazonS3 output type.
func NewAmazonS3Config ¶
func NewAmazonS3Config() AmazonS3Config
NewAmazonS3Config creates a new Config with default values.
type AmazonSQSConfig ¶
type AmazonSQSConfig struct { SessionConfig `json:",inline" yaml:",inline"` URL string `json:"url" yaml:"url"` MessageGroupID string `json:"message_group_id" yaml:"message_group_id"` MessageDeduplicationID string `json:"message_deduplication_id" yaml:"message_deduplication_id"` Metadata metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` retries.Config `json:",inline" yaml:",inline"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
AmazonSQSConfig contains configuration fields for the output AmazonSQS type.
func NewAmazonSQSConfig ¶
func NewAmazonSQSConfig() AmazonSQSConfig
NewAmazonSQSConfig creates a new Config with default values.
type AsyncSink ¶
type AsyncSink interface { // Connect attempts to establish a connection to the sink, if // unsuccessful returns an error. If the attempt is successful (or not // necessary) returns nil. Connect(ctx context.Context) error // WriteBatch should block until either the message is sent (and // acknowledged) to a sink, or a transport specific error has occurred, or // the Type is closed. WriteBatch(ctx context.Context, msg message.Batch) error // Close is a blocking call to wait until the component has finished // shutting down and cleaning up resources. Close(ctx context.Context) error }
AsyncSink is a type that writes Benthos messages to a third party sink. If the protocol supports a form of acknowledgement then it will be returned by the call to Write.
type AsyncWriter ¶
type AsyncWriter struct {
// contains filtered or unexported fields
}
AsyncWriter is an output type that writes messages to a writer.Type.
func (*AsyncWriter) Connected ¶
func (w *AsyncWriter) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*AsyncWriter) Consume ¶
func (w *AsyncWriter) Consume(ts <-chan message.Transaction) error
Consume assigns a messages channel for the output to read.
func (*AsyncWriter) SetInjectTracingMap ¶
func (w *AsyncWriter) SetInjectTracingMap(exec *mapping.Executor)
SetInjectTracingMap sets a mapping to be used for injecting tracing events into messages.
func (*AsyncWriter) TriggerCloseNow ¶
func (w *AsyncWriter) TriggerCloseNow()
TriggerCloseNow shuts down the output and stops processing messages.
func (*AsyncWriter) WaitForClose ¶
func (w *AsyncWriter) WaitForClose(ctx context.Context) error
WaitForClose blocks until the File output has closed down.
type AzureBlobStorageConfig ¶
type AzureBlobStorageConfig struct { StorageAccount string `json:"storage_account" yaml:"storage_account"` StorageAccessKey string `json:"storage_access_key" yaml:"storage_access_key"` StorageSASToken string `json:"storage_sas_token" yaml:"storage_sas_token"` StorageConnectionString string `json:"storage_connection_string" yaml:"storage_connection_string"` Container string `json:"container" yaml:"container"` Path string `json:"path" yaml:"path"` BlobType string `json:"blob_type" yaml:"blob_type"` PublicAccessLevel string `json:"public_access_level" yaml:"public_access_level"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
AzureBlobStorageConfig contains configuration fields for the AzureBlobStorage output type.
func NewAzureBlobStorageConfig ¶
func NewAzureBlobStorageConfig() AzureBlobStorageConfig
NewAzureBlobStorageConfig creates a new Config with default values.
type AzureQueueStorageConfig ¶
type AzureQueueStorageConfig struct { StorageAccount string `json:"storage_account" yaml:"storage_account"` StorageAccessKey string `json:"storage_access_key" yaml:"storage_access_key"` StorageConnectionString string `json:"storage_connection_string" yaml:"storage_connection_string"` QueueName string `json:"queue_name" yaml:"queue_name"` TTL string `json:"ttl" yaml:"ttl"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
AzureQueueStorageConfig contains configuration fields for the output Azure Queue Storage type.
func NewAzureQueueStorageConfig ¶
func NewAzureQueueStorageConfig() AzureQueueStorageConfig
NewAzureQueueStorageConfig creates a new Config with default values.
type AzureTableStorageConfig ¶
type AzureTableStorageConfig struct { StorageAccount string `json:"storage_account" yaml:"storage_account"` StorageAccessKey string `json:"storage_access_key" yaml:"storage_access_key"` StorageConnectionString string `json:"storage_connection_string" yaml:"storage_connection_string"` TableName string `json:"table_name" yaml:"table_name"` PartitionKey string `json:"partition_key" yaml:"partition_key"` RowKey string `json:"row_key" yaml:"row_key"` Properties map[string]string `json:"properties" yaml:"properties"` InsertType string `json:"insert_type" yaml:"insert_type"` TransactionType string `json:"transaction_type" yaml:"transaction_type"` Timeout string `json:"timeout" yaml:"timeout"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
AzureTableStorageConfig contains configuration fields for the AzureTableStorage output type.
func NewAzureTableStorageConfig ¶
func NewAzureTableStorageConfig() AzureTableStorageConfig
NewAzureTableStorageConfig creates a new Config with default values.
type BrokerConfig ¶
type BrokerConfig struct { Copies int `json:"copies" yaml:"copies"` Pattern string `json:"pattern" yaml:"pattern"` Outputs []Config `json:"outputs" yaml:"outputs"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
BrokerConfig contains configuration fields for the Broker output type.
func NewBrokerConfig ¶
func NewBrokerConfig() BrokerConfig
NewBrokerConfig creates a new BrokerConfig with default values.
type CacheConfig ¶
type CacheConfig struct { Target string `json:"target" yaml:"target"` Key string `json:"key" yaml:"key"` TTL string `json:"ttl" yaml:"ttl"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
CacheConfig contains configuration fields for the Cache output type.
func NewCacheConfig ¶
func NewCacheConfig() CacheConfig
NewCacheConfig creates a new Config with default values.
type CassandraConfig ¶
type CassandraConfig struct { Addresses []string `json:"addresses" yaml:"addresses"` TLS btls.Config `json:"tls" yaml:"tls"` PasswordAuthenticator PasswordAuthenticator `json:"password_authenticator" yaml:"password_authenticator"` DisableInitialHostLookup bool `json:"disable_initial_host_lookup" yaml:"disable_initial_host_lookup"` Query string `json:"query" yaml:"query"` ArgsMapping string `json:"args_mapping" yaml:"args_mapping"` Consistency string `json:"consistency" yaml:"consistency"` Timeout string `json:"timeout" yaml:"timeout"` LoggedBatch bool `json:"logged_batch" yaml:"logged_batch"` // TODO: V4 Remove this and replace with explicit values. retries.Config `json:",inline" yaml:",inline"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
CassandraConfig contains configuration fields for the Cassandra output type.
func NewCassandraConfig ¶
func NewCassandraConfig() CassandraConfig
NewCassandraConfig creates a new CassandraConfig with default values.
type Config ¶
type Config struct { Label string `json:"label" yaml:"label"` Type string `json:"type" yaml:"type"` AMQP09 AMQPConfig `json:"amqp_0_9" yaml:"amqp_0_9"` AMQP1 AMQP1Config `json:"amqp_1" yaml:"amqp_1"` AWSDynamoDB DynamoDBConfig `json:"aws_dynamodb" yaml:"aws_dynamodb"` AWSKinesis KinesisConfig `json:"aws_kinesis" yaml:"aws_kinesis"` AWSKinesisFirehose KinesisFirehoseConfig `json:"aws_kinesis_firehose" yaml:"aws_kinesis_firehose"` AWSS3 AmazonS3Config `json:"aws_s3" yaml:"aws_s3"` AWSSNS SNSConfig `json:"aws_sns" yaml:"aws_sns"` AWSSQS AmazonSQSConfig `json:"aws_sqs" yaml:"aws_sqs"` AzureBlobStorage AzureBlobStorageConfig `json:"azure_blob_storage" yaml:"azure_blob_storage"` AzureQueueStorage AzureQueueStorageConfig `json:"azure_queue_storage" yaml:"azure_queue_storage"` AzureTableStorage AzureTableStorageConfig `json:"azure_table_storage" yaml:"azure_table_storage"` Broker BrokerConfig `json:"broker" yaml:"broker"` Cache CacheConfig `json:"cache" yaml:"cache"` Cassandra CassandraConfig `json:"cassandra" yaml:"cassandra"` Drop DropConfig `json:"drop" yaml:"drop"` DropOn DropOnConfig `json:"drop_on" yaml:"drop_on"` Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"` Elasticsearch ElasticsearchConfig `json:"elasticsearch" yaml:"elasticsearch"` Fallback TryConfig `json:"fallback" yaml:"fallback"` File FileConfig `json:"file" yaml:"file"` GCPCloudStorage GCPCloudStorageConfig `json:"gcp_cloud_storage" yaml:"gcp_cloud_storage"` GCPPubSub GCPPubSubConfig `json:"gcp_pubsub" yaml:"gcp_pubsub"` HDFS HDFSConfig `json:"hdfs" yaml:"hdfs"` HTTPServer HTTPServerConfig `json:"http_server" yaml:"http_server"` Inproc string `json:"inproc" yaml:"inproc"` Kafka KafkaConfig `json:"kafka" yaml:"kafka"` MongoDB MongoDBConfig `json:"mongodb" yaml:"mongodb"` MQTT MQTTConfig `json:"mqtt" yaml:"mqtt"` Nanomsg NanomsgConfig `json:"nanomsg" yaml:"nanomsg"` NATS NATSConfig `json:"nats" yaml:"nats"` NATSStream NATSStreamConfig `json:"nats_stream" yaml:"nats_stream"` NSQ NSQConfig `json:"nsq" yaml:"nsq"` Plugin any `json:"plugin,omitempty" yaml:"plugin,omitempty"` RedisHash RedisHashConfig `json:"redis_hash" yaml:"redis_hash"` RedisList RedisListConfig `json:"redis_list" yaml:"redis_list"` RedisPubSub RedisPubSubConfig `json:"redis_pubsub" yaml:"redis_pubsub"` RedisStreams RedisStreamsConfig `json:"redis_streams" yaml:"redis_streams"` Reject string `json:"reject" yaml:"reject"` Resource string `json:"resource" yaml:"resource"` Retry RetryConfig `json:"retry" yaml:"retry"` SFTP SFTPConfig `json:"sftp" yaml:"sftp"` STDOUT STDOUTConfig `json:"stdout" yaml:"stdout"` Subprocess SubprocessConfig `json:"subprocess" yaml:"subprocess"` Switch SwitchConfig `json:"switch" yaml:"switch"` SyncResponse struct{} `json:"sync_response" yaml:"sync_response"` Socket SocketConfig `json:"socket" yaml:"socket"` Processors []processor.Config `json:"processors" yaml:"processors"` }
Config is the all encompassing configuration struct for all output types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.
func NewConfig ¶
func NewConfig() Config
NewConfig returns a configuration struct fully populated with default values. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.
func (*Config) UnmarshalYAML ¶
UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.
type DropConfig ¶
type DropConfig struct{}
DropConfig contains configuration fields for the drop output type.
func NewDropConfig ¶
func NewDropConfig() DropConfig
NewDropConfig creates a new DropConfig with default values.
type DropOnConditions ¶
type DropOnConditions struct { Error bool `json:"error" yaml:"error"` BackPressure string `json:"back_pressure" yaml:"back_pressure"` }
DropOnConditions is a config struct representing the different circumstances under which messages should be dropped.
type DropOnConfig ¶
type DropOnConfig struct { DropOnConditions `json:",inline" yaml:",inline"` Output *Config `json:"output" yaml:"output"` }
DropOnConfig contains configuration values for the DropOn output type.
func NewDropOnConfig ¶
func NewDropOnConfig() DropOnConfig
NewDropOnConfig creates a new DropOnConfig with default values.
func (DropOnConfig) MarshalJSON ¶
func (d DropOnConfig) MarshalJSON() ([]byte, error)
MarshalJSON prints an empty object instead of nil.
func (DropOnConfig) MarshalYAML ¶
func (d DropOnConfig) MarshalYAML() (any, error)
MarshalYAML prints an empty object instead of nil.
type DynamicConfig ¶
type DynamicConfig struct { Outputs map[string]Config `json:"outputs" yaml:"outputs"` Prefix string `json:"prefix" yaml:"prefix"` }
DynamicConfig contains configuration fields for the Dynamic output type.
func NewDynamicConfig ¶
func NewDynamicConfig() DynamicConfig
NewDynamicConfig creates a new DynamicConfig with default values.
type DynamoDBConfig ¶
type DynamoDBConfig struct { SessionConfig `json:",inline" yaml:",inline"` Table string `json:"table" yaml:"table"` StringColumns map[string]string `json:"string_columns" yaml:"string_columns"` JSONMapColumns map[string]string `json:"json_map_columns" yaml:"json_map_columns"` TTL string `json:"ttl" yaml:"ttl"` TTLKey string `json:"ttl_key" yaml:"ttl_key"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` retries.Config `json:",inline" yaml:",inline"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
DynamoDBConfig contains config fields for the DynamoDB output type.
func NewDynamoDBConfig ¶
func NewDynamoDBConfig() DynamoDBConfig
NewDynamoDBConfig creates a DynamoDBConfig populated with default values.
type ElasticsearchAuthConfig ¶
type ElasticsearchAuthConfig struct { Enabled bool `json:"enabled" yaml:"enabled"` Username string `json:"username" yaml:"username"` Password string `json:"password" yaml:"password"` }
ElasticsearchAuthConfig contains basic authentication fields.
type ElasticsearchConfig ¶
type ElasticsearchConfig struct { URLs []string `json:"urls" yaml:"urls"` Sniff bool `json:"sniff" yaml:"sniff"` Healthcheck bool `json:"healthcheck" yaml:"healthcheck"` ID string `json:"id" yaml:"id"` Action string `json:"action" yaml:"action"` Index string `json:"index" yaml:"index"` Pipeline string `json:"pipeline" yaml:"pipeline"` Routing string `json:"routing" yaml:"routing"` Type string `json:"type" yaml:"type"` Timeout string `json:"timeout" yaml:"timeout"` TLS btls.Config `json:"tls" yaml:"tls"` Auth ElasticsearchAuthConfig `json:"basic_auth" yaml:"basic_auth"` AWS OptionalAWSConfig `json:"aws" yaml:"aws"` GzipCompression bool `json:"gzip_compression" yaml:"gzip_compression"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` retries.Config `json:",inline" yaml:",inline"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
ElasticsearchConfig contains configuration fields for the Elasticsearch output type.
func NewElasticsearchConfig ¶
func NewElasticsearchConfig() ElasticsearchConfig
NewElasticsearchConfig creates a new ElasticsearchConfig with default values.
type FileConfig ¶
type FileConfig struct { Path string `json:"path" yaml:"path"` Codec string `json:"codec" yaml:"codec"` }
FileConfig contains configuration fields for the file based output type.
func NewFileConfig ¶
func NewFileConfig() FileConfig
NewFileConfig creates a new FileConfig with default values.
type GCPCloudStorageConfig ¶
type GCPCloudStorageConfig struct { Bucket string `json:"bucket" yaml:"bucket"` Path string `json:"path" yaml:"path"` ContentType string `json:"content_type" yaml:"content_type"` ContentEncoding string `json:"content_encoding" yaml:"content_encoding"` ChunkSize int `json:"chunk_size" yaml:"chunk_size"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Batching batchconfig.Config `json:"batching" yaml:"batching"` CollisionMode string `json:"collision_mode" yaml:"collision_mode"` }
GCPCloudStorageConfig contains configuration fields for the GCP Cloud Storage output type.
func NewGCPCloudStorageConfig ¶
func NewGCPCloudStorageConfig() GCPCloudStorageConfig
NewGCPCloudStorageConfig creates a new Config with default values.
type GCPPubSubConfig ¶
type GCPPubSubConfig struct { ProjectID string `json:"project" yaml:"project"` TopicID string `json:"topic" yaml:"topic"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` PublishTimeout string `json:"publish_timeout" yaml:"publish_timeout"` Metadata metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"` OrderingKey string `json:"ordering_key" yaml:"ordering_key"` Endpoint string `json:"endpoint" yaml:"endpoint"` }
GCPPubSubConfig contains configuration fields for the output GCPPubSub type.
func NewGCPPubSubConfig ¶
func NewGCPPubSubConfig() GCPPubSubConfig
NewGCPPubSubConfig creates a new Config with default values.
type HDFSConfig ¶
type HDFSConfig struct { Hosts []string `json:"hosts" yaml:"hosts"` User string `json:"user" yaml:"user"` Directory string `json:"directory" yaml:"directory"` Path string `json:"path" yaml:"path"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
HDFSConfig contains configuration fields for the HDFS output type.
func NewHDFSConfig ¶
func NewHDFSConfig() HDFSConfig
NewHDFSConfig creates a new Config with default values.
type HTTPServerConfig ¶
type HTTPServerConfig struct { Address string `json:"address" yaml:"address"` Path string `json:"path" yaml:"path"` StreamPath string `json:"stream_path" yaml:"stream_path"` WSPath string `json:"ws_path" yaml:"ws_path"` AllowedVerbs []string `json:"allowed_verbs" yaml:"allowed_verbs"` Timeout string `json:"timeout" yaml:"timeout"` CertFile string `json:"cert_file" yaml:"cert_file"` KeyFile string `json:"key_file" yaml:"key_file"` CORS httpserver.CORSConfig `json:"cors" yaml:"cors"` }
HTTPServerConfig contains configuration fields for the HTTPServer output type.
func NewHTTPServerConfig ¶
func NewHTTPServerConfig() HTTPServerConfig
NewHTTPServerConfig creates a new HTTPServerConfig with default values.
type KafkaConfig ¶
type KafkaConfig struct { Addresses []string `json:"addresses" yaml:"addresses"` ClientID string `json:"client_id" yaml:"client_id"` RackID string `json:"rack_id" yaml:"rack_id"` Key string `json:"key" yaml:"key"` Partitioner string `json:"partitioner" yaml:"partitioner"` Partition string `json:"partition" yaml:"partition"` Topic string `json:"topic" yaml:"topic"` Compression string `json:"compression" yaml:"compression"` MaxMsgBytes int `json:"max_msg_bytes" yaml:"max_msg_bytes"` Timeout string `json:"timeout" yaml:"timeout"` AckReplicas bool `json:"ack_replicas" yaml:"ack_replicas"` TargetVersion string `json:"target_version" yaml:"target_version"` TLS btls.Config `json:"tls" yaml:"tls"` SASL sasl.Config `json:"sasl" yaml:"sasl"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` retries.Config `json:",inline" yaml:",inline"` RetryAsBatch bool `json:"retry_as_batch" yaml:"retry_as_batch"` Batching batchconfig.Config `json:"batching" yaml:"batching"` StaticHeaders map[string]string `json:"static_headers" yaml:"static_headers"` Metadata metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"` InjectTracingMap string `json:"inject_tracing_map" yaml:"inject_tracing_map"` }
KafkaConfig contains configuration fields for the Kafka output type.
func NewKafkaConfig ¶
func NewKafkaConfig() KafkaConfig
NewKafkaConfig creates a new KafkaConfig with default values.
type KinesisConfig ¶
type KinesisConfig struct { SessionConfig `json:",inline" yaml:",inline"` Stream string `json:"stream" yaml:"stream"` HashKey string `json:"hash_key" yaml:"hash_key"` PartitionKey string `json:"partition_key" yaml:"partition_key"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` retries.Config `json:",inline" yaml:",inline"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
KinesisConfig contains configuration fields for the Kinesis output type.
func NewKinesisConfig ¶
func NewKinesisConfig() KinesisConfig
NewKinesisConfig creates a new Config with default values.
type KinesisFirehoseConfig ¶
type KinesisFirehoseConfig struct { SessionConfig `json:",inline" yaml:",inline"` Stream string `json:"stream" yaml:"stream"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` retries.Config `json:",inline" yaml:",inline"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
KinesisFirehoseConfig contains configuration fields for the KinesisFirehose output type.
func NewKinesisFirehoseConfig ¶
func NewKinesisFirehoseConfig() KinesisFirehoseConfig
NewKinesisFirehoseConfig creates a new Config with default values.
type MQTTConfig ¶
type MQTTConfig struct { URLs []string `json:"urls" yaml:"urls"` QoS uint8 `json:"qos" yaml:"qos"` Retained bool `json:"retained" yaml:"retained"` RetainedInterpolated string `json:"retained_interpolated" yaml:"retained_interpolated"` Topic string `json:"topic" yaml:"topic"` ClientID string `json:"client_id" yaml:"client_id"` DynamicClientIDSuffix string `json:"dynamic_client_id_suffix" yaml:"dynamic_client_id_suffix"` Will mqttconf.Will `json:"will" yaml:"will"` User string `json:"user" yaml:"user"` Password string `json:"password" yaml:"password"` ConnectTimeout string `json:"connect_timeout" yaml:"connect_timeout"` WriteTimeout string `json:"write_timeout" yaml:"write_timeout"` KeepAlive int64 `json:"keepalive" yaml:"keepalive"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` TLS tls.Config `json:"tls" yaml:"tls"` }
MQTTConfig contains configuration fields for the MQTT output type.
func NewMQTTConfig ¶
func NewMQTTConfig() MQTTConfig
NewMQTTConfig creates a new MQTTConfig with default values.
type MongoDBConfig ¶
type MongoDBConfig struct { MongoConfig client.Config `json:",inline" yaml:",inline"` Operation string `json:"operation" yaml:"operation"` WriteConcern client.WriteConcern `json:"write_concern" yaml:"write_concern"` FilterMap string `json:"filter_map" yaml:"filter_map"` DocumentMap string `json:"document_map" yaml:"document_map"` HintMap string `json:"hint_map" yaml:"hint_map"` // DeleteEmptyValue bool `json:"delete_empty_value" yaml:"delete_empty_value"` Upsert bool `json:"upsert" yaml:"upsert"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` RetryConfig retries.Config `json:",inline" yaml:",inline"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
MongoDBConfig contains config fields for the MongoDB output type.
func NewMongoDBConfig ¶
func NewMongoDBConfig() MongoDBConfig
NewMongoDBConfig creates a MongoDB populated with default values.
type NATSConfig ¶
type NATSConfig struct { URLs []string `json:"urls" yaml:"urls"` Subject string `json:"subject" yaml:"subject"` Headers map[string]string `json:"headers" yaml:"headers"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` TLS btls.Config `json:"tls" yaml:"tls"` Auth auth.Config `json:"auth" yaml:"auth"` }
NATSConfig contains configuration fields for the NATS output type.
func NewNATSConfig ¶
func NewNATSConfig() NATSConfig
NewNATSConfig creates a new NATSConfig with default values.
type NATSStreamConfig ¶
type NATSStreamConfig struct { URLs []string `json:"urls" yaml:"urls"` ClusterID string `json:"cluster_id" yaml:"cluster_id"` ClientID string `json:"client_id" yaml:"client_id"` Subject string `json:"subject" yaml:"subject"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` TLS btls.Config `json:"tls" yaml:"tls"` Auth auth.Config `json:"auth" yaml:"auth"` }
NATSStreamConfig contains configuration fields for the NATSStream output type.
func NewNATSStreamConfig ¶
func NewNATSStreamConfig() NATSStreamConfig
NewNATSStreamConfig creates a new NATSStreamConfig with default values.
type NSQConfig ¶
type NSQConfig struct { Address string `json:"nsqd_tcp_address" yaml:"nsqd_tcp_address"` Topic string `json:"topic" yaml:"topic"` UserAgent string `json:"user_agent" yaml:"user_agent"` TLS btls.Config `json:"tls" yaml:"tls"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
NSQConfig contains configuration fields for the NSQ output type.
func NewNSQConfig ¶
func NewNSQConfig() NSQConfig
NewNSQConfig creates a new NSQConfig with default values.
type NanomsgConfig ¶
type NanomsgConfig struct { URLs []string `json:"urls" yaml:"urls"` Bind bool `json:"bind" yaml:"bind"` SocketType string `json:"socket_type" yaml:"socket_type"` PollTimeout string `json:"poll_timeout" yaml:"poll_timeout"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
NanomsgConfig contains configuration fields for the Nanomsg output type.
func NewNanomsgConfig ¶
func NewNanomsgConfig() NanomsgConfig
NewNanomsgConfig creates a new NanomsgConfig with default values.
type OptionalAWSConfig ¶
type OptionalAWSConfig struct { Enabled bool `json:"enabled" yaml:"enabled"` sess.Config `json:",inline" yaml:",inline"` }
OptionalAWSConfig contains config fields for AWS authentication with an enable flag.
type PasswordAuthenticator ¶
type PasswordAuthenticator struct { Enabled bool `json:"enabled" yaml:"enabled"` Username string `json:"username" yaml:"username"` Password string `json:"password" yaml:"password"` }
PasswordAuthenticator contains the fields that will be used to authenticate with the Cassandra cluster.
type RedisHashConfig ¶
type RedisHashConfig struct { bredis.Config `json:",inline" yaml:",inline"` Key string `json:"key" yaml:"key"` WalkMetadata bool `json:"walk_metadata" yaml:"walk_metadata"` WalkJSONObject bool `json:"walk_json_object" yaml:"walk_json_object"` Fields map[string]string `json:"fields" yaml:"fields"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
RedisHashConfig contains configuration fields for the RedisHash output type.
func NewRedisHashConfig ¶
func NewRedisHashConfig() RedisHashConfig
NewRedisHashConfig creates a new RedisHashConfig with default values.
type RedisListConfig ¶
type RedisListConfig struct { bredis.Config `json:",inline" yaml:",inline"` Key string `json:"key" yaml:"key"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
RedisListConfig contains configuration fields for the RedisList output type.
func NewRedisListConfig ¶
func NewRedisListConfig() RedisListConfig
NewRedisListConfig creates a new RedisListConfig with default values.
type RedisPubSubConfig ¶
type RedisPubSubConfig struct { bredis.Config `json:",inline" yaml:",inline"` Channel string `json:"channel" yaml:"channel"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
RedisPubSubConfig contains configuration fields for the RedisPubSub output type.
func NewRedisPubSubConfig ¶
func NewRedisPubSubConfig() RedisPubSubConfig
NewRedisPubSubConfig creates a new RedisPubSubConfig with default values.
type RedisStreamsConfig ¶
type RedisStreamsConfig struct { bredis.Config `json:",inline" yaml:",inline"` Stream string `json:"stream" yaml:"stream"` BodyKey string `json:"body_key" yaml:"body_key"` MaxLenApprox int64 `json:"max_length" yaml:"max_length"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Metadata metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
RedisStreamsConfig contains configuration fields for the RedisStreams output type.
func NewRedisStreamsConfig ¶
func NewRedisStreamsConfig() RedisStreamsConfig
NewRedisStreamsConfig creates a new RedisStreamsConfig with default values.
type RetryConfig ¶
type RetryConfig struct { Output *Config `json:"output" yaml:"output"` retries.Config `json:",inline" yaml:",inline"` }
RetryConfig contains configuration values for the Retry output type.
func NewRetryConfig ¶
func NewRetryConfig() RetryConfig
NewRetryConfig creates a new RetryConfig with default values.
func (RetryConfig) MarshalJSON ¶
func (r RetryConfig) MarshalJSON() ([]byte, error)
MarshalJSON prints an empty object instead of nil.
func (RetryConfig) MarshalYAML ¶
func (r RetryConfig) MarshalYAML() (any, error)
MarshalYAML prints an empty object instead of nil.
type SFTPConfig ¶
type SFTPConfig struct { Address string `json:"address" yaml:"address"` Path string `json:"path" yaml:"path"` Codec string `json:"codec" yaml:"codec"` Credentials sftpSetup.Credentials `json:"credentials" yaml:"credentials"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
SFTPConfig contains configuration fields for the SFTP output type.
func NewSFTPConfig ¶
func NewSFTPConfig() SFTPConfig
NewSFTPConfig creates a new Config with default values.
type SNSConfig ¶
type SNSConfig struct { TopicArn string `json:"topic_arn" yaml:"topic_arn"` MessageGroupID string `json:"message_group_id" yaml:"message_group_id"` MessageDeduplicationID string `json:"message_deduplication_id" yaml:"message_deduplication_id"` Metadata metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"` SessionConfig `json:",inline" yaml:",inline"` Timeout string `json:"timeout" yaml:"timeout"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
SNSConfig contains configuration fields for the output SNS type.
func NewSNSConfig ¶
func NewSNSConfig() SNSConfig
NewSNSConfig creates a new Config with default values.
type STDOUTConfig ¶
type STDOUTConfig struct {
Codec string `json:"codec" yaml:"codec"`
}
STDOUTConfig contains configuration fields for the stdout based output type.
func NewSTDOUTConfig ¶
func NewSTDOUTConfig() STDOUTConfig
NewSTDOUTConfig creates a new STDOUTConfig with default values.
type SessionConfig ¶
SessionConfig hides a general AWS session config struct.
type SocketConfig ¶
type SocketConfig struct { Network string `json:"network" yaml:"network"` Address string `json:"address" yaml:"address"` Codec string `json:"codec" yaml:"codec"` }
SocketConfig contains configuration fields for the Socket output type.
func NewSocketConfig ¶
func NewSocketConfig() SocketConfig
NewSocketConfig creates a new SocketConfig with default values.
type Streamed ¶
type Streamed interface { // Consume starts the type receiving transactions from a Transactor. Consume(<-chan message.Transaction) error // Connected returns a boolean indicating whether this output is currently // connected to its target. Connected() bool // TriggerCloseNow triggers the shut down of this component but should not // block the calling goroutine. TriggerCloseNow() // WaitForClose is a blocking call to wait until the component has finished // shutting down and cleaning up resources. WaitForClose(ctx context.Context) error }
Streamed is a common interface implemented by outputs and provides channel based streaming APIs.
func NewAsyncWriter ¶
func NewAsyncWriter(typeStr string, maxInflight int, w AsyncSink, mgr component.Observability) (Streamed, error)
NewAsyncWriter creates a Streamed implementation around an AsyncSink.
func OnlySinglePayloads ¶
OnlySinglePayloads expands message batches into individual payloads, respecting the max in flight of the wrapped output. This is a more efficient way of feeding messages into an output that handles its own batching mechanism internally, or does not support batching at all.
func WrapWithPipelines ¶
func WrapWithPipelines(out Streamed, pipeConstructors ...iprocessor.PipelineConstructorFunc) (Streamed, error)
WrapWithPipelines wraps an output with a variadic number of pipelines.
type SubprocessConfig ¶
type SubprocessConfig struct { Name string `json:"name" yaml:"name"` Args []string `json:"args" yaml:"args"` Codec string `json:"codec" yaml:"codec"` }
SubprocessConfig contains configuration for the Subprocess input type.
func NewSubprocessConfig ¶
func NewSubprocessConfig() SubprocessConfig
NewSubprocessConfig creates a new SubprocessConfig with default values.
type SwitchConfig ¶
type SwitchConfig struct { RetryUntilSuccess bool `json:"retry_until_success" yaml:"retry_until_success"` StrictMode bool `json:"strict_mode" yaml:"strict_mode"` Cases []SwitchConfigCase `json:"cases" yaml:"cases"` }
SwitchConfig contains configuration fields for the switchOutput output type.
func NewSwitchConfig ¶
func NewSwitchConfig() SwitchConfig
NewSwitchConfig creates a new SwitchConfig with default values.
type SwitchConfigCase ¶
type SwitchConfigCase struct { Check string `json:"check" yaml:"check"` Continue bool `json:"continue" yaml:"continue"` Output Config `json:"output" yaml:"output"` }
SwitchConfigCase contains configuration fields per output of a switch type.
func NewSwitchConfigCase ¶
func NewSwitchConfigCase() SwitchConfigCase
NewSwitchConfigCase creates a new switch output config with default values.
type Sync ¶
type Sync interface { // WriteTransaction attempts to write a transaction to an output. WriteTransaction(context.Context, message.Transaction) error // Connected returns a boolean indicating whether this output is currently // connected to its target. Connected() bool // TriggerStopConsuming instructs the output to start shutting down // resources once all pending messages are delivered and acknowledged. TriggerStopConsuming() // TriggerCloseNow triggers the shut down of this component but should not // block the calling goroutine. TriggerCloseNow() // WaitForClose is a blocking call to wait until the component has finished // shutting down and cleaning up resources. WaitForClose(ctx context.Context) error }
Sync is a common interface implemented by outputs and provides synchronous based writing APIs.
type TryConfig ¶
type TryConfig []Config
TryConfig contains configuration fields for the Try output type.
func NewTryConfig ¶
func NewTryConfig() TryConfig
NewTryConfig creates a new BrokerConfig with default values.
type WithPipeline ¶
type WithPipeline struct {
// contains filtered or unexported fields
}
WithPipeline is a type that wraps both an output type and a pipeline type by routing the pipeline through the output, and implements the output.Type interface in order to act like an ordinary output.
func WrapWithPipeline ¶
func WrapWithPipeline(out Streamed, pipeConstructor iprocessor.PipelineConstructorFunc) (*WithPipeline, error)
WrapWithPipeline routes a processing pipeline directly into an output and returns a type that manages both and acts like an ordinary output.
func (*WithPipeline) Connected ¶
func (i *WithPipeline) Connected() bool
Connected returns a boolean indicating whether this output is currently connected to its target.
func (*WithPipeline) Consume ¶
func (i *WithPipeline) Consume(tsChan <-chan message.Transaction) error
Consume starts the type listening to a message channel from a producer.
func (*WithPipeline) TriggerCloseNow ¶
func (i *WithPipeline) TriggerCloseNow()
TriggerCloseNow triggers a closure of this object but does not block.
func (*WithPipeline) WaitForClose ¶
func (i *WithPipeline) WaitForClose(ctx context.Context) error
WaitForClose is a blocking call to wait until the object has finished closing down and cleaning up resources.
Source Files ¶
- async_writer.go
- batched_send.go
- config.go
- config_amqp_0_9.go
- config_amqp_1.go
- config_aws_dynamodb.go
- config_aws_kinesis.go
- config_aws_kinesis_firehose.go
- config_aws_s3.go
- config_aws_sns.go
- config_aws_sqs.go
- config_azure_blob_storage.go
- config_azure_queue_storage.go
- config_azure_table_storage.go
- config_broker.go
- config_cache.go
- config_cassandra.go
- config_drop.go
- config_drop_on.go
- config_dynamic.go
- config_elasticsearch.go
- config_fallback.go
- config_file.go
- config_gcp_cloud_storage.go
- config_gcp_pubsub.go
- config_hdfs.go
- config_http_server.go
- config_kafka.go
- config_mongodb.go
- config_mqtt.go
- config_nanomsg.go
- config_nats.go
- config_nats_stream.go
- config_nsq.go
- config_redis_hash.go
- config_redis_list.go
- config_redis_pubsub.go
- config_redis_streams.go
- config_retry.go
- config_sftp.go
- config_socket.go
- config_stdout.go
- config_subprocess.go
- config_switch.go
- docs.go
- interface.go
- not_batched.go
- wrap_with_pipeline.go