Documentation ¶
Overview ¶
Package writer defines implementations of an interface for generic message writing that outputs to various third party sinks.
Index ¶
- func IterateBatchedSend(msg types.Message, fn func(int, types.Part) error) error
- type AMQP
- func (a *AMQP) CloseAsync()
- func (a *AMQP) Connect() error
- func (a *AMQP) ConnectWithContext(ctx context.Context) error
- func (a *AMQP) WaitForClose(timeout time.Duration) error
- func (a *AMQP) Write(msg types.Message) error
- func (a *AMQP) WriteWithContext(ctx context.Context, msg types.Message) error
- type AMQP1
- func (a *AMQP1) CloseAsync()
- func (a *AMQP1) Connect() error
- func (a *AMQP1) ConnectWithContext(ctx context.Context) error
- func (a *AMQP1) WaitForClose(timeout time.Duration) error
- func (a *AMQP1) Write(msg types.Message) error
- func (a *AMQP1) WriteWithContext(ctx context.Context, msg types.Message) error
- type AMQP1Config
- type AMQPConfig
- type AMQPExchangeDeclareConfig
- type AmazonS3
- func (a *AmazonS3) CloseAsync()
- func (a *AmazonS3) Connect() error
- func (a *AmazonS3) ConnectWithContext(ctx context.Context) error
- func (a *AmazonS3) WaitForClose(time.Duration) error
- func (a *AmazonS3) Write(msg types.Message) error
- func (a *AmazonS3) WriteWithContext(wctx context.Context, msg types.Message) error
- type AmazonS3Config
- type AmazonSQS
- func (a *AmazonSQS) CloseAsync()
- func (a *AmazonSQS) Connect() error
- func (a *AmazonSQS) ConnectWithContext(ctx context.Context) error
- func (a *AmazonSQS) WaitForClose(time.Duration) error
- func (a *AmazonSQS) Write(msg types.Message) error
- func (a *AmazonSQS) WriteWithContext(ctx context.Context, msg types.Message) error
- type AmazonSQSConfig
- type AzureBlobStorage
- func (a *AzureBlobStorage) CloseAsync()
- func (a *AzureBlobStorage) Connect() error
- func (a *AzureBlobStorage) ConnectWithContext(ctx context.Context) error
- func (a *AzureBlobStorage) WaitForClose(time.Duration) error
- func (a *AzureBlobStorage) Write(msg types.Message) error
- func (a *AzureBlobStorage) WriteWithContext(_ context.Context, msg types.Message) error
- type AzureBlobStorageConfig
- type AzureQueueStorage
- func (a *AzureQueueStorage) CloseAsync()
- func (a *AzureQueueStorage) Connect() error
- func (a *AzureQueueStorage) ConnectWithContext(ctx context.Context) error
- func (a *AzureQueueStorage) WaitForClose(time.Duration) error
- func (a *AzureQueueStorage) Write(msg types.Message) error
- func (a *AzureQueueStorage) WriteWithContext(ctx context.Context, msg types.Message) error
- type AzureQueueStorageConfig
- type AzureTableStorage
- func (a *AzureTableStorage) CloseAsync()
- func (a *AzureTableStorage) Connect() error
- func (a *AzureTableStorage) ConnectWithContext(ctx context.Context) error
- func (a *AzureTableStorage) WaitForClose(time.Duration) error
- func (a *AzureTableStorage) Write(msg types.Message) error
- func (a *AzureTableStorage) WriteWithContext(wctx context.Context, msg types.Message) error
- type AzureTableStorageConfig
- type Cache
- type CacheConfig
- type Drop
- type DropConfig
- type DynamoDB
- func (d *DynamoDB) CloseAsync()
- func (d *DynamoDB) Connect() error
- func (d *DynamoDB) ConnectWithContext(ctx context.Context) error
- func (d *DynamoDB) WaitForClose(time.Duration) error
- func (d *DynamoDB) Write(msg types.Message) error
- func (d *DynamoDB) WriteWithContext(ctx context.Context, msg types.Message) error
- type DynamoDBConfig
- type Elasticsearch
- func (e *Elasticsearch) CloseAsync()
- func (e *Elasticsearch) Connect() error
- func (e *Elasticsearch) ConnectWithContext(ctx context.Context) error
- func (e *Elasticsearch) WaitForClose(timeout time.Duration) error
- func (e *Elasticsearch) Write(msg types.Message) error
- func (e *Elasticsearch) WriteWithContext(ctx context.Context, msg types.Message) error
- type ElasticsearchConfig
- type Files
- type FilesConfig
- type GCPPubSub
- func (c *GCPPubSub) CloseAsync()
- func (c *GCPPubSub) Connect() error
- func (c *GCPPubSub) ConnectWithContext(ctx context.Context) error
- func (c *GCPPubSub) WaitForClose(time.Duration) error
- func (c *GCPPubSub) Write(msg types.Message) error
- func (c *GCPPubSub) WriteWithContext(ctx context.Context, msg types.Message) error
- type GCPPubSubConfig
- type HDFS
- type HDFSConfig
- type HTTPClient
- func (h *HTTPClient) CloseAsync()
- func (h *HTTPClient) Connect() error
- func (h *HTTPClient) ConnectWithContext(ctx context.Context) error
- func (h *HTTPClient) WaitForClose(timeout time.Duration) error
- func (h *HTTPClient) Write(msg types.Message) error
- func (h *HTTPClient) WriteWithContext(ctx context.Context, msg types.Message) error
- type HTTPClientConfig
- type Kafka
- func (k *Kafka) CloseAsync()
- func (k *Kafka) Connect() error
- func (k *Kafka) ConnectWithContext(ctx context.Context) error
- func (k *Kafka) WaitForClose(timeout time.Duration) error
- func (k *Kafka) Write(msg types.Message) error
- func (k *Kafka) WriteWithContext(ctx context.Context, msg types.Message) error
- type KafkaConfig
- type Kinesis
- func (a *Kinesis) CloseAsync()
- func (a *Kinesis) Connect() error
- func (a *Kinesis) ConnectWithContext(ctx context.Context) error
- func (a *Kinesis) WaitForClose(time.Duration) error
- func (a *Kinesis) Write(msg types.Message) error
- func (a *Kinesis) WriteWithContext(ctx context.Context, msg types.Message) error
- type KinesisConfig
- type KinesisFirehose
- func (a *KinesisFirehose) CloseAsync()
- func (a *KinesisFirehose) Connect() error
- func (a *KinesisFirehose) ConnectWithContext(ctx context.Context) error
- func (a *KinesisFirehose) WaitForClose(time.Duration) error
- func (a *KinesisFirehose) Write(msg types.Message) error
- func (a *KinesisFirehose) WriteWithContext(ctx context.Context, msg types.Message) error
- type KinesisFirehoseConfig
- type MQTT
- func (m *MQTT) CloseAsync()
- func (m *MQTT) Connect() error
- func (m *MQTT) ConnectWithContext(ctx context.Context) error
- func (m *MQTT) WaitForClose(timeout time.Duration) error
- func (m *MQTT) Write(msg types.Message) error
- func (m *MQTT) WriteWithContext(ctx context.Context, msg types.Message) error
- type MQTTConfig
- type NATS
- func (n *NATS) CloseAsync()
- func (n *NATS) Connect() error
- func (n *NATS) ConnectWithContext(ctx context.Context) error
- func (n *NATS) WaitForClose(timeout time.Duration) error
- func (n *NATS) Write(msg types.Message) error
- func (n *NATS) WriteWithContext(ctx context.Context, msg types.Message) error
- type NATSConfig
- type NATSStream
- func (n *NATSStream) CloseAsync()
- func (n *NATSStream) Connect() error
- func (n *NATSStream) ConnectWithContext(ctx context.Context) error
- func (n *NATSStream) WaitForClose(timeout time.Duration) error
- func (n *NATSStream) Write(msg types.Message) error
- func (n *NATSStream) WriteWithContext(ctx context.Context, msg types.Message) error
- type NATSStreamConfig
- type NSQ
- type NSQConfig
- type Nanomsg
- func (s *Nanomsg) CloseAsync()
- func (s *Nanomsg) Connect() error
- func (s *Nanomsg) ConnectWithContext(ctx context.Context) error
- func (s *Nanomsg) WaitForClose(timeout time.Duration) error
- func (s *Nanomsg) Write(msg types.Message) error
- func (s *Nanomsg) WriteWithContext(ctx context.Context, msg types.Message) error
- type NanomsgConfig
- type OptionalAWSConfig
- type RedisHash
- func (r *RedisHash) CloseAsync()
- func (r *RedisHash) Connect() error
- func (r *RedisHash) ConnectWithContext(ctx context.Context) error
- func (r *RedisHash) WaitForClose(timeout time.Duration) error
- func (r *RedisHash) Write(msg types.Message) error
- func (r *RedisHash) WriteWithContext(ctx context.Context, msg types.Message) error
- type RedisHashConfig
- type RedisList
- func (r *RedisList) CloseAsync()
- func (r *RedisList) Connect() error
- func (r *RedisList) ConnectWithContext(ctx context.Context) error
- func (r *RedisList) WaitForClose(timeout time.Duration) error
- func (r *RedisList) Write(msg types.Message) error
- func (r *RedisList) WriteWithContext(ctx context.Context, msg types.Message) error
- type RedisListConfig
- type RedisPubSub
- func (r *RedisPubSub) CloseAsync()
- func (r *RedisPubSub) Connect() error
- func (r *RedisPubSub) ConnectWithContext(ctx context.Context) error
- func (r *RedisPubSub) WaitForClose(timeout time.Duration) error
- func (r *RedisPubSub) Write(msg types.Message) error
- func (r *RedisPubSub) WriteWithContext(ctx context.Context, msg types.Message) error
- type RedisPubSubConfig
- type RedisStreams
- func (r *RedisStreams) CloseAsync()
- func (r *RedisStreams) Connect() error
- func (r *RedisStreams) ConnectWithContext(ctx context.Context) error
- func (r *RedisStreams) WaitForClose(timeout time.Duration) error
- func (r *RedisStreams) Write(msg types.Message) error
- func (r *RedisStreams) WriteWithContext(ctx context.Context, msg types.Message) error
- type RedisStreamsConfig
- type SNS
- type SNSConfig
- type Socket
- func (s *Socket) CloseAsync()
- func (s *Socket) Connect() error
- func (s *Socket) ConnectWithContext(ctx context.Context) error
- func (s *Socket) WaitForClose(timeout time.Duration) error
- func (s *Socket) Write(msg types.Message) error
- func (s *Socket) WriteWithContext(ctx context.Context, msg types.Message) error
- type SocketConfig
- type TCP
- type TCPConfig
- type Type
- type UDP
- type UDPConfig
- type Websocket
- type WebsocketConfig
- type ZMQ4Config
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IterateBatchedSend ¶ added in v3.22.0
IterateBatchedSend executes a closure fn on each message of a batch, where the closure is expected to attempt a send and return an error. If an error is returned then it is added to a batch error in order to support index specific error handling.
However, if a fatal error is returned such as a connection loss or shut down then it is returned immediately.
Types ¶
type AMQP ¶
type AMQP struct {
// contains filtered or unexported fields
}
AMQP is an output type that serves AMQP messages.
func NewAMQPV2 ¶ added in v3.56.0
func NewAMQPV2(mgr types.Manager, conf AMQPConfig, log log.Modular, stats metrics.Type) (*AMQP, error)
NewAMQPV2 creates a new AMQP writer type.
func (*AMQP) CloseAsync ¶
func (a *AMQP) CloseAsync()
CloseAsync shuts down the AMQP output and stops processing messages.
func (*AMQP) ConnectWithContext ¶ added in v3.8.0
ConnectWithContext establishes a connection to an AMQP server.
func (*AMQP) WaitForClose ¶
WaitForClose blocks until the AMQP output has closed down.
type AMQP1 ¶ added in v3.19.0
type AMQP1 struct {
// contains filtered or unexported fields
}
AMQP1 is an output type that serves AMQP1 messages.
func (*AMQP1) CloseAsync ¶ added in v3.19.0
func (a *AMQP1) CloseAsync()
CloseAsync shuts down the AMQP1 output and stops processing messages.
func (*AMQP1) ConnectWithContext ¶ added in v3.19.0
ConnectWithContext establishes a connection to an AMQP1 server.
func (*AMQP1) WaitForClose ¶ added in v3.19.0
WaitForClose blocks until the AMQP1 output has closed down.
type AMQP1Config ¶ added in v3.19.0
type AMQP1Config struct { URL string `json:"url" yaml:"url"` TargetAddress string `json:"target_address" yaml:"target_address"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` TLS btls.Config `json:"tls" yaml:"tls"` SASL sasl.Config `json:"sasl" yaml:"sasl"` Metadata output.Metadata `json:"metadata" yaml:"metadata"` }
AMQP1Config contains configuration fields for the AMQP1 output type.
func NewAMQP1Config ¶ added in v3.19.0
func NewAMQP1Config() AMQP1Config
NewAMQP1Config creates a new AMQP1Config with default values.
type AMQPConfig ¶
type AMQPConfig struct { URL string `json:"url" yaml:"url"` URLs []string `json:"urls" yaml:"urls"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Exchange string `json:"exchange" yaml:"exchange"` ExchangeDeclare AMQPExchangeDeclareConfig `json:"exchange_declare" yaml:"exchange_declare"` BindingKey string `json:"key" yaml:"key"` Type string `json:"type" yaml:"type"` ContentType string `json:"content_type" yaml:"content_type"` ContentEncoding string `json:"content_encoding" yaml:"content_encoding"` Metadata output.Metadata `json:"metadata" yaml:"metadata"` Priority string `json:"priority" yaml:"priority"` Persistent bool `json:"persistent" yaml:"persistent"` Mandatory bool `json:"mandatory" yaml:"mandatory"` Immediate bool `json:"immediate" yaml:"immediate"` TLS btls.Config `json:"tls" yaml:"tls"` }
AMQPConfig contains configuration fields for the AMQP output type.
func NewAMQPConfig ¶
func NewAMQPConfig() AMQPConfig
NewAMQPConfig creates a new AMQPConfig with default values.
type AMQPExchangeDeclareConfig ¶
type AMQPExchangeDeclareConfig struct { Enabled bool `json:"enabled" yaml:"enabled"` Type string `json:"type" yaml:"type"` Durable bool `json:"durable" yaml:"durable"` }
AMQPExchangeDeclareConfig contains fields indicating whether the target AMQP exchange needs to be declared, as well as any fields specifying how to accomplish that.
type AmazonS3 ¶
type AmazonS3 struct {
// contains filtered or unexported fields
}
AmazonS3 is a benthos writer.Type implementation that writes messages to an Amazon S3 bucket.
func NewAmazonS3
deprecated
func NewAmazonS3V2 ¶ added in v3.56.0
func NewAmazonS3V2( conf AmazonS3Config, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*AmazonS3, error)
NewAmazonS3V2 creates a new Amazon S3 bucket writer.Type.
func (*AmazonS3) CloseAsync ¶
func (a *AmazonS3) CloseAsync()
CloseAsync begins cleaning up resources used by this reader asynchronously.
func (*AmazonS3) ConnectWithContext ¶ added in v3.8.0
ConnectWithContext attempts to establish a connection to the target S3 bucket.
func (*AmazonS3) WaitForClose ¶
WaitForClose will block until either the reader is closed or a specified timeout occurs.
type AmazonS3Config ¶
type AmazonS3Config struct { sess.Config `json:",inline" yaml:",inline"` Bucket string `json:"bucket" yaml:"bucket"` ForcePathStyleURLs bool `json:"force_path_style_urls" yaml:"force_path_style_urls"` Path string `json:"path" yaml:"path"` Tags map[string]string `json:"tags" yaml:"tags"` ContentType string `json:"content_type" yaml:"content_type"` ContentEncoding string `json:"content_encoding" yaml:"content_encoding"` CacheControl string `json:"cache_control" yaml:"cache_control"` ContentDisposition string `json:"content_disposition" yaml:"content_disposition"` ContentLanguage string `json:"content_language" yaml:"content_language"` WebsiteRedirectLocation string `json:"website_redirect_location" yaml:"website_redirect_location"` Metadata output.Metadata `json:"metadata" yaml:"metadata"` StorageClass string `json:"storage_class" yaml:"storage_class"` Timeout string `json:"timeout" yaml:"timeout"` KMSKeyID string `json:"kms_key_id" yaml:"kms_key_id"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Batching batch.PolicyConfig `json:"batching" yaml:"batching"` }
AmazonS3Config contains configuration fields for the AmazonS3 output type.
func NewAmazonS3Config ¶
func NewAmazonS3Config() AmazonS3Config
NewAmazonS3Config creates a new Config with default values.
type AmazonSQS ¶
type AmazonSQS struct {
// contains filtered or unexported fields
}
AmazonSQS is a benthos writer.Type implementation that writes messages to an Amazon SQS queue.
func NewAmazonSQS
deprecated
func NewAmazonSQSV2 ¶ added in v3.56.0
func NewAmazonSQSV2( conf AmazonSQSConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*AmazonSQS, error)
NewAmazonSQSV2 creates a new Amazon SQS writer.Type.
func (*AmazonSQS) CloseAsync ¶
func (a *AmazonSQS) CloseAsync()
CloseAsync begins cleaning up resources used by this reader asynchronously.
func (*AmazonSQS) ConnectWithContext ¶ added in v3.8.0
ConnectWithContext attempts to establish a connection to the target SQS queue.
func (*AmazonSQS) WaitForClose ¶
WaitForClose will block until either the reader is closed or a specified timeout occurs.
type AmazonSQSConfig ¶
type AmazonSQSConfig struct { URL string `json:"url" yaml:"url"` MessageGroupID string `json:"message_group_id" yaml:"message_group_id"` MessageDeduplicationID string `json:"message_deduplication_id" yaml:"message_deduplication_id"` Metadata output.Metadata `json:"metadata" yaml:"metadata"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` retries.Config `json:",inline" yaml:",inline"` Batching batch.PolicyConfig `json:"batching" yaml:"batching"` // contains filtered or unexported fields }
AmazonSQSConfig contains configuration fields for the output AmazonSQS type.
func NewAmazonSQSConfig ¶
func NewAmazonSQSConfig() AmazonSQSConfig
NewAmazonSQSConfig creates a new Config with default values.
type AzureBlobStorage ¶ added in v3.21.0
type AzureBlobStorage struct {
// contains filtered or unexported fields
}
AzureBlobStorage is a benthos writer. Type implementation that writes messages to an Azure Blob Storage storage account.
func NewAzureBlobStorage ¶ added in v3.21.0
func NewAzureBlobStorage( conf AzureBlobStorageConfig, log log.Modular, stats metrics.Type, ) (*AzureBlobStorage, error)
NewAzureBlobStorage creates a new AzureBlobStorage writer.Type. Deprecated
func NewAzureBlobStorageV2 ¶ added in v3.56.0
func NewAzureBlobStorageV2( mgr types.Manager, conf AzureBlobStorageConfig, log log.Modular, stats metrics.Type, ) (*AzureBlobStorage, error)
NewAzureBlobStorageV2 creates a new AzureBlobStorage writer.Type.
func (*AzureBlobStorage) CloseAsync ¶ added in v3.21.0
func (a *AzureBlobStorage) CloseAsync()
CloseAsync begins cleaning up resources used by this reader asynchronously.
func (*AzureBlobStorage) Connect ¶ added in v3.21.0
func (a *AzureBlobStorage) Connect() error
Connect attempts to establish a connection to the target Blob Storage Account.
func (*AzureBlobStorage) ConnectWithContext ¶ added in v3.21.0
func (a *AzureBlobStorage) ConnectWithContext(ctx context.Context) error
ConnectWithContext attempts to establish a connection to the target Blob Storage Account.
func (*AzureBlobStorage) WaitForClose ¶ added in v3.21.0
func (a *AzureBlobStorage) WaitForClose(time.Duration) error
WaitForClose will block until either the reader is closed or a specified timeout occurs.
func (*AzureBlobStorage) Write ¶ added in v3.21.0
func (a *AzureBlobStorage) Write(msg types.Message) error
Write attempts to write message contents to a target Azure Blob Storage container as files.
func (*AzureBlobStorage) WriteWithContext ¶ added in v3.21.0
WriteWithContext attempts to write message contents to a target storage account as files.
type AzureBlobStorageConfig ¶ added in v3.21.0
type AzureBlobStorageConfig struct { StorageAccount string `json:"storage_account" yaml:"storage_account"` StorageAccessKey string `json:"storage_access_key" yaml:"storage_access_key"` StorageSASToken string `json:"storage_sas_token" yaml:"storage_sas_token"` StorageConnectionString string `json:"storage_connection_string" yaml:"storage_connection_string"` Container string `json:"container" yaml:"container"` Path string `json:"path" yaml:"path"` BlobType string `json:"blob_type" yaml:"blob_type"` PublicAccessLevel string `json:"public_access_level" yaml:"public_access_level"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
AzureBlobStorageConfig contains configuration fields for the AzureBlobStorage output type.
func NewAzureBlobStorageConfig ¶ added in v3.21.0
func NewAzureBlobStorageConfig() AzureBlobStorageConfig
NewAzureBlobStorageConfig creates a new Config with default values.
type AzureQueueStorage ¶ added in v3.36.0
type AzureQueueStorage struct {
// contains filtered or unexported fields
}
AzureQueueStorage is a benthos writer.Type implementation that writes messages to an Azure Queue Storage queue.
func NewAzureQueueStorage
deprecated
added in
v3.36.0
func NewAzureQueueStorage( conf AzureQueueStorageConfig, log log.Modular, stats metrics.Type, ) (*AzureQueueStorage, error)
NewAzureQueueStorage creates a new Azure Queue Storage writer type.
Deprecated: use the V2 API instead.
func NewAzureQueueStorageV2 ¶ added in v3.56.0
func NewAzureQueueStorageV2( conf AzureQueueStorageConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*AzureQueueStorage, error)
NewAzureQueueStorageV2 creates a new Azure Queue Storage writer type.
func (*AzureQueueStorage) CloseAsync ¶ added in v3.36.0
func (a *AzureQueueStorage) CloseAsync()
CloseAsync begins cleaning up resources used by this reader asynchronously.
func (*AzureQueueStorage) Connect ¶ added in v3.36.0
func (a *AzureQueueStorage) Connect() error
Connect attempts to establish a connection to the target
func (*AzureQueueStorage) ConnectWithContext ¶ added in v3.36.0
func (a *AzureQueueStorage) ConnectWithContext(ctx context.Context) error
ConnectWithContext attempts to establish a connection to the target queue.
func (*AzureQueueStorage) WaitForClose ¶ added in v3.36.0
func (a *AzureQueueStorage) WaitForClose(time.Duration) error
WaitForClose will block until either the reader is closed or a specified timeout occurs.
func (*AzureQueueStorage) Write ¶ added in v3.36.0
func (a *AzureQueueStorage) Write(msg types.Message) error
Write attempts to write message contents to a target Azure Queue Storage queue.
func (*AzureQueueStorage) WriteWithContext ¶ added in v3.36.0
WriteWithContext attempts to write message contents to a target Queue Storage
type AzureQueueStorageConfig ¶ added in v3.36.0
type AzureQueueStorageConfig struct { StorageAccount string `json:"storage_account" yaml:"storage_account"` StorageAccessKey string `json:"storage_access_key" yaml:"storage_access_key"` StorageConnectionString string `json:"storage_connection_string" yaml:"storage_connection_string"` QueueName string `json:"queue_name" yaml:"queue_name"` TTL string `json:"ttl" yaml:"ttl"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Batching batch.PolicyConfig `json:"batching" yaml:"batching"` }
AzureQueueStorageConfig contains configuration fields for the output Azure Queue Storage type.
func NewAzureQueueStorageConfig ¶ added in v3.36.0
func NewAzureQueueStorageConfig() AzureQueueStorageConfig
NewAzureQueueStorageConfig creates a new Config with default values.
type AzureTableStorage ¶ added in v3.23.0
type AzureTableStorage struct {
// contains filtered or unexported fields
}
AzureTableStorage is a benthos writer. Type implementation that writes messages to an Azure Table Storage table.
func NewAzureTableStorage
deprecated
added in
v3.23.0
func NewAzureTableStorage( conf AzureTableStorageConfig, log log.Modular, stats metrics.Type, ) (*AzureTableStorage, error)
NewAzureTableStorage creates a new Azure Table Storage writer Type.
Deprecated: use the V2 API instead.
func NewAzureTableStorageV2 ¶ added in v3.56.0
func NewAzureTableStorageV2( conf AzureTableStorageConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*AzureTableStorage, error)
NewAzureTableStorageV2 creates a new Azure Table Storage writer Type.
func (*AzureTableStorage) CloseAsync ¶ added in v3.23.0
func (a *AzureTableStorage) CloseAsync()
CloseAsync begins cleaning up resources used by this reader asynchronously.
func (*AzureTableStorage) Connect ¶ added in v3.23.0
func (a *AzureTableStorage) Connect() error
Connect attempts to establish a connection to the target Table Storage Account.
func (*AzureTableStorage) ConnectWithContext ¶ added in v3.23.0
func (a *AzureTableStorage) ConnectWithContext(ctx context.Context) error
ConnectWithContext attempts to establish a connection to the target Table Storage Account.
func (*AzureTableStorage) WaitForClose ¶ added in v3.23.0
func (a *AzureTableStorage) WaitForClose(time.Duration) error
WaitForClose will block until either the reader is closed or a specified timeout occurs.
func (*AzureTableStorage) Write ¶ added in v3.23.0
func (a *AzureTableStorage) Write(msg types.Message) error
Write attempts to write message contents to a target Azure Table Storage container as files.
func (*AzureTableStorage) WriteWithContext ¶ added in v3.23.0
WriteWithContext attempts to write message contents to a target storage account as files.
type AzureTableStorageConfig ¶ added in v3.23.0
type AzureTableStorageConfig struct { StorageAccount string `json:"storage_account" yaml:"storage_account"` StorageAccessKey string `json:"storage_access_key" yaml:"storage_access_key"` StorageConnectionString string `json:"storage_connection_string" yaml:"storage_connection_string"` TableName string `json:"table_name" yaml:"table_name"` PartitionKey string `json:"partition_key" yaml:"partition_key"` RowKey string `json:"row_key" yaml:"row_key"` Properties map[string]string `json:"properties" yaml:"properties"` InsertType string `json:"insert_type" yaml:"insert_type"` Timeout string `json:"timeout" yaml:"timeout"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Batching batch.PolicyConfig `json:"batching" yaml:"batching"` }
AzureTableStorageConfig contains configuration fields for the AzureTableStorage output type.
func NewAzureTableStorageConfig ¶ added in v3.23.0
func NewAzureTableStorageConfig() AzureTableStorageConfig
NewAzureTableStorageConfig creates a new Config with default values.
type Cache ¶
type Cache struct {
// contains filtered or unexported fields
}
Cache is a benthos writer.Type implementation that writes messages to a Cache directory.
func NewCache ¶
func NewCache( conf CacheConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*Cache, error)
NewCache creates a new Cache writer.Type.
func (*Cache) CloseAsync ¶
func (c *Cache) CloseAsync()
CloseAsync begins cleaning up resources used by this writer asynchronously.
func (*Cache) ConnectWithContext ¶ added in v3.8.0
ConnectWithContext does nothing.
func (*Cache) WaitForClose ¶
WaitForClose will block until either the writer is closed or a specified timeout occurs.
type CacheConfig ¶
type CacheConfig struct { Target string `json:"target" yaml:"target"` Key string `json:"key" yaml:"key"` TTL string `json:"ttl" yaml:"ttl"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
CacheConfig contains configuration fields for the Cache output type.
func NewCacheConfig ¶
func NewCacheConfig() CacheConfig
NewCacheConfig creates a new Config with default values.
type Drop ¶
type Drop struct {
// contains filtered or unexported fields
}
Drop is a benthos writer.Type implementation that writes message parts to no where.
func (*Drop) CloseAsync ¶
func (d *Drop) CloseAsync()
CloseAsync begins cleaning up resources used by this reader asynchronously.
func (*Drop) WaitForClose ¶
WaitForClose will block until either the reader is closed or a specified timeout occurs.
type DropConfig ¶
type DropConfig struct{}
DropConfig contains configuration fields for the drop output type.
func NewDropConfig ¶
func NewDropConfig() DropConfig
NewDropConfig creates a new DropConfig with default values.
type DynamoDB ¶
type DynamoDB struct {
// contains filtered or unexported fields
}
DynamoDB is a benthos writer.Type implementation that writes messages to an Amazon SQS queue.
func NewDynamoDB
deprecated
func NewDynamoDBV2 ¶ added in v3.56.0
func NewDynamoDBV2( conf DynamoDBConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*DynamoDB, error)
NewDynamoDBV2 creates a new Amazon SQS writer.Type.
func (*DynamoDB) CloseAsync ¶
func (d *DynamoDB) CloseAsync()
CloseAsync begins cleaning up resources used by this writer asynchronously.
func (*DynamoDB) ConnectWithContext ¶ added in v3.8.0
ConnectWithContext attempts to establish a connection to the target DynamoDB table.
func (*DynamoDB) WaitForClose ¶
WaitForClose will block until either the writer is closed or a specified timeout occurs.
type DynamoDBConfig ¶
type DynamoDBConfig struct { Table string `json:"table" yaml:"table"` StringColumns map[string]string `json:"string_columns" yaml:"string_columns"` JSONMapColumns map[string]string `json:"json_map_columns" yaml:"json_map_columns"` TTL string `json:"ttl" yaml:"ttl"` TTLKey string `json:"ttl_key" yaml:"ttl_key"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` retries.Config `json:",inline" yaml:",inline"` Batching batch.PolicyConfig `json:"batching" yaml:"batching"` // contains filtered or unexported fields }
DynamoDBConfig contains config fields for the DynamoDB output type.
func NewDynamoDBConfig ¶
func NewDynamoDBConfig() DynamoDBConfig
NewDynamoDBConfig creates a DynamoDBConfig populated with default values.
type Elasticsearch ¶
type Elasticsearch struct {
// contains filtered or unexported fields
}
Elasticsearch is a writer type that writes messages into elasticsearch.
func NewElasticsearch
deprecated
func NewElasticsearch(conf ElasticsearchConfig, log log.Modular, stats metrics.Type) (*Elasticsearch, error)
NewElasticsearch creates a new Elasticsearch writer type.
Deprecated: use the V2 API instead.
func NewElasticsearchV2 ¶ added in v3.56.0
func NewElasticsearchV2(conf ElasticsearchConfig, mgr types.Manager, log log.Modular, stats metrics.Type) (*Elasticsearch, error)
NewElasticsearchV2 creates a new Elasticsearch writer type.
func (*Elasticsearch) CloseAsync ¶
func (e *Elasticsearch) CloseAsync()
CloseAsync shuts down the Elasticsearch writer and stops processing messages.
func (*Elasticsearch) Connect ¶
func (e *Elasticsearch) Connect() error
Connect attempts to establish a connection to a Elasticsearch broker.
func (*Elasticsearch) ConnectWithContext ¶ added in v3.8.0
func (e *Elasticsearch) ConnectWithContext(ctx context.Context) error
ConnectWithContext attempts to establish a connection to a Elasticsearch broker.
func (*Elasticsearch) WaitForClose ¶
func (e *Elasticsearch) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the Elasticsearch writer has closed down.
func (*Elasticsearch) Write ¶
func (e *Elasticsearch) Write(msg types.Message) error
Write will attempt to write a message to Elasticsearch, wait for acknowledgement, and returns an error if applicable.
func (*Elasticsearch) WriteWithContext ¶ added in v3.8.0
WriteWithContext will attempt to write a message to Elasticsearch, wait for acknowledgement, and returns an error if applicable.
type ElasticsearchConfig ¶
type ElasticsearchConfig struct { URLs []string `json:"urls" yaml:"urls"` Sniff bool `json:"sniff" yaml:"sniff"` Healthcheck bool `json:"healthcheck" yaml:"healthcheck"` ID string `json:"id" yaml:"id"` Action string `json:"action" yaml:"action"` Index string `json:"index" yaml:"index"` Pipeline string `json:"pipeline" yaml:"pipeline"` Routing string `json:"routing" yaml:"routing"` Type string `json:"type" yaml:"type"` Timeout string `json:"timeout" yaml:"timeout"` TLS btls.Config `json:"tls" yaml:"tls"` Auth auth.BasicAuthConfig `json:"basic_auth" yaml:"basic_auth"` AWS OptionalAWSConfig `json:"aws" yaml:"aws"` GzipCompression bool `json:"gzip_compression" yaml:"gzip_compression"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` retries.Config `json:",inline" yaml:",inline"` Batching batch.PolicyConfig `json:"batching" yaml:"batching"` }
ElasticsearchConfig contains configuration fields for the Elasticsearch output type.
func NewElasticsearchConfig ¶
func NewElasticsearchConfig() ElasticsearchConfig
NewElasticsearchConfig creates a new ElasticsearchConfig with default values.
type Files ¶
type Files struct {
// contains filtered or unexported fields
}
Files is a benthos writer.Type implementation that writes message parts each to their own file.
func NewFilesV2 ¶ added in v3.56.0
func NewFilesV2( conf FilesConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*Files, error)
NewFilesV2 creates a new file based writer.Type.
func (*Files) CloseAsync ¶
func (f *Files) CloseAsync()
CloseAsync begins cleaning up resources used by this reader asynchronously.
func (*Files) ConnectWithContext ¶ added in v3.8.0
ConnectWithContext is a noop.
func (*Files) WaitForClose ¶
WaitForClose will block until either the reader is closed or a specified timeout occurs.
type FilesConfig ¶
type FilesConfig struct {
Path string `json:"path" yaml:"path"`
}
FilesConfig contains configuration fields for the files output type.
func NewFilesConfig ¶
func NewFilesConfig() FilesConfig
NewFilesConfig creates a new Config with default values.
type GCPPubSub ¶
type GCPPubSub struct {
// contains filtered or unexported fields
}
GCPPubSub is a benthos writer.Type implementation that writes messages to a GCP Pub/Sub topic.
func NewGCPPubSub
deprecated
func NewGCPPubSubV2 ¶ added in v3.56.0
func NewGCPPubSubV2( conf GCPPubSubConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*GCPPubSub, error)
NewGCPPubSubV2 creates a new GCP Cloud Pub/Sub writer.Type.
func (*GCPPubSub) CloseAsync ¶
func (c *GCPPubSub) CloseAsync()
CloseAsync begins cleaning up resources used by this reader asynchronously.
func (*GCPPubSub) Connect ¶
Connect attempts to establish a connection to the target GCP Pub/Sub topic.
func (*GCPPubSub) ConnectWithContext ¶ added in v3.8.0
ConnectWithContext attempts to establish a connection to the target GCP Pub/Sub topic.
func (*GCPPubSub) WaitForClose ¶
WaitForClose will block until either the reader is closed or a specified timeout occurs.
type GCPPubSubConfig ¶
type GCPPubSubConfig struct { ProjectID string `json:"project" yaml:"project"` TopicID string `json:"topic" yaml:"topic"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` PublishTimeout string `json:"publish_timeout" yaml:"publish_timeout"` Metadata output.Metadata `json:"metadata" yaml:"metadata"` }
GCPPubSubConfig contains configuration fields for the output GCPPubSub type.
func NewGCPPubSubConfig ¶
func NewGCPPubSubConfig() GCPPubSubConfig
NewGCPPubSubConfig creates a new Config with default values.
type HDFS ¶
type HDFS struct {
// contains filtered or unexported fields
}
HDFS is a benthos writer.Type implementation that writes messages to a HDFS directory.
func NewHDFSV2 ¶ added in v3.56.0
func NewHDFSV2( conf HDFSConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*HDFS, error)
NewHDFSV2 creates a new HDFS writer.Type.
func (*HDFS) CloseAsync ¶
func (h *HDFS) CloseAsync()
CloseAsync begins cleaning up resources used by this reader asynchronously.
func (*HDFS) ConnectWithContext ¶ added in v3.8.0
ConnectWithContext attempts to establish a connection to the target HDFS host.
func (*HDFS) WaitForClose ¶
WaitForClose will block until either the reader is closed or a specified timeout occurs.
type HDFSConfig ¶
type HDFSConfig struct { Hosts []string `json:"hosts" yaml:"hosts"` User string `json:"user" yaml:"user"` Directory string `json:"directory" yaml:"directory"` Path string `json:"path" yaml:"path"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Batching batch.PolicyConfig `json:"batching" yaml:"batching"` }
HDFSConfig contains configuration fields for the HDFS output type.
func NewHDFSConfig ¶
func NewHDFSConfig() HDFSConfig
NewHDFSConfig creates a new Config with default values.
type HTTPClient ¶
type HTTPClient struct {
// contains filtered or unexported fields
}
HTTPClient is an output type that sends messages as HTTP requests to a target server endpoint.
func NewHTTPClient ¶
func NewHTTPClient( conf HTTPClientConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*HTTPClient, error)
NewHTTPClient creates a new HTTPClient writer type.
func (*HTTPClient) CloseAsync ¶
func (h *HTTPClient) CloseAsync()
CloseAsync shuts down the HTTPClient output and stops processing messages.
func (*HTTPClient) ConnectWithContext ¶ added in v3.8.0
func (h *HTTPClient) ConnectWithContext(ctx context.Context) error
ConnectWithContext does nothing.
func (*HTTPClient) WaitForClose ¶
func (h *HTTPClient) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the HTTPClient output has closed down.
func (*HTTPClient) Write ¶
func (h *HTTPClient) Write(msg types.Message) error
Write attempts to send a message to an HTTP server, this attempt may include retries, and if all retries fail an error is returned.
func (*HTTPClient) WriteWithContext ¶ added in v3.8.0
WriteWithContext attempts to send a message to an HTTP server, this attempt may include retries, and if all retries fail an error is returned.
type HTTPClientConfig ¶
type HTTPClientConfig struct { client.Config `json:",inline" yaml:",inline"` BatchAsMultipart bool `json:"batch_as_multipart" yaml:"batch_as_multipart"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` PropagateResponse bool `json:"propagate_response" yaml:"propagate_response"` Batching batch.PolicyConfig `json:"batching" yaml:"batching"` }
HTTPClientConfig contains configuration fields for the HTTPClient output type.
func NewHTTPClientConfig ¶
func NewHTTPClientConfig() HTTPClientConfig
NewHTTPClientConfig creates a new HTTPClientConfig with default values.
type Kafka ¶
type Kafka struct {
// contains filtered or unexported fields
}
Kafka is a writer type that writes messages into kafka.
func NewKafka ¶
func NewKafka(conf KafkaConfig, mgr types.Manager, log log.Modular, stats metrics.Type) (*Kafka, error)
NewKafka creates a new Kafka writer type.
func (*Kafka) CloseAsync ¶
func (k *Kafka) CloseAsync()
CloseAsync shuts down the Kafka writer and stops processing messages.
func (*Kafka) ConnectWithContext ¶ added in v3.8.0
ConnectWithContext attempts to establish a connection to a Kafka broker.
func (*Kafka) WaitForClose ¶
WaitForClose blocks until the Kafka writer has closed down.
type KafkaConfig ¶
type KafkaConfig struct { Addresses []string `json:"addresses" yaml:"addresses"` ClientID string `json:"client_id" yaml:"client_id"` RackID string `json:"rack_id" yaml:"rack_id"` Key string `json:"key" yaml:"key"` Partitioner string `json:"partitioner" yaml:"partitioner"` Partition string `json:"partition" yaml:"partition"` Topic string `json:"topic" yaml:"topic"` Compression string `json:"compression" yaml:"compression"` MaxMsgBytes int `json:"max_msg_bytes" yaml:"max_msg_bytes"` Timeout string `json:"timeout" yaml:"timeout"` AckReplicas bool `json:"ack_replicas" yaml:"ack_replicas"` TargetVersion string `json:"target_version" yaml:"target_version"` TLS btls.Config `json:"tls" yaml:"tls"` SASL sasl.Config `json:"sasl" yaml:"sasl"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` retries.Config `json:",inline" yaml:",inline"` RetryAsBatch bool `json:"retry_as_batch" yaml:"retry_as_batch"` Batching batch.PolicyConfig `json:"batching" yaml:"batching"` StaticHeaders map[string]string `json:"static_headers" yaml:"static_headers"` Metadata output.Metadata `json:"metadata" yaml:"metadata"` InjectTracingMap string `json:"inject_tracing_map" yaml:"inject_tracing_map"` // TODO: V4 remove this. RoundRobinPartitions bool `json:"round_robin_partitions" yaml:"round_robin_partitions"` }
KafkaConfig contains configuration fields for the Kafka output type.
func NewKafkaConfig ¶
func NewKafkaConfig() KafkaConfig
NewKafkaConfig creates a new KafkaConfig with default values.
type Kinesis ¶
type Kinesis struct {
// contains filtered or unexported fields
}
Kinesis is a benthos writer.Type implementation that writes messages to an Amazon Kinesis stream.
func NewKinesis
deprecated
func NewKinesisV2 ¶ added in v3.56.0
func NewKinesisV2( conf KinesisConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*Kinesis, error)
NewKinesisV2 creates a new Amazon Kinesis writer.Type.
func (*Kinesis) CloseAsync ¶
func (a *Kinesis) CloseAsync()
CloseAsync begins cleaning up resources used by this reader asynchronously.
func (*Kinesis) Connect ¶
Connect creates a new Kinesis client and ensures that the target Kinesis stream exists.
func (*Kinesis) ConnectWithContext ¶ added in v3.8.0
ConnectWithContext creates a new Kinesis client and ensures that the target Kinesis stream exists.
func (*Kinesis) WaitForClose ¶
WaitForClose will block until either the reader is closed or a specified timeout occurs.
func (*Kinesis) Write ¶
Write attempts to write message contents to a target Kinesis stream in batches of 500. If throttling is detected, failed messages are retried according to the configurable backoff settings.
func (*Kinesis) WriteWithContext ¶ added in v3.8.0
WriteWithContext attempts to write message contents to a target Kinesis stream in batches of 500. If throttling is detected, failed messages are retried according to the configurable backoff settings.
type KinesisConfig ¶
type KinesisConfig struct { Stream string `json:"stream" yaml:"stream"` HashKey string `json:"hash_key" yaml:"hash_key"` PartitionKey string `json:"partition_key" yaml:"partition_key"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` retries.Config `json:",inline" yaml:",inline"` Batching batch.PolicyConfig `json:"batching" yaml:"batching"` // contains filtered or unexported fields }
KinesisConfig contains configuration fields for the Kinesis output type.
func NewKinesisConfig ¶
func NewKinesisConfig() KinesisConfig
NewKinesisConfig creates a new Config with default values.
type KinesisFirehose ¶ added in v3.1.0
type KinesisFirehose struct {
// contains filtered or unexported fields
}
KinesisFirehose is a benthos writer.Type implementation that writes messages to an Amazon Kinesis Firehose destination.
func NewKinesisFirehose ¶ added in v3.1.0
func NewKinesisFirehose( conf KinesisFirehoseConfig, log log.Modular, stats metrics.Type, ) (*KinesisFirehose, error)
NewKinesisFirehose creates a new Amazon Kinesis Firehose writer.Type.
func (*KinesisFirehose) CloseAsync ¶ added in v3.1.0
func (a *KinesisFirehose) CloseAsync()
CloseAsync begins cleaning up resources used by this reader asynchronously.
func (*KinesisFirehose) Connect ¶ added in v3.1.0
func (a *KinesisFirehose) Connect() error
Connect creates a new Kinesis Firehose client and ensures that the target Kinesis Firehose delivery stream.
func (*KinesisFirehose) ConnectWithContext ¶ added in v3.8.0
func (a *KinesisFirehose) ConnectWithContext(ctx context.Context) error
ConnectWithContext creates a new Kinesis Firehose client and ensures that the target Kinesis Firehose delivery stream.
func (*KinesisFirehose) WaitForClose ¶ added in v3.1.0
func (a *KinesisFirehose) WaitForClose(time.Duration) error
WaitForClose will block until either the reader is closed or a specified timeout occurs.
func (*KinesisFirehose) Write ¶ added in v3.1.0
func (a *KinesisFirehose) Write(msg types.Message) error
Write attempts to write message contents to a target Kinesis Firehose delivery stream in batches of 500. If throttling is detected, failed messages are retried according to the configurable backoff settings.
func (*KinesisFirehose) WriteWithContext ¶ added in v3.8.0
WriteWithContext attempts to write message contents to a target Kinesis Firehose delivery stream in batches of 500. If throttling is detected, failed messages are retried according to the configurable backoff settings.
type KinesisFirehoseConfig ¶ added in v3.1.0
type KinesisFirehoseConfig struct { Stream string `json:"stream" yaml:"stream"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` retries.Config `json:",inline" yaml:",inline"` Batching batch.PolicyConfig `json:"batching" yaml:"batching"` // contains filtered or unexported fields }
KinesisFirehoseConfig contains configuration fields for the KinesisFirehose output type.
func NewKinesisFirehoseConfig ¶ added in v3.1.0
func NewKinesisFirehoseConfig() KinesisFirehoseConfig
NewKinesisFirehoseConfig creates a new Config with default values.
type MQTT ¶
type MQTT struct {
// contains filtered or unexported fields
}
MQTT is an output type that serves MQTT messages.
func NewMQTTV2 ¶ added in v3.56.0
func NewMQTTV2( conf MQTTConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*MQTT, error)
NewMQTTV2 creates a new MQTT output type.
func (*MQTT) CloseAsync ¶
func (m *MQTT) CloseAsync()
CloseAsync shuts down the MQTT output and stops processing messages.
func (*MQTT) ConnectWithContext ¶ added in v3.8.0
ConnectWithContext establishes a connection to an MQTT server.
func (*MQTT) WaitForClose ¶
WaitForClose blocks until the MQTT output has closed down.
type MQTTConfig ¶
type MQTTConfig struct { URLs []string `json:"urls" yaml:"urls"` QoS uint8 `json:"qos" yaml:"qos"` Retained bool `json:"retained" yaml:"retained"` RetainedInterpolated string `json:"retained_interpolated" yaml:"retained_interpolated"` Topic string `json:"topic" yaml:"topic"` ClientID string `json:"client_id" yaml:"client_id"` DynamicClientIDSuffix string `json:"dynamic_client_id_suffix" yaml:"dynamic_client_id_suffix"` Will mqttconf.Will `json:"will" yaml:"will"` User string `json:"user" yaml:"user"` Password string `json:"password" yaml:"password"` ConnectTimeout string `json:"connect_timeout" yaml:"connect_timeout"` WriteTimeout string `json:"write_timeout" yaml:"write_timeout"` KeepAlive int64 `json:"keepalive" yaml:"keepalive"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` TLS tls.Config `json:"tls" yaml:"tls"` }
MQTTConfig contains configuration fields for the MQTT output type.
func NewMQTTConfig ¶
func NewMQTTConfig() MQTTConfig
NewMQTTConfig creates a new MQTTConfig with default values.
type NATS ¶
type NATS struct {
// contains filtered or unexported fields
}
NATS is an output type that serves NATS messages.
func NewNATSV2 ¶ added in v3.56.0
func NewNATSV2(conf NATSConfig, mgr types.Manager, log log.Modular, stats metrics.Type) (*NATS, error)
NewNATSV2 creates a new NATS output type.
func (*NATS) CloseAsync ¶
func (n *NATS) CloseAsync()
CloseAsync shuts down the MQTT output and stops processing messages.
func (*NATS) ConnectWithContext ¶ added in v3.8.0
ConnectWithContext attempts to establish a connection to NATS servers.
func (*NATS) WaitForClose ¶
WaitForClose blocks until the NATS output has closed down.
type NATSConfig ¶
type NATSConfig struct { URLs []string `json:"urls" yaml:"urls"` Subject string `json:"subject" yaml:"subject"` Headers map[string]string `json:"headers" yaml:"headers"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` TLS btls.Config `json:"tls" yaml:"tls"` Auth auth.Config `json:"auth" yaml:"auth"` }
NATSConfig contains configuration fields for the NATS output type.
func NewNATSConfig ¶
func NewNATSConfig() NATSConfig
NewNATSConfig creates a new NATSConfig with default values.
type NATSStream ¶
type NATSStream struct {
// contains filtered or unexported fields
}
NATSStream is an output type that serves NATS messages.
func NewNATSStream ¶
func NewNATSStream(conf NATSStreamConfig, log log.Modular, stats metrics.Type) (*NATSStream, error)
NewNATSStream creates a new NATS Stream output type.
func (*NATSStream) CloseAsync ¶
func (n *NATSStream) CloseAsync()
CloseAsync shuts down the MQTT output and stops processing messages.
func (*NATSStream) Connect ¶
func (n *NATSStream) Connect() error
Connect attempts to establish a connection to NATS servers.
func (*NATSStream) ConnectWithContext ¶ added in v3.8.0
func (n *NATSStream) ConnectWithContext(ctx context.Context) error
ConnectWithContext attempts to establish a connection to NATS servers.
func (*NATSStream) WaitForClose ¶
func (n *NATSStream) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the NATS output has closed down.
func (*NATSStream) Write ¶
func (n *NATSStream) Write(msg types.Message) error
Write attempts to write a message.
func (*NATSStream) WriteWithContext ¶ added in v3.8.0
WriteWithContext attempts to write a message.
type NATSStreamConfig ¶
type NATSStreamConfig struct { URLs []string `json:"urls" yaml:"urls"` ClusterID string `json:"cluster_id" yaml:"cluster_id"` ClientID string `json:"client_id" yaml:"client_id"` Subject string `json:"subject" yaml:"subject"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` TLS btls.Config `json:"tls" yaml:"tls"` Auth auth.Config `json:"auth" yaml:"auth"` }
NATSStreamConfig contains configuration fields for the NATSStream output type.
func NewNATSStreamConfig ¶
func NewNATSStreamConfig() NATSStreamConfig
NewNATSStreamConfig creates a new NATSStreamConfig with default values.
type NSQ ¶
type NSQ struct {
// contains filtered or unexported fields
}
NSQ is an output type that serves NSQ messages.
func (*NSQ) CloseAsync ¶
func (n *NSQ) CloseAsync()
CloseAsync shuts down the NSQ output and stops processing messages.
func (*NSQ) ConnectWithContext ¶ added in v3.8.0
ConnectWithContext attempts to establish a connection to NSQ servers.
func (*NSQ) WaitForClose ¶
WaitForClose blocks until the NSQ output has closed down.
type NSQConfig ¶
type NSQConfig struct { Address string `json:"nsqd_tcp_address" yaml:"nsqd_tcp_address"` Topic string `json:"topic" yaml:"topic"` UserAgent string `json:"user_agent" yaml:"user_agent"` TLS btls.Config `json:"tls" yaml:"tls"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
NSQConfig contains configuration fields for the NSQ output type.
func NewNSQConfig ¶
func NewNSQConfig() NSQConfig
NewNSQConfig creates a new NSQConfig with default values.
type Nanomsg ¶
type Nanomsg struct {
// contains filtered or unexported fields
}
Nanomsg is an output type that serves Nanomsg messages.
func NewNanomsg ¶
NewNanomsg creates a new Nanomsg output type.
func (*Nanomsg) CloseAsync ¶
func (s *Nanomsg) CloseAsync()
CloseAsync shuts down the Nanomsg output and stops processing messages.
func (*Nanomsg) ConnectWithContext ¶ added in v3.8.0
ConnectWithContext establishes a connection to a nanomsg socket.
func (*Nanomsg) WaitForClose ¶
WaitForClose blocks until the Nanomsg output has closed down.
type NanomsgConfig ¶
type NanomsgConfig struct { URLs []string `json:"urls" yaml:"urls"` Bind bool `json:"bind" yaml:"bind"` SocketType string `json:"socket_type" yaml:"socket_type"` PollTimeout string `json:"poll_timeout" yaml:"poll_timeout"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
NanomsgConfig contains configuration fields for the Nanomsg output type.
func NewNanomsgConfig ¶
func NewNanomsgConfig() NanomsgConfig
NewNanomsgConfig creates a new NanomsgConfig with default values.
type OptionalAWSConfig ¶
type OptionalAWSConfig struct { Enabled bool `json:"enabled" yaml:"enabled"` sess.Config `json:",inline" yaml:",inline"` }
OptionalAWSConfig contains config fields for AWS authentication with an enable flag.
type RedisHash ¶
type RedisHash struct {
// contains filtered or unexported fields
}
RedisHash is an output type that writes hash objects to Redis using the HMSET command.
func NewRedisHash
deprecated
func NewRedisHashV2 ¶ added in v3.56.0
func NewRedisHashV2( conf RedisHashConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*RedisHash, error)
NewRedisHashV2 creates a new RedisHash output type.
func (*RedisHash) CloseAsync ¶
func (r *RedisHash) CloseAsync()
CloseAsync shuts down the RedisHash output and stops processing messages.
func (*RedisHash) ConnectWithContext ¶ added in v3.8.0
ConnectWithContext establishes a connection to an RedisHash server.
func (*RedisHash) WaitForClose ¶
WaitForClose blocks until the RedisHash output has closed down.
type RedisHashConfig ¶
type RedisHashConfig struct { bredis.Config `json:",inline" yaml:",inline"` Key string `json:"key" yaml:"key"` WalkMetadata bool `json:"walk_metadata" yaml:"walk_metadata"` WalkJSONObject bool `json:"walk_json_object" yaml:"walk_json_object"` Fields map[string]string `json:"fields" yaml:"fields"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
RedisHashConfig contains configuration fields for the RedisHash output type.
func NewRedisHashConfig ¶
func NewRedisHashConfig() RedisHashConfig
NewRedisHashConfig creates a new RedisHashConfig with default values.
type RedisList ¶
type RedisList struct {
// contains filtered or unexported fields
}
RedisList is an output type that serves RedisList messages.
func NewRedisList
deprecated
func NewRedisListV2 ¶ added in v3.56.0
func NewRedisListV2( conf RedisListConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*RedisList, error)
NewRedisListV2 creates a new RedisList output type.
func (*RedisList) CloseAsync ¶
func (r *RedisList) CloseAsync()
CloseAsync shuts down the RedisList output and stops processing messages.
func (*RedisList) ConnectWithContext ¶ added in v3.8.0
ConnectWithContext establishes a connection to an RedisList server.
func (*RedisList) WaitForClose ¶
WaitForClose blocks until the RedisList output has closed down.
type RedisListConfig ¶
type RedisListConfig struct { bredis.Config `json:",inline" yaml:",inline"` Key string `json:"key" yaml:"key"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Batching batch.PolicyConfig `json:"batching" yaml:"batching"` }
RedisListConfig contains configuration fields for the RedisList output type.
func NewRedisListConfig ¶
func NewRedisListConfig() RedisListConfig
NewRedisListConfig creates a new RedisListConfig with default values.
type RedisPubSub ¶
type RedisPubSub struct {
// contains filtered or unexported fields
}
RedisPubSub is an output type that serves RedisPubSub messages.
func NewRedisPubSub
deprecated
func NewRedisPubSub( conf RedisPubSubConfig, log log.Modular, stats metrics.Type, ) (*RedisPubSub, error)
NewRedisPubSub creates a new RedisPubSub output type.
Deprecated: use the V2 API instead.
func NewRedisPubSubV2 ¶ added in v3.56.0
func NewRedisPubSubV2( conf RedisPubSubConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*RedisPubSub, error)
NewRedisPubSubV2 creates a new RedisPubSub output type.
func (*RedisPubSub) CloseAsync ¶
func (r *RedisPubSub) CloseAsync()
CloseAsync shuts down the RedisPubSub output and stops processing messages.
func (*RedisPubSub) Connect ¶
func (r *RedisPubSub) Connect() error
Connect establishes a connection to an RedisPubSub server.
func (*RedisPubSub) ConnectWithContext ¶ added in v3.8.0
func (r *RedisPubSub) ConnectWithContext(ctx context.Context) error
ConnectWithContext establishes a connection to an RedisPubSub server.
func (*RedisPubSub) WaitForClose ¶
func (r *RedisPubSub) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the RedisPubSub output has closed down.
func (*RedisPubSub) Write ¶
func (r *RedisPubSub) Write(msg types.Message) error
Write attempts to write a message by pushing it to a Redis pub/sub topic.
func (*RedisPubSub) WriteWithContext ¶ added in v3.8.0
WriteWithContext attempts to write a message by pushing it to a Redis pub/sub topic.
type RedisPubSubConfig ¶
type RedisPubSubConfig struct { bredis.Config `json:",inline" yaml:",inline"` Channel string `json:"channel" yaml:"channel"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Batching batch.PolicyConfig `json:"batching" yaml:"batching"` }
RedisPubSubConfig contains configuration fields for the RedisPubSub output type.
func NewRedisPubSubConfig ¶
func NewRedisPubSubConfig() RedisPubSubConfig
NewRedisPubSubConfig creates a new RedisPubSubConfig with default values.
type RedisStreams ¶
type RedisStreams struct {
// contains filtered or unexported fields
}
RedisStreams is an output type that serves RedisStreams messages.
func NewRedisStreams ¶
func NewRedisStreams( conf RedisStreamsConfig, log log.Modular, stats metrics.Type, ) (*RedisStreams, error)
NewRedisStreams creates a new RedisStreams output type.
func (*RedisStreams) CloseAsync ¶
func (r *RedisStreams) CloseAsync()
CloseAsync shuts down the RedisStreams output and stops processing messages.
func (*RedisStreams) Connect ¶
func (r *RedisStreams) Connect() error
Connect establishes a connection to an RedisStreams server.
func (*RedisStreams) ConnectWithContext ¶ added in v3.8.0
func (r *RedisStreams) ConnectWithContext(ctx context.Context) error
ConnectWithContext establishes a connection to an RedisStreams server.
func (*RedisStreams) WaitForClose ¶
func (r *RedisStreams) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the RedisStreams output has closed down.
func (*RedisStreams) Write ¶
func (r *RedisStreams) Write(msg types.Message) error
Write attempts to write a message by pushing it to a Redis stream.
func (*RedisStreams) WriteWithContext ¶ added in v3.8.0
WriteWithContext attempts to write a message by pushing it to a Redis stream.
type RedisStreamsConfig ¶
type RedisStreamsConfig struct { bredis.Config `json:",inline" yaml:",inline"` Stream string `json:"stream" yaml:"stream"` BodyKey string `json:"body_key" yaml:"body_key"` MaxLenApprox int64 `json:"max_length" yaml:"max_length"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` Metadata output.Metadata `json:"metadata" yaml:"metadata"` Batching batch.PolicyConfig `json:"batching" yaml:"batching"` }
RedisStreamsConfig contains configuration fields for the RedisStreams output type.
func NewRedisStreamsConfig ¶
func NewRedisStreamsConfig() RedisStreamsConfig
NewRedisStreamsConfig creates a new RedisStreamsConfig with default values.
type SNS ¶
type SNS struct {
// contains filtered or unexported fields
}
SNS is a benthos writer.Type implementation that writes messages to an Amazon SNS queue.
func (*SNS) CloseAsync ¶
func (a *SNS) CloseAsync()
CloseAsync begins cleaning up resources used by this reader asynchronously.
func (*SNS) ConnectWithContext ¶ added in v3.8.0
ConnectWithContext attempts to establish a connection to the target SNS queue.
func (*SNS) WaitForClose ¶
WaitForClose will block until either the reader is closed or a specified timeout occurs.
type SNSConfig ¶
type SNSConfig struct { TopicArn string `json:"topic_arn" yaml:"topic_arn"` MessageGroupID string `json:"message_group_id" yaml:"message_group_id"` MessageDeduplicationID string `json:"message_deduplication_id" yaml:"message_deduplication_id"` Metadata output.Metadata `json:"metadata" yaml:"metadata"` Timeout string `json:"timeout" yaml:"timeout"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` // contains filtered or unexported fields }
SNSConfig contains configuration fields for the output SNS type.
func NewSNSConfig ¶
func NewSNSConfig() SNSConfig
NewSNSConfig creates a new Config with default values.
type Socket ¶ added in v3.9.0
type Socket struct {
// contains filtered or unexported fields
}
Socket is an output type that sends messages as a continuous steam of line delimied messages over socket.
func NewSocket ¶ added in v3.9.0
func NewSocket( conf SocketConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*Socket, error)
NewSocket creates a new Socket writer type.
func (*Socket) CloseAsync ¶ added in v3.9.0
func (s *Socket) CloseAsync()
CloseAsync shuts down the socket output and stops processing messages.
func (*Socket) Connect ¶ added in v3.9.0
Connect establises a connection to the target socket server.
func (*Socket) ConnectWithContext ¶ added in v3.43.0
ConnectWithContext establises a connection to the target socket server.
func (*Socket) WaitForClose ¶ added in v3.9.0
WaitForClose blocks until the socket output has closed down.
type SocketConfig ¶ added in v3.9.0
type SocketConfig struct { Network string `json:"network" yaml:"network"` Address string `json:"address" yaml:"address"` Codec string `json:"codec" yaml:"codec"` }
SocketConfig contains configuration fields for the Socket output type.
func NewSocketConfig ¶ added in v3.9.0
func NewSocketConfig() SocketConfig
NewSocketConfig creates a new SocketConfig with default values.
type TCP ¶
type TCP struct {
// contains filtered or unexported fields
}
TCP is an output type that sends messages as a continuous steam of line delimied messages over TCP.
func NewTCP ¶
func NewTCP( conf TCPConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*TCP, error)
NewTCP creates a new TCP writer type.
func (*TCP) CloseAsync ¶
func (t *TCP) CloseAsync()
CloseAsync shuts down the TCP output and stops processing messages.
func (*TCP) WaitForClose ¶
WaitForClose blocks until the TCP output has closed down.
type TCPConfig ¶
type TCPConfig struct {
Address string `json:"address" yaml:"address"`
}
TCPConfig contains configuration fields for the TCP output type.
func NewTCPConfig ¶
func NewTCPConfig() TCPConfig
NewTCPConfig creates a new TCPConfig with default values.
type Type ¶
type Type interface { // Connect attempts to establish a connection to the sink, if unsuccessful // returns an error. If the attempt is successful (or not necessary) returns // nil. Connect() error // Write should block until either the message is sent (and acknowledged) to // a sink, or a transport specific error has occurred, or the Type is // closed. Write(msg types.Message) error types.Closable }
Type is a type that writes Benthos messages to a third party sink. If the protocol supports a form of acknowledgement then it will be returned by the call to Write.
type UDP ¶
type UDP struct {
// contains filtered or unexported fields
}
UDP is an output type that sends messages as a continuous steam of line delimied messages over UDP.
func NewUDP ¶
func NewUDP( conf UDPConfig, mgr types.Manager, log log.Modular, stats metrics.Type, ) (*UDP, error)
NewUDP creates a new UDP writer type.
func (*UDP) CloseAsync ¶
func (t *UDP) CloseAsync()
CloseAsync shuts down the UDP output and stops processing messages.
func (*UDP) WaitForClose ¶
WaitForClose blocks until the UDP output has closed down.
type UDPConfig ¶
type UDPConfig struct {
Address string `json:"address" yaml:"address"`
}
UDPConfig contains configuration fields for the UDP output type.
func NewUDPConfig ¶
func NewUDPConfig() UDPConfig
NewUDPConfig creates a new UDPConfig with default values.
type Websocket ¶
type Websocket struct {
// contains filtered or unexported fields
}
Websocket is an output type that serves Websocket messages.
func NewWebsocket ¶
NewWebsocket creates a new Websocket output type.
func (*Websocket) CloseAsync ¶
func (w *Websocket) CloseAsync()
CloseAsync shuts down the Websocket output and stops processing messages.
func (*Websocket) WaitForClose ¶
WaitForClose blocks until the Websocket output has closed down.
type WebsocketConfig ¶
type WebsocketConfig struct { URL string `json:"url" yaml:"url"` auth.Config `json:",inline" yaml:",inline"` TLS btls.Config `json:"tls" yaml:"tls"` }
WebsocketConfig contains configuration fields for the Websocket output type.
func NewWebsocketConfig ¶
func NewWebsocketConfig() WebsocketConfig
NewWebsocketConfig creates a new WebsocketConfig with default values.
type ZMQ4Config ¶
type ZMQ4Config struct { URLs []string `json:"urls" yaml:"urls"` Bind bool `json:"bind" yaml:"bind"` SocketType string `json:"socket_type" yaml:"socket_type"` HighWaterMark int `json:"high_water_mark" yaml:"high_water_mark"` PollTimeout string `json:"poll_timeout" yaml:"poll_timeout"` }
ZMQ4Config contains configuration fields for the ZMQ4 output type.
func NewZMQ4Config ¶
func NewZMQ4Config() *ZMQ4Config
NewZMQ4Config creates a new ZMQ4Config with default values.
Source Files ¶
- amqp.go
- amqp_1.go
- azure_blob_storage.go
- azure_blob_storage_config.go
- azure_queue_storage.go
- azure_queue_storage_config.go
- azure_table_storage.go
- azure_table_storage_config.go
- batched_send.go
- cache.go
- drop.go
- dynamodb.go
- elasticsearch.go
- files.go
- gcp_pubsub.go
- hdfs.go
- http_client.go
- interface.go
- kafka.go
- kinesis.go
- kinesis_firehose.go
- mqtt.go
- nanomsg.go
- nats.go
- nats_stream.go
- nsq.go
- package.go
- redis_hash.go
- redis_list.go
- redis_pubsub.go
- redis_streams.go
- s3.go
- sns.go
- socket.go
- sqs.go
- tcp.go
- udp.go
- websocket.go
- zmq4_config.go