writer

package
v4.0.0-rc3
Warning

This package is not in the latest version of its module.

Published: Mar 31, 2022 License: MIT Imports: 64 Imported by: 0

Documentation

Overview

Package writer defines implementations of an interface for generic message writing that outputs to various third-party sinks.


Constants

This section is empty.

Variables

This section is empty.

Functions

func IterateBatchedSend

func IterateBatchedSend(msg *message.Batch, fn func(int, *message.Part) error) error

IterateBatchedSend executes a closure fn on each message of a batch, where the closure is expected to attempt a send and return an error. If an error is returned, it is added to a batch error in order to support index-specific error handling.

However, if a fatal error is returned, such as a connection loss or shutdown, it is returned immediately.

Types

type AzureBlobStorage

type AzureBlobStorage struct {
	// contains filtered or unexported fields
}

AzureBlobStorage is a benthos writer.Type implementation that writes messages to an Azure Blob Storage storage account.

func NewAzureBlobStorageV2

func NewAzureBlobStorageV2(
	mgr interop.Manager,
	conf AzureBlobStorageConfig,
	log log.Modular,
	stats metrics.Type,
) (*AzureBlobStorage, error)

NewAzureBlobStorageV2 creates a new AzureBlobStorage writer.Type.

func (*AzureBlobStorage) CloseAsync

func (a *AzureBlobStorage) CloseAsync()

CloseAsync begins cleaning up resources used by this writer asynchronously.

func (*AzureBlobStorage) Connect

func (a *AzureBlobStorage) Connect() error

Connect attempts to establish a connection to the target Blob Storage Account.

func (*AzureBlobStorage) ConnectWithContext

func (a *AzureBlobStorage) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to the target Blob Storage Account.

func (*AzureBlobStorage) WaitForClose

func (a *AzureBlobStorage) WaitForClose(time.Duration) error

WaitForClose will block until either the writer is closed or a specified timeout occurs.

func (*AzureBlobStorage) Write

func (a *AzureBlobStorage) Write(msg *message.Batch) error

Write attempts to write message contents to a target Azure Blob Storage container as files.

func (*AzureBlobStorage) WriteWithContext

func (a *AzureBlobStorage) WriteWithContext(_ context.Context, msg *message.Batch) error

WriteWithContext attempts to write message contents to a target storage account as files.

type AzureBlobStorageConfig

type AzureBlobStorageConfig struct {
	StorageAccount          string `json:"storage_account" yaml:"storage_account"`
	StorageAccessKey        string `json:"storage_access_key" yaml:"storage_access_key"`
	StorageSASToken         string `json:"storage_sas_token" yaml:"storage_sas_token"`
	StorageConnectionString string `json:"storage_connection_string" yaml:"storage_connection_string"`
	Container               string `json:"container" yaml:"container"`
	Path                    string `json:"path" yaml:"path"`
	BlobType                string `json:"blob_type" yaml:"blob_type"`
	PublicAccessLevel       string `json:"public_access_level" yaml:"public_access_level"`
	MaxInFlight             int    `json:"max_in_flight" yaml:"max_in_flight"`
}

AzureBlobStorageConfig contains configuration fields for the AzureBlobStorage output type.

func NewAzureBlobStorageConfig

func NewAzureBlobStorageConfig() AzureBlobStorageConfig

NewAzureBlobStorageConfig creates a new Config with default values.

type AzureQueueStorage

type AzureQueueStorage struct {
	// contains filtered or unexported fields
}

AzureQueueStorage is a benthos writer.Type implementation that writes messages to an Azure Queue Storage queue.

func NewAzureQueueStorageV2

func NewAzureQueueStorageV2(
	conf AzureQueueStorageConfig,
	mgr interop.Manager,
	log log.Modular,
	stats metrics.Type,
) (*AzureQueueStorage, error)

NewAzureQueueStorageV2 creates a new Azure Queue Storage writer type.

func (*AzureQueueStorage) CloseAsync

func (a *AzureQueueStorage) CloseAsync()

CloseAsync begins cleaning up resources used by this writer asynchronously.

func (*AzureQueueStorage) Connect

func (a *AzureQueueStorage) Connect() error

Connect attempts to establish a connection to the target queue.

func (*AzureQueueStorage) ConnectWithContext

func (a *AzureQueueStorage) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to the target queue.

func (*AzureQueueStorage) WaitForClose

func (a *AzureQueueStorage) WaitForClose(time.Duration) error

WaitForClose will block until either the writer is closed or a specified timeout occurs.

func (*AzureQueueStorage) Write

func (a *AzureQueueStorage) Write(msg *message.Batch) error

Write attempts to write message contents to a target Azure Queue Storage queue.

func (*AzureQueueStorage) WriteWithContext

func (a *AzureQueueStorage) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext attempts to write message contents to a target Azure Queue Storage queue.

type AzureQueueStorageConfig

type AzureQueueStorageConfig struct {
	StorageAccount          string        `json:"storage_account" yaml:"storage_account"`
	StorageAccessKey        string        `json:"storage_access_key" yaml:"storage_access_key"`
	StorageConnectionString string        `json:"storage_connection_string" yaml:"storage_connection_string"`
	QueueName               string        `json:"queue_name" yaml:"queue_name"`
	TTL                     string        `json:"ttl" yaml:"ttl"`
	MaxInFlight             int           `json:"max_in_flight" yaml:"max_in_flight"`
	Batching                policy.Config `json:"batching" yaml:"batching"`
}

AzureQueueStorageConfig contains configuration fields for the output Azure Queue Storage type.

func NewAzureQueueStorageConfig

func NewAzureQueueStorageConfig() AzureQueueStorageConfig

NewAzureQueueStorageConfig creates a new Config with default values.

type AzureTableStorage

type AzureTableStorage struct {
	// contains filtered or unexported fields
}

AzureTableStorage is a benthos writer.Type implementation that writes messages to an Azure Table Storage table.

func NewAzureTableStorageV2

func NewAzureTableStorageV2(
	conf AzureTableStorageConfig,
	mgr interop.Manager,
	log log.Modular,
	stats metrics.Type,
) (*AzureTableStorage, error)

NewAzureTableStorageV2 creates a new Azure Table Storage writer.Type.

func (*AzureTableStorage) CloseAsync

func (a *AzureTableStorage) CloseAsync()

CloseAsync begins cleaning up resources used by this writer asynchronously.

func (*AzureTableStorage) Connect

func (a *AzureTableStorage) Connect() error

Connect attempts to establish a connection to the target Table Storage Account.

func (*AzureTableStorage) ConnectWithContext

func (a *AzureTableStorage) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to the target Table Storage Account.

func (*AzureTableStorage) WaitForClose

func (a *AzureTableStorage) WaitForClose(time.Duration) error

WaitForClose will block until either the writer is closed or a specified timeout occurs.

func (*AzureTableStorage) Write

func (a *AzureTableStorage) Write(msg *message.Batch) error

Write attempts to write message contents to a target Azure Table Storage table.

func (*AzureTableStorage) WriteWithContext

func (a *AzureTableStorage) WriteWithContext(wctx context.Context, msg *message.Batch) error

WriteWithContext attempts to write message contents to a target Azure Table Storage table.

type AzureTableStorageConfig

type AzureTableStorageConfig struct {
	StorageAccount          string            `json:"storage_account" yaml:"storage_account"`
	StorageAccessKey        string            `json:"storage_access_key" yaml:"storage_access_key"`
	StorageConnectionString string            `json:"storage_connection_string" yaml:"storage_connection_string"`
	TableName               string            `json:"table_name" yaml:"table_name"`
	PartitionKey            string            `json:"partition_key" yaml:"partition_key"`
	RowKey                  string            `json:"row_key" yaml:"row_key"`
	Properties              map[string]string `json:"properties" yaml:"properties"`
	InsertType              string            `json:"insert_type" yaml:"insert_type"`
	Timeout                 string            `json:"timeout" yaml:"timeout"`
	MaxInFlight             int               `json:"max_in_flight" yaml:"max_in_flight"`
	Batching                policy.Config     `json:"batching" yaml:"batching"`
}

AzureTableStorageConfig contains configuration fields for the AzureTableStorage output type.

func NewAzureTableStorageConfig

func NewAzureTableStorageConfig() AzureTableStorageConfig

NewAzureTableStorageConfig creates a new Config with default values.

type Cache

type Cache struct {
	// contains filtered or unexported fields
}

Cache is a benthos writer.Type implementation that writes messages to a target Cache.

func NewCache

func NewCache(
	conf CacheConfig,
	mgr interop.Manager,
	log log.Modular,
	stats metrics.Type,
) (*Cache, error)

NewCache creates a new Cache writer.Type.

func (*Cache) CloseAsync

func (c *Cache) CloseAsync()

CloseAsync begins cleaning up resources used by this writer asynchronously.

func (*Cache) Connect

func (c *Cache) Connect() error

Connect does nothing.

func (*Cache) ConnectWithContext

func (c *Cache) ConnectWithContext(ctx context.Context) error

ConnectWithContext does nothing.

func (*Cache) WaitForClose

func (c *Cache) WaitForClose(time.Duration) error

WaitForClose will block until either the writer is closed or a specified timeout occurs.

func (*Cache) Write

func (c *Cache) Write(msg *message.Batch) error

Write attempts to write message contents to a target Cache.

func (*Cache) WriteWithContext

func (c *Cache) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext attempts to write message contents to a target Cache.

type CacheConfig

type CacheConfig struct {
	Target      string `json:"target" yaml:"target"`
	Key         string `json:"key" yaml:"key"`
	TTL         string `json:"ttl" yaml:"ttl"`
	MaxInFlight int    `json:"max_in_flight" yaml:"max_in_flight"`
}

CacheConfig contains configuration fields for the Cache output type.
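As a rough illustration of how the JSON tags on these fields map to a config document, the sketch below decodes into a local mirror of the struct. cacheConfig, newCacheConfig, and parseCacheConfig are hypothetical names, the default value is a placeholder rather than the one NewCacheConfig returns, and the sample values are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// cacheConfig is a local mirror of the CacheConfig fields and JSON tags
// listed above; it is not the package's own type.
type cacheConfig struct {
	Target      string `json:"target"`
	Key         string `json:"key"`
	TTL         string `json:"ttl"`
	MaxInFlight int    `json:"max_in_flight"`
}

// newCacheConfig returns placeholder defaults. These values are assumptions
// for illustration, not the defaults used by NewCacheConfig.
func newCacheConfig() cacheConfig {
	return cacheConfig{MaxInFlight: 1}
}

// parseCacheConfig starts from the defaults and overlays whatever fields the
// JSON document sets, leaving the rest untouched.
func parseCacheConfig(raw []byte) (cacheConfig, error) {
	conf := newCacheConfig()
	if err := json.Unmarshal(raw, &conf); err != nil {
		return cacheConfig{}, err
	}
	return conf, nil
}

func main() {
	// Hypothetical values: the target name, key, and TTL are made up.
	raw := []byte(`{"target": "sessions", "key": "user-42", "ttl": "60s"}`)
	conf, err := parseCacheConfig(raw)
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Printf("%+v\n", conf)
}
```

Starting from a defaults constructor and unmarshalling over it means fields the document omits keep their default rather than falling back to Go's zero value.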

func NewCacheConfig

func NewCacheConfig() CacheConfig

NewCacheConfig creates a new Config with default values.

type Drop

type Drop struct {
	// contains filtered or unexported fields
}

Drop is a benthos writer.Type implementation that writes message parts to nowhere.

func NewDrop

func NewDrop(
	conf DropConfig,
	log log.Modular,
	stats metrics.Type,
) *Drop

NewDrop creates a new Drop writer.Type.

func (*Drop) CloseAsync

func (d *Drop) CloseAsync()

CloseAsync begins cleaning up resources used by this writer asynchronously.

func (*Drop) ConnectWithContext

func (d *Drop) ConnectWithContext(ctx context.Context) error

ConnectWithContext is a noop.

func (*Drop) WaitForClose

func (d *Drop) WaitForClose(time.Duration) error

WaitForClose will block until either the writer is closed or a specified timeout occurs.

func (*Drop) WriteWithContext

func (d *Drop) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext does nothing.

type DropConfig

type DropConfig struct{}

DropConfig contains configuration fields for the drop output type.

func NewDropConfig

func NewDropConfig() DropConfig

NewDropConfig creates a new DropConfig with default values.

type Elasticsearch

type Elasticsearch struct {
	// contains filtered or unexported fields
}

Elasticsearch is a writer type that writes messages into Elasticsearch.

func NewElasticsearchV2

func NewElasticsearchV2(conf ElasticsearchConfig, mgr interop.Manager, log log.Modular, stats metrics.Type) (*Elasticsearch, error)

NewElasticsearchV2 creates a new Elasticsearch writer type.

func (*Elasticsearch) CloseAsync

func (e *Elasticsearch) CloseAsync()

CloseAsync shuts down the Elasticsearch writer and stops processing messages.

func (*Elasticsearch) Connect

func (e *Elasticsearch) Connect() error

Connect attempts to establish a connection to an Elasticsearch server.

func (*Elasticsearch) ConnectWithContext

func (e *Elasticsearch) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to an Elasticsearch server.

func (*Elasticsearch) WaitForClose

func (e *Elasticsearch) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the Elasticsearch writer has closed down.

func (*Elasticsearch) Write

func (e *Elasticsearch) Write(msg *message.Batch) error

Write will attempt to write a message to Elasticsearch, wait for acknowledgement, and return an error if applicable.

func (*Elasticsearch) WriteWithContext

func (e *Elasticsearch) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext will attempt to write a message to Elasticsearch, wait for acknowledgement, and return an error if applicable.

type ElasticsearchConfig

type ElasticsearchConfig struct {
	URLs            []string             `json:"urls" yaml:"urls"`
	Sniff           bool                 `json:"sniff" yaml:"sniff"`
	Healthcheck     bool                 `json:"healthcheck" yaml:"healthcheck"`
	ID              string               `json:"id" yaml:"id"`
	Action          string               `json:"action" yaml:"action"`
	Index           string               `json:"index" yaml:"index"`
	Pipeline        string               `json:"pipeline" yaml:"pipeline"`
	Routing         string               `json:"routing" yaml:"routing"`
	Type            string               `json:"type" yaml:"type"`
	Timeout         string               `json:"timeout" yaml:"timeout"`
	TLS             btls.Config          `json:"tls" yaml:"tls"`
	Auth            auth.BasicAuthConfig `json:"basic_auth" yaml:"basic_auth"`
	AWS             OptionalAWSConfig    `json:"aws" yaml:"aws"`
	GzipCompression bool                 `json:"gzip_compression" yaml:"gzip_compression"`
	MaxInFlight     int                  `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config  `json:",inline" yaml:",inline"`
	Batching        policy.Config `json:"batching" yaml:"batching"`
}

ElasticsearchConfig contains configuration fields for the Elasticsearch output type.

func NewElasticsearchConfig

func NewElasticsearchConfig() ElasticsearchConfig

NewElasticsearchConfig creates a new ElasticsearchConfig with default values.

type Files

type Files struct {
	// contains filtered or unexported fields
}

Files is a benthos writer.Type implementation that writes message parts each to their own file.

func NewFilesV2

func NewFilesV2(
	conf FilesConfig,
	mgr interop.Manager,
	log log.Modular,
	stats metrics.Type,
) (*Files, error)

NewFilesV2 creates a new file based writer.Type.

func (*Files) CloseAsync

func (f *Files) CloseAsync()

CloseAsync begins cleaning up resources used by this writer asynchronously.

func (*Files) Connect

func (f *Files) Connect() error

Connect is a noop.

func (*Files) ConnectWithContext

func (f *Files) ConnectWithContext(ctx context.Context) error

ConnectWithContext is a noop.

func (*Files) WaitForClose

func (f *Files) WaitForClose(time.Duration) error

WaitForClose will block until either the writer is closed or a specified timeout occurs.

func (*Files) Write

func (f *Files) Write(msg *message.Batch) error

Write attempts to write message contents to a directory as files.

func (*Files) WriteWithContext

func (f *Files) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext attempts to write message contents to a directory as files.

type FilesConfig

type FilesConfig struct {
	Path string `json:"path" yaml:"path"`
}

FilesConfig contains configuration fields for the files output type.

func NewFilesConfig

func NewFilesConfig() FilesConfig

NewFilesConfig creates a new Config with default values.

type GCPPubSub

type GCPPubSub struct {
	// contains filtered or unexported fields
}

GCPPubSub is a benthos writer.Type implementation that writes messages to a GCP Pub/Sub topic.

func NewGCPPubSubV2

func NewGCPPubSubV2(
	conf GCPPubSubConfig,
	mgr interop.Manager,
	log log.Modular,
	stats metrics.Type,
) (*GCPPubSub, error)

NewGCPPubSubV2 creates a new GCP Cloud Pub/Sub writer.Type.

func (*GCPPubSub) CloseAsync

func (c *GCPPubSub) CloseAsync()

CloseAsync begins cleaning up resources used by this writer asynchronously.

func (*GCPPubSub) Connect

func (c *GCPPubSub) Connect() error

Connect attempts to establish a connection to the target GCP Pub/Sub topic.

func (*GCPPubSub) ConnectWithContext

func (c *GCPPubSub) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to the target GCP Pub/Sub topic.

func (*GCPPubSub) WaitForClose

func (c *GCPPubSub) WaitForClose(time.Duration) error

WaitForClose will block until either the writer is closed or a specified timeout occurs.

func (*GCPPubSub) Write

func (c *GCPPubSub) Write(msg *message.Batch) error

Write attempts to write message contents to a target topic.

func (*GCPPubSub) WriteWithContext

func (c *GCPPubSub) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext attempts to write message contents to a target topic.

type GCPPubSubConfig

type GCPPubSubConfig struct {
	ProjectID      string                       `json:"project" yaml:"project"`
	TopicID        string                       `json:"topic" yaml:"topic"`
	MaxInFlight    int                          `json:"max_in_flight" yaml:"max_in_flight"`
	PublishTimeout string                       `json:"publish_timeout" yaml:"publish_timeout"`
	Metadata       metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	OrderingKey    string                       `json:"ordering_key" yaml:"ordering_key"`
}

GCPPubSubConfig contains configuration fields for the output GCPPubSub type.

func NewGCPPubSubConfig

func NewGCPPubSubConfig() GCPPubSubConfig

NewGCPPubSubConfig creates a new Config with default values.

type HDFS

type HDFS struct {
	// contains filtered or unexported fields
}

HDFS is a benthos writer.Type implementation that writes messages to an HDFS directory.

func NewHDFSV2

func NewHDFSV2(
	conf HDFSConfig,
	mgr interop.Manager,
	log log.Modular,
	stats metrics.Type,
) (*HDFS, error)

NewHDFSV2 creates a new HDFS writer.Type.

func (*HDFS) CloseAsync

func (h *HDFS) CloseAsync()

CloseAsync begins cleaning up resources used by this writer asynchronously.

func (*HDFS) Connect

func (h *HDFS) Connect() error

Connect attempts to establish a connection to the target HDFS host.

func (*HDFS) ConnectWithContext

func (h *HDFS) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to the target HDFS host.

func (*HDFS) WaitForClose

func (h *HDFS) WaitForClose(time.Duration) error

WaitForClose will block until either the writer is closed or a specified timeout occurs.

func (*HDFS) Write

func (h *HDFS) Write(msg *message.Batch) error

Write attempts to write message contents to a target HDFS directory as files.

func (*HDFS) WriteWithContext

func (h *HDFS) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext attempts to write message contents to a target HDFS directory as files.

type HDFSConfig

type HDFSConfig struct {
	Hosts       []string      `json:"hosts" yaml:"hosts"`
	User        string        `json:"user" yaml:"user"`
	Directory   string        `json:"directory" yaml:"directory"`
	Path        string        `json:"path" yaml:"path"`
	MaxInFlight int           `json:"max_in_flight" yaml:"max_in_flight"`
	Batching    policy.Config `json:"batching" yaml:"batching"`
}

HDFSConfig contains configuration fields for the HDFS output type.

func NewHDFSConfig

func NewHDFSConfig() HDFSConfig

NewHDFSConfig creates a new Config with default values.

type HTTPClient

type HTTPClient struct {
	// contains filtered or unexported fields
}

HTTPClient is an output type that sends messages as HTTP requests to a target server endpoint.

func NewHTTPClient

func NewHTTPClient(
	conf HTTPClientConfig,
	mgr interop.Manager,
	log log.Modular,
	stats metrics.Type,
) (*HTTPClient, error)

NewHTTPClient creates a new HTTPClient writer type.

func (*HTTPClient) CloseAsync

func (h *HTTPClient) CloseAsync()

CloseAsync shuts down the HTTPClient output and stops processing messages.

func (*HTTPClient) Connect

func (h *HTTPClient) Connect() error

Connect does nothing.

func (*HTTPClient) ConnectWithContext

func (h *HTTPClient) ConnectWithContext(ctx context.Context) error

ConnectWithContext does nothing.

func (*HTTPClient) WaitForClose

func (h *HTTPClient) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the HTTPClient output has closed down.

func (*HTTPClient) Write

func (h *HTTPClient) Write(msg *message.Batch) error

Write attempts to send a message to an HTTP server. The attempt may include retries; if all retries fail, an error is returned.

func (*HTTPClient) WriteWithContext

func (h *HTTPClient) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext attempts to send a message to an HTTP server. The attempt may include retries; if all retries fail, an error is returned.

type HTTPClientConfig

type HTTPClientConfig struct {
	docs.Config       `json:",inline" yaml:",inline"`
	BatchAsMultipart  bool                            `json:"batch_as_multipart" yaml:"batch_as_multipart"`
	MaxInFlight       int                             `json:"max_in_flight" yaml:"max_in_flight"`
	PropagateResponse bool                            `json:"propagate_response" yaml:"propagate_response"`
	Batching          policy.Config                   `json:"batching" yaml:"batching"`
	Multipart         []HTTPClientMultipartExpression `json:"multipart" yaml:"multipart"`
}

HTTPClientConfig contains configuration fields for the HTTPClient output type.

func NewHTTPClientConfig

func NewHTTPClientConfig() HTTPClientConfig

NewHTTPClientConfig creates a new HTTPClientConfig with default values.

type HTTPClientMultipartExpression

type HTTPClientMultipartExpression struct {
	ContentDisposition string `json:"content_disposition" yaml:"content_disposition"`
	ContentType        string `json:"content_type" yaml:"content_type"`
	Body               string `json:"body" yaml:"body"`
}

HTTPClientMultipartExpression represents dynamic expressions that define a multipart message part in an HTTP request. Specifying one or more of these creates HTTP requests that override the default behaviour.

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

Kafka is a writer type that writes messages into Kafka.

func NewKafka

func NewKafka(conf KafkaConfig, mgr interop.Manager, log log.Modular, stats metrics.Type) (*Kafka, error)

NewKafka creates a new Kafka writer type.

func (*Kafka) CloseAsync

func (k *Kafka) CloseAsync()

CloseAsync shuts down the Kafka writer and stops processing messages.

func (*Kafka) Connect

func (k *Kafka) Connect() error

Connect attempts to establish a connection to a Kafka broker.

func (*Kafka) ConnectWithContext

func (k *Kafka) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to a Kafka broker.

func (*Kafka) WaitForClose

func (k *Kafka) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the Kafka writer has closed down.

func (*Kafka) Write

func (k *Kafka) Write(msg *message.Batch) error

Write will attempt to write a message to Kafka, wait for acknowledgement, and return an error if applicable.

func (*Kafka) WriteWithContext

func (k *Kafka) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext will attempt to write a message to Kafka, wait for acknowledgement, and return an error if applicable.

type KafkaConfig

type KafkaConfig struct {
	Addresses        []string    `json:"addresses" yaml:"addresses"`
	ClientID         string      `json:"client_id" yaml:"client_id"`
	RackID           string      `json:"rack_id" yaml:"rack_id"`
	Key              string      `json:"key" yaml:"key"`
	Partitioner      string      `json:"partitioner" yaml:"partitioner"`
	Partition        string      `json:"partition" yaml:"partition"`
	Topic            string      `json:"topic" yaml:"topic"`
	Compression      string      `json:"compression" yaml:"compression"`
	MaxMsgBytes      int         `json:"max_msg_bytes" yaml:"max_msg_bytes"`
	Timeout          string      `json:"timeout" yaml:"timeout"`
	AckReplicas      bool        `json:"ack_replicas" yaml:"ack_replicas"`
	TargetVersion    string      `json:"target_version" yaml:"target_version"`
	TLS              btls.Config `json:"tls" yaml:"tls"`
	SASL             sasl.Config `json:"sasl" yaml:"sasl"`
	MaxInFlight      int         `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config   `json:",inline" yaml:",inline"`
	RetryAsBatch     bool                         `json:"retry_as_batch" yaml:"retry_as_batch"`
	Batching         policy.Config                `json:"batching" yaml:"batching"`
	StaticHeaders    map[string]string            `json:"static_headers" yaml:"static_headers"`
	Metadata         metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	InjectTracingMap string                       `json:"inject_tracing_map" yaml:"inject_tracing_map"`
}

KafkaConfig contains configuration fields for the Kafka output type.

func NewKafkaConfig

func NewKafkaConfig() KafkaConfig

NewKafkaConfig creates a new KafkaConfig with default values.

type MQTT

type MQTT struct {
	// contains filtered or unexported fields
}

MQTT is an output type that serves MQTT messages.

func NewMQTTV2

func NewMQTTV2(
	conf MQTTConfig,
	mgr interop.Manager,
	log log.Modular,
	stats metrics.Type,
) (*MQTT, error)

NewMQTTV2 creates a new MQTT output type.

func (*MQTT) CloseAsync

func (m *MQTT) CloseAsync()

CloseAsync shuts down the MQTT output and stops processing messages.

func (*MQTT) Connect

func (m *MQTT) Connect() error

Connect establishes a connection to an MQTT server.

func (*MQTT) ConnectWithContext

func (m *MQTT) ConnectWithContext(ctx context.Context) error

ConnectWithContext establishes a connection to an MQTT server.

func (*MQTT) WaitForClose

func (m *MQTT) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the MQTT output has closed down.

func (*MQTT) Write

func (m *MQTT) Write(msg *message.Batch) error

Write attempts to write a message by pushing it to an MQTT broker.

func (*MQTT) WriteWithContext

func (m *MQTT) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext attempts to write a message by pushing it to an MQTT broker.

type MQTTConfig

type MQTTConfig struct {
	URLs                  []string      `json:"urls" yaml:"urls"`
	QoS                   uint8         `json:"qos" yaml:"qos"`
	Retained              bool          `json:"retained" yaml:"retained"`
	RetainedInterpolated  string        `json:"retained_interpolated" yaml:"retained_interpolated"`
	Topic                 string        `json:"topic" yaml:"topic"`
	ClientID              string        `json:"client_id" yaml:"client_id"`
	DynamicClientIDSuffix string        `json:"dynamic_client_id_suffix" yaml:"dynamic_client_id_suffix"`
	Will                  mqttconf.Will `json:"will" yaml:"will"`
	User                  string        `json:"user" yaml:"user"`
	Password              string        `json:"password" yaml:"password"`
	ConnectTimeout        string        `json:"connect_timeout" yaml:"connect_timeout"`
	WriteTimeout          string        `json:"write_timeout" yaml:"write_timeout"`
	KeepAlive             int64         `json:"keepalive" yaml:"keepalive"`
	MaxInFlight           int           `json:"max_in_flight" yaml:"max_in_flight"`
	TLS                   tls.Config    `json:"tls" yaml:"tls"`
}

MQTTConfig contains configuration fields for the MQTT output type.

func NewMQTTConfig

func NewMQTTConfig() MQTTConfig

NewMQTTConfig creates a new MQTTConfig with default values.

type NATS

type NATS struct {
	// contains filtered or unexported fields
}

NATS is an output type that serves NATS messages.

func NewNATSV2

func NewNATSV2(conf NATSConfig, mgr interop.Manager, log log.Modular, stats metrics.Type) (*NATS, error)

NewNATSV2 creates a new NATS output type.

func (*NATS) CloseAsync

func (n *NATS) CloseAsync()

CloseAsync shuts down the NATS output and stops processing messages.

func (*NATS) Connect

func (n *NATS) Connect() error

Connect attempts to establish a connection to NATS servers.

func (*NATS) ConnectWithContext

func (n *NATS) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to NATS servers.

func (*NATS) WaitForClose

func (n *NATS) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the NATS output has closed down.

func (*NATS) Write

func (n *NATS) Write(msg *message.Batch) error

Write attempts to write a message.

func (*NATS) WriteWithContext

func (n *NATS) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext attempts to write a message.

type NATSConfig

type NATSConfig struct {
	URLs        []string          `json:"urls" yaml:"urls"`
	Subject     string            `json:"subject" yaml:"subject"`
	Headers     map[string]string `json:"headers" yaml:"headers"`
	MaxInFlight int               `json:"max_in_flight" yaml:"max_in_flight"`
	TLS         btls.Config       `json:"tls" yaml:"tls"`
	Auth        auth.Config       `json:"auth" yaml:"auth"`
}

NATSConfig contains configuration fields for the NATS output type.

func NewNATSConfig

func NewNATSConfig() NATSConfig

NewNATSConfig creates a new NATSConfig with default values.

type NATSStream

type NATSStream struct {
	// contains filtered or unexported fields
}

NATSStream is an output type that serves NATS messages.

func NewNATSStream

func NewNATSStream(conf NATSStreamConfig, log log.Modular, stats metrics.Type) (*NATSStream, error)

NewNATSStream creates a new NATS Stream output type.

func (*NATSStream) CloseAsync

func (n *NATSStream) CloseAsync()

CloseAsync shuts down the NATS Stream output and stops processing messages.

func (*NATSStream) Connect

func (n *NATSStream) Connect() error

Connect attempts to establish a connection to NATS servers.

func (*NATSStream) ConnectWithContext

func (n *NATSStream) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to NATS servers.

func (*NATSStream) WaitForClose

func (n *NATSStream) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the NATS output has closed down.

func (*NATSStream) Write

func (n *NATSStream) Write(msg *message.Batch) error

Write attempts to write a message.

func (*NATSStream) WriteWithContext

func (n *NATSStream) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext attempts to write a message.

type NATSStreamConfig

type NATSStreamConfig struct {
	URLs        []string    `json:"urls" yaml:"urls"`
	ClusterID   string      `json:"cluster_id" yaml:"cluster_id"`
	ClientID    string      `json:"client_id" yaml:"client_id"`
	Subject     string      `json:"subject" yaml:"subject"`
	MaxInFlight int         `json:"max_in_flight" yaml:"max_in_flight"`
	TLS         btls.Config `json:"tls" yaml:"tls"`
	Auth        auth.Config `json:"auth" yaml:"auth"`
}

NATSStreamConfig contains configuration fields for the NATSStream output type.

func NewNATSStreamConfig

func NewNATSStreamConfig() NATSStreamConfig

NewNATSStreamConfig creates a new NATSStreamConfig with default values.

type NSQ

type NSQ struct {
	// contains filtered or unexported fields
}

NSQ is an output type that serves NSQ messages.

func NewNSQV2

func NewNSQV2(conf NSQConfig, mgr interop.Manager, log log.Modular, stats metrics.Type) (*NSQ, error)

NewNSQV2 creates a new NSQ output type.

func (*NSQ) CloseAsync

func (n *NSQ) CloseAsync()

CloseAsync shuts down the NSQ output and stops processing messages.

func (*NSQ) Connect

func (n *NSQ) Connect() error

Connect attempts to establish a connection to NSQ servers.

func (*NSQ) ConnectWithContext

func (n *NSQ) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to NSQ servers.

func (*NSQ) WaitForClose

func (n *NSQ) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the NSQ output has closed down.

func (*NSQ) Write

func (n *NSQ) Write(msg *message.Batch) error

Write attempts to write a message.

func (*NSQ) WriteWithContext

func (n *NSQ) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext attempts to write a message.

type NSQConfig

type NSQConfig struct {
	Address     string      `json:"nsqd_tcp_address" yaml:"nsqd_tcp_address"`
	Topic       string      `json:"topic" yaml:"topic"`
	UserAgent   string      `json:"user_agent" yaml:"user_agent"`
	TLS         btls.Config `json:"tls" yaml:"tls"`
	MaxInFlight int         `json:"max_in_flight" yaml:"max_in_flight"`
}

NSQConfig contains configuration fields for the NSQ output type.

func NewNSQConfig

func NewNSQConfig() NSQConfig

NewNSQConfig creates a new NSQConfig with default values.

type Nanomsg

type Nanomsg struct {
	// contains filtered or unexported fields
}

Nanomsg is an output type that pushes messages to a nanomsg socket.

func NewNanomsg

func NewNanomsg(conf NanomsgConfig, log log.Modular, stats metrics.Type) (*Nanomsg, error)

NewNanomsg creates a new Nanomsg output type.

func (*Nanomsg) CloseAsync

func (s *Nanomsg) CloseAsync()

CloseAsync shuts down the Nanomsg output and stops processing messages.

func (*Nanomsg) Connect

func (s *Nanomsg) Connect() error

Connect establishes a connection to a nanomsg socket.

func (*Nanomsg) ConnectWithContext

func (s *Nanomsg) ConnectWithContext(ctx context.Context) error

ConnectWithContext establishes a connection to a nanomsg socket.

func (*Nanomsg) WaitForClose

func (s *Nanomsg) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the Nanomsg output has closed down.

func (*Nanomsg) Write

func (s *Nanomsg) Write(msg *message.Batch) error

Write attempts to write a message by pushing it to a nanomsg socket.

func (*Nanomsg) WriteWithContext

func (s *Nanomsg) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext attempts to write a message by pushing it to a nanomsg socket.

type NanomsgConfig

type NanomsgConfig struct {
	URLs        []string `json:"urls" yaml:"urls"`
	Bind        bool     `json:"bind" yaml:"bind"`
	SocketType  string   `json:"socket_type" yaml:"socket_type"`
	PollTimeout string   `json:"poll_timeout" yaml:"poll_timeout"`
	MaxInFlight int      `json:"max_in_flight" yaml:"max_in_flight"`
}

NanomsgConfig contains configuration fields for the Nanomsg output type.

func NewNanomsgConfig

func NewNanomsgConfig() NanomsgConfig

NewNanomsgConfig creates a new NanomsgConfig with default values.

type OptionalAWSConfig

type OptionalAWSConfig struct {
	Enabled     bool `json:"enabled" yaml:"enabled"`
	sess.Config `json:",inline" yaml:",inline"`
}

OptionalAWSConfig contains config fields for AWS authentication with an enable flag.
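The `json:",inline"` tag on the embedded field carries no name, so encoding/json treats the embedded struct as flattened: its keys sit at the same level as `enabled`. The sketch below illustrates this with local stand-in types. `SessConfig` and its `Region` field are hypothetical placeholders for sess.Config, whose fields are not listed on this page, and the yaml tag is not exercised because its handling depends on the YAML library in use.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SessConfig is a hypothetical stand-in for sess.Config. The Region field is
// an illustrative assumption, not taken from the real type.
type SessConfig struct {
	Region string `json:"region"`
}

// optionalAWS mirrors the shape of OptionalAWSConfig: an enable flag plus an
// embedded config struct whose fields are flattened into the parent object.
type optionalAWS struct {
	Enabled    bool `json:"enabled"`
	SessConfig `json:",inline"`
}

// parseOptionalAWS decodes a flat JSON object into the nested Go struct.
func parseOptionalAWS(raw []byte) (optionalAWS, error) {
	var conf optionalAWS
	err := json.Unmarshal(raw, &conf)
	return conf, err
}

func main() {
	// Both keys live at the top level of the JSON object.
	conf, err := parseOptionalAWS([]byte(`{"enabled":true,"region":"eu-west-1"}`))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(conf.Enabled, conf.Region)
}
```

The embedded field is promoted, so `conf.Region` is reachable without naming the inner struct.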

type RedisHash

type RedisHash struct {
	// contains filtered or unexported fields
}

RedisHash is an output type that writes hash objects to Redis using the HMSET command.

func NewRedisHashV2

func NewRedisHashV2(
	conf RedisHashConfig,
	mgr interop.Manager,
	log log.Modular,
	stats metrics.Type,
) (*RedisHash, error)

NewRedisHashV2 creates a new RedisHash output type.

func (*RedisHash) CloseAsync

func (r *RedisHash) CloseAsync()

CloseAsync shuts down the RedisHash output and stops processing messages.

func (*RedisHash) Connect

func (r *RedisHash) Connect() error

Connect establishes a connection to a Redis server.

func (*RedisHash) ConnectWithContext

func (r *RedisHash) ConnectWithContext(ctx context.Context) error

ConnectWithContext establishes a connection to a Redis server.

func (*RedisHash) WaitForClose

func (r *RedisHash) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the RedisHash output has closed down.

func (*RedisHash) Write

func (r *RedisHash) Write(msg *message.Batch) error

Write attempts to write a message to Redis by setting it using the HMSET command.

func (*RedisHash) WriteWithContext

func (r *RedisHash) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext attempts to write a message to Redis by setting it using the HMSET command.

type RedisHashConfig

type RedisHashConfig struct {
	bredis.Config  `json:",inline" yaml:",inline"`
	Key            string            `json:"key" yaml:"key"`
	WalkMetadata   bool              `json:"walk_metadata" yaml:"walk_metadata"`
	WalkJSONObject bool              `json:"walk_json_object" yaml:"walk_json_object"`
	Fields         map[string]string `json:"fields" yaml:"fields"`
	MaxInFlight    int               `json:"max_in_flight" yaml:"max_in_flight"`
}

RedisHashConfig contains configuration fields for the RedisHash output type.

func NewRedisHashConfig

func NewRedisHashConfig() RedisHashConfig

NewRedisHashConfig creates a new RedisHashConfig with default values.

type RedisList

type RedisList struct {
	// contains filtered or unexported fields
}

RedisList is an output type that pushes messages to the end of a Redis list.

func NewRedisListV2

func NewRedisListV2(
	conf RedisListConfig,
	mgr interop.Manager,
	log log.Modular,
	stats metrics.Type,
) (*RedisList, error)

NewRedisListV2 creates a new RedisList output type.

func (*RedisList) CloseAsync

func (r *RedisList) CloseAsync()

CloseAsync shuts down the RedisList output and stops processing messages.

func (*RedisList) Connect

func (r *RedisList) Connect() error

Connect establishes a connection to a Redis server.

func (*RedisList) ConnectWithContext

func (r *RedisList) ConnectWithContext(ctx context.Context) error

ConnectWithContext establishes a connection to a Redis server.

func (*RedisList) WaitForClose

func (r *RedisList) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the RedisList output has closed down.

func (*RedisList) Write

func (r *RedisList) Write(msg *message.Batch) error

Write attempts to write a message by pushing it to the end of a Redis list.

func (*RedisList) WriteWithContext

func (r *RedisList) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext attempts to write a message by pushing it to the end of a Redis list.

type RedisListConfig

type RedisListConfig struct {
	bredis.Config `json:",inline" yaml:",inline"`
	Key           string        `json:"key" yaml:"key"`
	MaxInFlight   int           `json:"max_in_flight" yaml:"max_in_flight"`
	Batching      policy.Config `json:"batching" yaml:"batching"`
}

RedisListConfig contains configuration fields for the RedisList output type.

func NewRedisListConfig

func NewRedisListConfig() RedisListConfig

NewRedisListConfig creates a new RedisListConfig with default values.

type RedisPubSub

type RedisPubSub struct {
	// contains filtered or unexported fields
}

RedisPubSub is an output type that publishes messages to a Redis pub/sub channel.

func NewRedisPubSubV2

func NewRedisPubSubV2(
	conf RedisPubSubConfig,
	mgr interop.Manager,
	log log.Modular,
	stats metrics.Type,
) (*RedisPubSub, error)

NewRedisPubSubV2 creates a new RedisPubSub output type.

func (*RedisPubSub) CloseAsync

func (r *RedisPubSub) CloseAsync()

CloseAsync shuts down the RedisPubSub output and stops processing messages.

func (*RedisPubSub) Connect

func (r *RedisPubSub) Connect() error

Connect establishes a connection to a Redis server.

func (*RedisPubSub) ConnectWithContext

func (r *RedisPubSub) ConnectWithContext(ctx context.Context) error

ConnectWithContext establishes a connection to a Redis server.

func (*RedisPubSub) WaitForClose

func (r *RedisPubSub) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the RedisPubSub output has closed down.

func (*RedisPubSub) Write

func (r *RedisPubSub) Write(msg *message.Batch) error

Write attempts to write a message by pushing it to a Redis pub/sub topic.

func (*RedisPubSub) WriteWithContext

func (r *RedisPubSub) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext attempts to write a message by pushing it to a Redis pub/sub topic.

type RedisPubSubConfig

type RedisPubSubConfig struct {
	bredis.Config `json:",inline" yaml:",inline"`
	Channel       string        `json:"channel" yaml:"channel"`
	MaxInFlight   int           `json:"max_in_flight" yaml:"max_in_flight"`
	Batching      policy.Config `json:"batching" yaml:"batching"`
}

RedisPubSubConfig contains configuration fields for the RedisPubSub output type.

func NewRedisPubSubConfig

func NewRedisPubSubConfig() RedisPubSubConfig

NewRedisPubSubConfig creates a new RedisPubSubConfig with default values.

type RedisStreams

type RedisStreams struct {
	// contains filtered or unexported fields
}

RedisStreams is an output type that writes messages to a Redis stream.

func NewRedisStreams

func NewRedisStreams(
	conf RedisStreamsConfig,
	log log.Modular,
	stats metrics.Type,
) (*RedisStreams, error)

NewRedisStreams creates a new RedisStreams output type.

func (*RedisStreams) CloseAsync

func (r *RedisStreams) CloseAsync()

CloseAsync shuts down the RedisStreams output and stops processing messages.

func (*RedisStreams) Connect

func (r *RedisStreams) Connect() error

Connect establishes a connection to a Redis server.

func (*RedisStreams) ConnectWithContext

func (r *RedisStreams) ConnectWithContext(ctx context.Context) error

ConnectWithContext establishes a connection to a Redis server.

func (*RedisStreams) WaitForClose

func (r *RedisStreams) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the RedisStreams output has closed down.

func (*RedisStreams) Write

func (r *RedisStreams) Write(msg *message.Batch) error

Write attempts to write a message by pushing it to a Redis stream.

func (*RedisStreams) WriteWithContext

func (r *RedisStreams) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext attempts to write a message by pushing it to a Redis stream.

type RedisStreamsConfig

type RedisStreamsConfig struct {
	bredis.Config `json:",inline" yaml:",inline"`
	Stream        string                       `json:"stream" yaml:"stream"`
	BodyKey       string                       `json:"body_key" yaml:"body_key"`
	MaxLenApprox  int64                        `json:"max_length" yaml:"max_length"`
	MaxInFlight   int                          `json:"max_in_flight" yaml:"max_in_flight"`
	Metadata      metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	Batching      policy.Config                `json:"batching" yaml:"batching"`
}

RedisStreamsConfig contains configuration fields for the RedisStreams output type.
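Note that the Go field MaxLenApprox is serialised under the key `max_length`, so the config key does not match the field name. The sketch below copies only the plain-typed fields and their JSON tags into a local struct to show the resulting keys. The embedded bredis.Config, Metadata and Batching fields are omitted, and the values are illustrative assumptions rather than the package defaults.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// redisStreamsFields copies the plain-typed fields of RedisStreamsConfig and
// their JSON tags. Fields with package-specific types are left out here.
type redisStreamsFields struct {
	Stream       string `json:"stream"`
	BodyKey      string `json:"body_key"`
	MaxLenApprox int64  `json:"max_length"`
	MaxInFlight  int    `json:"max_in_flight"`
}

// encode renders the struct as JSON so the key names can be inspected.
func encode(f redisStreamsFields) (string, error) {
	b, err := json.Marshal(f)
	return string(b), err
}

func main() {
	// Illustrative values only, not the defaults of NewRedisStreamsConfig.
	out, err := encode(redisStreamsFields{
		Stream:       "events",
		BodyKey:      "body",
		MaxLenApprox: 1000,
		MaxInFlight:  1,
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
	// prints {"stream":"events","body_key":"body","max_length":1000,"max_in_flight":1}
}
```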

func NewRedisStreamsConfig

func NewRedisStreamsConfig() RedisStreamsConfig

NewRedisStreamsConfig creates a new RedisStreamsConfig with default values.

type Socket

type Socket struct {
	// contains filtered or unexported fields
}

Socket is an output type that sends messages as a continuous stream of line-delimited messages over a socket.

func NewSocket

func NewSocket(
	conf SocketConfig,
	mgr interop.Manager,
	log log.Modular,
	stats metrics.Type,
) (*Socket, error)

NewSocket creates a new Socket writer type.

func (*Socket) CloseAsync

func (s *Socket) CloseAsync()

CloseAsync shuts down the socket output and stops processing messages.

func (*Socket) Connect

func (s *Socket) Connect() error

Connect establishes a connection to the target socket server.

func (*Socket) ConnectWithContext

func (s *Socket) ConnectWithContext(ctx context.Context) error

ConnectWithContext establishes a connection to the target socket server.

func (*Socket) WaitForClose

func (s *Socket) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the socket output has closed down.

func (*Socket) Write

func (s *Socket) Write(msg *message.Batch) error

Write attempts to write a message.

func (*Socket) WriteWithContext

func (s *Socket) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext attempts to write a message.

type SocketConfig

type SocketConfig struct {
	Network string `json:"network" yaml:"network"`
	Address string `json:"address" yaml:"address"`
	Codec   string `json:"codec" yaml:"codec"`
}

SocketConfig contains configuration fields for the Socket output type.
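As a rough illustration of how the struct tags map config keys to fields, the sketch below decodes a JSON document into a local copy of the three SocketConfig fields. The local type exists only so that the example compiles without the package, and the network, address and codec values are hypothetical, not the defaults returned by NewSocketConfig.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// socketConfig is a local copy of the SocketConfig fields and JSON tags shown
// above, declared here so the sketch is self-contained.
type socketConfig struct {
	Network string `json:"network"`
	Address string `json:"address"`
	Codec   string `json:"codec"`
}

// parseSocketConfig decodes a JSON document using the tagged key names.
func parseSocketConfig(raw []byte) (socketConfig, error) {
	var conf socketConfig
	err := json.Unmarshal(raw, &conf)
	return conf, err
}

func main() {
	// Hypothetical values chosen for illustration.
	raw := []byte(`{"network":"tcp","address":"localhost:4194","codec":"lines"}`)
	conf, err := parseSocketConfig(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(conf.Network, conf.Address, conf.Codec)
}
```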

func NewSocketConfig

func NewSocketConfig() SocketConfig

NewSocketConfig creates a new SocketConfig with default values.

type Type

type Type interface {
	// Connect attempts to establish a connection to the sink, if unsuccessful
	// returns an error. If the attempt is successful (or not necessary) returns
	// nil.
	Connect() error

	// Write should block until either the message is sent (and acknowledged) to
	// a sink, or a transport specific error has occurred, or the Type is
	// closed.
	Write(msg *message.Batch) error

	// CloseAsync triggers the shut down of this component but should not block
	// the calling goroutine.
	CloseAsync()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(timeout time.Duration) error
}

Type is a type that writes Benthos messages to a third party sink. If the protocol supports a form of acknowledgement then it will be returned by the call to Write.

type Websocket

type Websocket struct {
	// contains filtered or unexported fields
}

Websocket is an output type that sends messages to a Websocket server.

func NewWebsocket

func NewWebsocket(
	conf WebsocketConfig,
	log log.Modular,
	stats metrics.Type,
) (*Websocket, error)

NewWebsocket creates a new Websocket output type.

func (*Websocket) CloseAsync

func (w *Websocket) CloseAsync()

CloseAsync shuts down the Websocket output and stops processing messages.

func (*Websocket) ConnectWithContext

func (w *Websocket) ConnectWithContext(ctx context.Context) error

ConnectWithContext establishes a connection to a Websocket server.

func (*Websocket) WaitForClose

func (w *Websocket) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the Websocket output has closed down.

func (*Websocket) WriteWithContext

func (w *Websocket) WriteWithContext(ctx context.Context, msg *message.Batch) error

WriteWithContext attempts to write a message by pushing it to a Websocket server.

type WebsocketConfig

type WebsocketConfig struct {
	URL         string `json:"url" yaml:"url"`
	auth.Config `json:",inline" yaml:",inline"`
	TLS         btls.Config `json:"tls" yaml:"tls"`
}

WebsocketConfig contains configuration fields for the Websocket output type.

func NewWebsocketConfig

func NewWebsocketConfig() WebsocketConfig

NewWebsocketConfig creates a new WebsocketConfig with default values.
