writer

package
v3.55.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 8, 2021 License: MIT Imports: 78 Imported by: 1

Documentation

Overview

Package writer defines implementations of an interface for generic message writing that outputs to various third party sinks.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IterateBatchedSend added in v3.22.0

func IterateBatchedSend(msg types.Message, fn func(int, types.Part) error) error

IterateBatchedSend executes a closure fn on each message of a batch, where the closure is expected to attempt a send and return an error. If an error is returned then it is added to a batch error in order to support index specific error handling.

However, if a fatal error is returned such as a connection loss or shut down then it is returned immediately.

Types

type AMQP

type AMQP struct {
	// contains filtered or unexported fields
}

AMQP is an output type that serves AMQP messages.

func NewAMQP

func NewAMQP(conf AMQPConfig, log log.Modular, stats metrics.Type) (*AMQP, error)

NewAMQP creates a new AMQP writer type.

func (*AMQP) CloseAsync

func (a *AMQP) CloseAsync()

CloseAsync shuts down the AMQP output and stops processing messages.

func (*AMQP) Connect

func (a *AMQP) Connect() error

Connect establishes a connection to an AMQP server.

func (*AMQP) ConnectWithContext added in v3.8.0

func (a *AMQP) ConnectWithContext(ctx context.Context) error

ConnectWithContext establishes a connection to an AMQP server.

func (*AMQP) WaitForClose

func (a *AMQP) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the AMQP output has closed down.

func (*AMQP) Write

func (a *AMQP) Write(msg types.Message) error

Write will attempt to write a message over AMQP, wait for acknowledgement, and returns an error if applicable.

func (*AMQP) WriteWithContext added in v3.8.0

func (a *AMQP) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext will attempt to write a message over AMQP, wait for acknowledgement, and returns an error if applicable.

type AMQP1 added in v3.19.0

type AMQP1 struct {
	// contains filtered or unexported fields
}

AMQP1 is an output type that serves AMQP1 messages.

func NewAMQP1 added in v3.19.0

func NewAMQP1(conf AMQP1Config, log log.Modular, stats metrics.Type) (*AMQP1, error)

NewAMQP1 creates a new AMQP1 writer type.

func (*AMQP1) CloseAsync added in v3.19.0

func (a *AMQP1) CloseAsync()

CloseAsync shuts down the AMQP1 output and stops processing messages.

func (*AMQP1) Connect added in v3.19.0

func (a *AMQP1) Connect() error

Connect establishes a connection to an AMQP1 server.

func (*AMQP1) ConnectWithContext added in v3.19.0

func (a *AMQP1) ConnectWithContext(ctx context.Context) error

ConnectWithContext establishes a connection to an AMQP1 server.

func (*AMQP1) WaitForClose added in v3.19.0

func (a *AMQP1) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the AMQP1 output has closed down.

func (*AMQP1) Write added in v3.19.0

func (a *AMQP1) Write(msg types.Message) error

Write will attempt to write a message over AMQP1, wait for acknowledgement, and returns an error if applicable.

func (*AMQP1) WriteWithContext added in v3.19.0

func (a *AMQP1) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext will attempt to write a message over AMQP1, wait for acknowledgement, and returns an error if applicable.

type AMQP1Config added in v3.19.0

type AMQP1Config struct {
	URL           string          `json:"url" yaml:"url"`
	TargetAddress string          `json:"target_address" yaml:"target_address"`
	MaxInFlight   int             `json:"max_in_flight" yaml:"max_in_flight"`
	TLS           btls.Config     `json:"tls" yaml:"tls"`
	SASL          sasl.Config     `json:"sasl" yaml:"sasl"`
	Metadata      output.Metadata `json:"metadata" yaml:"metadata"`
}

AMQP1Config contains configuration fields for the AMQP1 output type.

func NewAMQP1Config added in v3.19.0

func NewAMQP1Config() AMQP1Config

NewAMQP1Config creates a new AMQP1Config with default values.

type AMQPConfig

type AMQPConfig struct {
	URL             string                    `json:"url" yaml:"url"`
	MaxInFlight     int                       `json:"max_in_flight" yaml:"max_in_flight"`
	Exchange        string                    `json:"exchange" yaml:"exchange"`
	ExchangeDeclare AMQPExchangeDeclareConfig `json:"exchange_declare" yaml:"exchange_declare"`
	BindingKey      string                    `json:"key" yaml:"key"`
	Type            string                    `json:"type" yaml:"type"`
	ContentType     string                    `json:"content_type" yaml:"content_type"`
	ContentEncoding string                    `json:"content_encoding" yaml:"content_encoding"`
	Metadata        output.Metadata           `json:"metadata" yaml:"metadata"`
	Priority        string                    `json:"priority" yaml:"priority"`
	Persistent      bool                      `json:"persistent" yaml:"persistent"`
	Mandatory       bool                      `json:"mandatory" yaml:"mandatory"`
	Immediate       bool                      `json:"immediate" yaml:"immediate"`
	TLS             btls.Config               `json:"tls" yaml:"tls"`
}

AMQPConfig contains configuration fields for the AMQP output type.

func NewAMQPConfig

func NewAMQPConfig() AMQPConfig

NewAMQPConfig creates a new AMQPConfig with default values.

type AMQPExchangeDeclareConfig

type AMQPExchangeDeclareConfig struct {
	Enabled bool   `json:"enabled" yaml:"enabled"`
	Type    string `json:"type" yaml:"type"`
	Durable bool   `json:"durable" yaml:"durable"`
}

AMQPExchangeDeclareConfig contains fields indicating whether the target AMQP exchange needs to be declared, as well as any fields specifying how to accomplish that.

type AmazonS3

type AmazonS3 struct {
	// contains filtered or unexported fields
}

AmazonS3 is a benthos writer.Type implementation that writes messages to an Amazon S3 bucket.

func NewAmazonS3

func NewAmazonS3(
	conf AmazonS3Config,
	log log.Modular,
	stats metrics.Type,
) (*AmazonS3, error)

NewAmazonS3 creates a new Amazon S3 bucket writer.Type.

func (*AmazonS3) CloseAsync

func (a *AmazonS3) CloseAsync()

CloseAsync begins cleaning up resources used by this reader asynchronously.

func (*AmazonS3) Connect

func (a *AmazonS3) Connect() error

Connect attempts to establish a connection to the target S3 bucket.

func (*AmazonS3) ConnectWithContext added in v3.8.0

func (a *AmazonS3) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to the target S3 bucket.

func (*AmazonS3) WaitForClose

func (a *AmazonS3) WaitForClose(time.Duration) error

WaitForClose will block until either the reader is closed or a specified timeout occurs.

func (*AmazonS3) Write

func (a *AmazonS3) Write(msg types.Message) error

Write attempts to write message contents to a target S3 bucket as files.

func (*AmazonS3) WriteWithContext added in v3.8.0

func (a *AmazonS3) WriteWithContext(wctx context.Context, msg types.Message) error

WriteWithContext attempts to write message contents to a target S3 bucket as files.

type AmazonS3Config

type AmazonS3Config struct {
	sess.Config        `json:",inline" yaml:",inline"`
	Bucket             string             `json:"bucket" yaml:"bucket"`
	ForcePathStyleURLs bool               `json:"force_path_style_urls" yaml:"force_path_style_urls"`
	Path               string             `json:"path" yaml:"path"`
	Tags               map[string]string  `json:"tags" yaml:"tags"`
	ContentType        string             `json:"content_type" yaml:"content_type"`
	ContentEncoding    string             `json:"content_encoding" yaml:"content_encoding"`
	Metadata           output.Metadata    `json:"metadata" yaml:"metadata"`
	StorageClass       string             `json:"storage_class" yaml:"storage_class"`
	Timeout            string             `json:"timeout" yaml:"timeout"`
	KMSKeyID           string             `json:"kms_key_id" yaml:"kms_key_id"`
	MaxInFlight        int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching           batch.PolicyConfig `json:"batching" yaml:"batching"`
}

AmazonS3Config contains configuration fields for the AmazonS3 output type.

func NewAmazonS3Config

func NewAmazonS3Config() AmazonS3Config

NewAmazonS3Config creates a new Config with default values.

type AmazonSQS

type AmazonSQS struct {
	// contains filtered or unexported fields
}

AmazonSQS is a benthos writer.Type implementation that writes messages to an Amazon SQS queue.

func NewAmazonSQS

func NewAmazonSQS(
	conf AmazonSQSConfig,
	log log.Modular,
	stats metrics.Type,
) (*AmazonSQS, error)

NewAmazonSQS creates a new Amazon SQS writer.Type.

func (*AmazonSQS) CloseAsync

func (a *AmazonSQS) CloseAsync()

CloseAsync begins cleaning up resources used by this reader asynchronously.

func (*AmazonSQS) Connect

func (a *AmazonSQS) Connect() error

Connect attempts to establish a connection to the target SQS queue.

func (*AmazonSQS) ConnectWithContext added in v3.8.0

func (a *AmazonSQS) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to the target SQS queue.

func (*AmazonSQS) WaitForClose

func (a *AmazonSQS) WaitForClose(time.Duration) error

WaitForClose will block until either the reader is closed or a specified timeout occurs.

func (*AmazonSQS) Write

func (a *AmazonSQS) Write(msg types.Message) error

Write attempts to write message contents to a target SQS.

func (*AmazonSQS) WriteWithContext added in v3.8.0

func (a *AmazonSQS) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write message contents to a target SQS.

type AmazonSQSConfig

type AmazonSQSConfig struct {
	URL                    string          `json:"url" yaml:"url"`
	MessageGroupID         string          `json:"message_group_id" yaml:"message_group_id"`
	MessageDeduplicationID string          `json:"message_deduplication_id" yaml:"message_deduplication_id"`
	Metadata               output.Metadata `json:"metadata" yaml:"metadata"`
	MaxInFlight            int             `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config         `json:",inline" yaml:",inline"`
	Batching               batch.PolicyConfig `json:"batching" yaml:"batching"`
	// contains filtered or unexported fields
}

AmazonSQSConfig contains configuration fields for the output AmazonSQS type.

func NewAmazonSQSConfig

func NewAmazonSQSConfig() AmazonSQSConfig

NewAmazonSQSConfig creates a new Config with default values.

type AzureBlobStorage added in v3.21.0

type AzureBlobStorage struct {
	// contains filtered or unexported fields
}

AzureBlobStorage is a benthos writer. Type implementation that writes messages to an Azure Blob Storage storage account.

func NewAzureBlobStorage added in v3.21.0

func NewAzureBlobStorage(
	conf AzureBlobStorageConfig,
	log log.Modular,
	stats metrics.Type,
) (*AzureBlobStorage, error)

NewAzureBlobStorage creates a new AzureBlobStorage writer.Type.

func (*AzureBlobStorage) CloseAsync added in v3.21.0

func (a *AzureBlobStorage) CloseAsync()

CloseAsync begins cleaning up resources used by this reader asynchronously.

func (*AzureBlobStorage) Connect added in v3.21.0

func (a *AzureBlobStorage) Connect() error

Connect attempts to establish a connection to the target Blob Storage Account.

func (*AzureBlobStorage) ConnectWithContext added in v3.21.0

func (a *AzureBlobStorage) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to the target Blob Storage Account.

func (*AzureBlobStorage) WaitForClose added in v3.21.0

func (a *AzureBlobStorage) WaitForClose(time.Duration) error

WaitForClose will block until either the reader is closed or a specified timeout occurs.

func (*AzureBlobStorage) Write added in v3.21.0

func (a *AzureBlobStorage) Write(msg types.Message) error

Write attempts to write message contents to a target Azure Blob Storage container as files.

func (*AzureBlobStorage) WriteWithContext added in v3.21.0

func (a *AzureBlobStorage) WriteWithContext(_ context.Context, msg types.Message) error

WriteWithContext attempts to write message contents to a target storage account as files.

type AzureBlobStorageConfig added in v3.21.0

type AzureBlobStorageConfig struct {
	StorageAccount          string `json:"storage_account" yaml:"storage_account"`
	StorageAccessKey        string `json:"storage_access_key" yaml:"storage_access_key"`
	StorageSASToken         string `json:"storage_sas_token" yaml:"storage_sas_token"`
	StorageConnectionString string `json:"storage_connection_string" yaml:"storage_connection_string"`
	Container               string `json:"container" yaml:"container"`
	Path                    string `json:"path" yaml:"path"`
	BlobType                string `json:"blob_type" yaml:"blob_type"`
	PublicAccessLevel       string `json:"public_access_level" yaml:"public_access_level"`
	MaxInFlight             int    `json:"max_in_flight" yaml:"max_in_flight"`
}

AzureBlobStorageConfig contains configuration fields for the AzureBlobStorage output type.

func NewAzureBlobStorageConfig added in v3.21.0

func NewAzureBlobStorageConfig() AzureBlobStorageConfig

NewAzureBlobStorageConfig creates a new Config with default values.

type AzureQueueStorage added in v3.36.0

type AzureQueueStorage struct {
	// contains filtered or unexported fields
}

AzureQueueStorage is a benthos writer.Type implementation that writes messages to an Azure Queue Storage queue.

func NewAzureQueueStorage added in v3.36.0

func NewAzureQueueStorage(
	conf AzureQueueStorageConfig,
	log log.Modular,
	stats metrics.Type,
) (*AzureQueueStorage, error)

NewAzureQueueStorage creates a new Azure Queue Storage writer type.

func (*AzureQueueStorage) CloseAsync added in v3.36.0

func (a *AzureQueueStorage) CloseAsync()

CloseAsync begins cleaning up resources used by this reader asynchronously.

func (*AzureQueueStorage) Connect added in v3.36.0

func (a *AzureQueueStorage) Connect() error

Connect attempts to establish a connection to the target

func (*AzureQueueStorage) ConnectWithContext added in v3.36.0

func (a *AzureQueueStorage) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to the target queue.

func (*AzureQueueStorage) WaitForClose added in v3.36.0

func (a *AzureQueueStorage) WaitForClose(time.Duration) error

WaitForClose will block until either the reader is closed or a specified timeout occurs.

func (*AzureQueueStorage) Write added in v3.36.0

func (a *AzureQueueStorage) Write(msg types.Message) error

Write attempts to write message contents to a target Azure Queue Storage queue.

func (*AzureQueueStorage) WriteWithContext added in v3.36.0

func (a *AzureQueueStorage) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write message contents to a target Queue Storage

type AzureQueueStorageConfig added in v3.36.0

type AzureQueueStorageConfig struct {
	StorageAccount          string             `json:"storage_account" yaml:"storage_account"`
	StorageAccessKey        string             `json:"storage_access_key" yaml:"storage_access_key"`
	StorageConnectionString string             `json:"storage_connection_string" yaml:"storage_connection_string"`
	QueueName               string             `json:"queue_name" yaml:"queue_name"`
	TTL                     string             `json:"ttl" yaml:"ttl"`
	MaxInFlight             int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching                batch.PolicyConfig `json:"batching" yaml:"batching"`
}

AzureQueueStorageConfig contains configuration fields for the output Azure Queue Storage type.

func NewAzureQueueStorageConfig added in v3.36.0

func NewAzureQueueStorageConfig() AzureQueueStorageConfig

NewAzureQueueStorageConfig creates a new Config with default values.

type AzureTableStorage added in v3.23.0

type AzureTableStorage struct {
	// contains filtered or unexported fields
}

AzureTableStorage is a benthos writer. Type implementation that writes messages to an Azure Table Storage table.

func NewAzureTableStorage added in v3.23.0

func NewAzureTableStorage(
	conf AzureTableStorageConfig,
	log log.Modular,
	stats metrics.Type,
) (*AzureTableStorage, error)

NewAzureTableStorage creates a new Azure Table Storage writer Type.

func (*AzureTableStorage) CloseAsync added in v3.23.0

func (a *AzureTableStorage) CloseAsync()

CloseAsync begins cleaning up resources used by this reader asynchronously.

func (*AzureTableStorage) Connect added in v3.23.0

func (a *AzureTableStorage) Connect() error

Connect attempts to establish a connection to the target Table Storage Account.

func (*AzureTableStorage) ConnectWithContext added in v3.23.0

func (a *AzureTableStorage) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to the target Table Storage Account.

func (*AzureTableStorage) WaitForClose added in v3.23.0

func (a *AzureTableStorage) WaitForClose(time.Duration) error

WaitForClose will block until either the reader is closed or a specified timeout occurs.

func (*AzureTableStorage) Write added in v3.23.0

func (a *AzureTableStorage) Write(msg types.Message) error

Write attempts to write message contents to a target Azure Table Storage container as files.

func (*AzureTableStorage) WriteWithContext added in v3.23.0

func (a *AzureTableStorage) WriteWithContext(wctx context.Context, msg types.Message) error

WriteWithContext attempts to write message contents to a target storage account as files.

type AzureTableStorageConfig added in v3.23.0

type AzureTableStorageConfig struct {
	StorageAccount          string             `json:"storage_account" yaml:"storage_account"`
	StorageAccessKey        string             `json:"storage_access_key" yaml:"storage_access_key"`
	StorageConnectionString string             `json:"storage_connection_string" yaml:"storage_connection_string"`
	TableName               string             `json:"table_name" yaml:"table_name"`
	PartitionKey            string             `json:"partition_key" yaml:"partition_key"`
	RowKey                  string             `json:"row_key" yaml:"row_key"`
	Properties              map[string]string  `json:"properties" yaml:"properties"`
	InsertType              string             `json:"insert_type" yaml:"insert_type"`
	Timeout                 string             `json:"timeout" yaml:"timeout"`
	MaxInFlight             int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching                batch.PolicyConfig `json:"batching" yaml:"batching"`
}

AzureTableStorageConfig contains configuration fields for the AzureTableStorage output type.

func NewAzureTableStorageConfig added in v3.23.0

func NewAzureTableStorageConfig() AzureTableStorageConfig

NewAzureTableStorageConfig creates a new Config with default values.

type Cache

type Cache struct {
	// contains filtered or unexported fields
}

Cache is a benthos writer.Type implementation that writes messages to a Cache directory.

func NewCache

func NewCache(
	conf CacheConfig,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
) (*Cache, error)

NewCache creates a new Cache writer.Type.

func (*Cache) CloseAsync

func (c *Cache) CloseAsync()

CloseAsync begins cleaning up resources used by this writer asynchronously.

func (*Cache) Connect

func (c *Cache) Connect() error

Connect does nothing.

func (*Cache) ConnectWithContext added in v3.8.0

func (c *Cache) ConnectWithContext(ctx context.Context) error

ConnectWithContext does nothing.

func (*Cache) WaitForClose

func (c *Cache) WaitForClose(time.Duration) error

WaitForClose will block until either the writer is closed or a specified timeout occurs.

func (*Cache) Write

func (c *Cache) Write(msg types.Message) error

Write attempts to write message contents to a target Cache.

func (*Cache) WriteWithContext added in v3.8.0

func (c *Cache) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write message contents to a target Cache.

type CacheConfig

type CacheConfig struct {
	Target      string `json:"target" yaml:"target"`
	Key         string `json:"key" yaml:"key"`
	TTL         string `json:"ttl" yaml:"ttl"`
	MaxInFlight int    `json:"max_in_flight" yaml:"max_in_flight"`
}

CacheConfig contains configuration fields for the Cache output type.

func NewCacheConfig

func NewCacheConfig() CacheConfig

NewCacheConfig creates a new Config with default values.

type Drop

type Drop struct {
	// contains filtered or unexported fields
}

Drop is a benthos writer.Type implementation that writes message parts to no where.

func NewDrop

func NewDrop(
	conf DropConfig,
	log log.Modular,
	stats metrics.Type,
) *Drop

NewDrop creates a new file based writer.Type.

func (*Drop) CloseAsync

func (d *Drop) CloseAsync()

CloseAsync begins cleaning up resources used by this reader asynchronously.

func (*Drop) Connect

func (d *Drop) Connect() error

Connect is a noop.

func (*Drop) WaitForClose

func (d *Drop) WaitForClose(time.Duration) error

WaitForClose will block until either the reader is closed or a specified timeout occurs.

func (*Drop) Write

func (d *Drop) Write(msg types.Message) error

Write does nothing.

type DropConfig

type DropConfig struct{}

DropConfig contains configuration fields for the drop output type.

func NewDropConfig

func NewDropConfig() DropConfig

NewDropConfig creates a new DropConfig with default values.

type DynamoDB

type DynamoDB struct {
	// contains filtered or unexported fields
}

DynamoDB is a benthos writer.Type implementation that writes messages to an Amazon SQS queue.

func NewDynamoDB

func NewDynamoDB(
	conf DynamoDBConfig,
	log log.Modular,
	stats metrics.Type,
) (*DynamoDB, error)

NewDynamoDB creates a new Amazon SQS writer.Type.

func (*DynamoDB) CloseAsync

func (d *DynamoDB) CloseAsync()

CloseAsync begins cleaning up resources used by this writer asynchronously.

func (*DynamoDB) Connect

func (d *DynamoDB) Connect() error

Connect attempts to establish a connection to the target SQS queue.

func (*DynamoDB) ConnectWithContext added in v3.8.0

func (d *DynamoDB) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to the target DynamoDB table.

func (*DynamoDB) WaitForClose

func (d *DynamoDB) WaitForClose(time.Duration) error

WaitForClose will block until either the writer is closed or a specified timeout occurs.

func (*DynamoDB) Write

func (d *DynamoDB) Write(msg types.Message) error

Write attempts to write message contents to a target DynamoDB table.

func (*DynamoDB) WriteWithContext added in v3.8.0

func (d *DynamoDB) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write message contents to a target DynamoDB table.

type DynamoDBConfig

type DynamoDBConfig struct {
	Table          string            `json:"table" yaml:"table"`
	StringColumns  map[string]string `json:"string_columns" yaml:"string_columns"`
	JSONMapColumns map[string]string `json:"json_map_columns" yaml:"json_map_columns"`
	TTL            string            `json:"ttl" yaml:"ttl"`
	TTLKey         string            `json:"ttl_key" yaml:"ttl_key"`
	MaxInFlight    int               `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config `json:",inline" yaml:",inline"`
	Batching       batch.PolicyConfig `json:"batching" yaml:"batching"`
	// contains filtered or unexported fields
}

DynamoDBConfig contains config fields for the DynamoDB output type.

func NewDynamoDBConfig

func NewDynamoDBConfig() DynamoDBConfig

NewDynamoDBConfig creates a DynamoDBConfig populated with default values.

type Elasticsearch

type Elasticsearch struct {
	// contains filtered or unexported fields
}

Elasticsearch is a writer type that writes messages into elasticsearch.

func NewElasticsearch

func NewElasticsearch(conf ElasticsearchConfig, log log.Modular, stats metrics.Type) (*Elasticsearch, error)

NewElasticsearch creates a new Elasticsearch writer type.

func (*Elasticsearch) CloseAsync

func (e *Elasticsearch) CloseAsync()

CloseAsync shuts down the Elasticsearch writer and stops processing messages.

func (*Elasticsearch) Connect

func (e *Elasticsearch) Connect() error

Connect attempts to establish a connection to a Elasticsearch broker.

func (*Elasticsearch) ConnectWithContext added in v3.8.0

func (e *Elasticsearch) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to a Elasticsearch broker.

func (*Elasticsearch) WaitForClose

func (e *Elasticsearch) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the Elasticsearch writer has closed down.

func (*Elasticsearch) Write

func (e *Elasticsearch) Write(msg types.Message) error

Write will attempt to write a message to Elasticsearch, wait for acknowledgement, and returns an error if applicable.

func (*Elasticsearch) WriteWithContext added in v3.8.0

func (e *Elasticsearch) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext will attempt to write a message to Elasticsearch, wait for acknowledgement, and returns an error if applicable.

type ElasticsearchConfig

type ElasticsearchConfig struct {
	URLs            []string             `json:"urls" yaml:"urls"`
	Sniff           bool                 `json:"sniff" yaml:"sniff"`
	Healthcheck     bool                 `json:"healthcheck" yaml:"healthcheck"`
	ID              string               `json:"id" yaml:"id"`
	Action          string               `json:"action" yaml:"action"`
	Index           string               `json:"index" yaml:"index"`
	Pipeline        string               `json:"pipeline" yaml:"pipeline"`
	Routing         string               `json:"routing" yaml:"routing"`
	Type            string               `json:"type" yaml:"type"`
	Timeout         string               `json:"timeout" yaml:"timeout"`
	TLS             btls.Config          `json:"tls" yaml:"tls"`
	Auth            auth.BasicAuthConfig `json:"basic_auth" yaml:"basic_auth"`
	AWS             OptionalAWSConfig    `json:"aws" yaml:"aws"`
	GzipCompression bool                 `json:"gzip_compression" yaml:"gzip_compression"`
	MaxInFlight     int                  `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config  `json:",inline" yaml:",inline"`
	Batching        batch.PolicyConfig `json:"batching" yaml:"batching"`
}

ElasticsearchConfig contains configuration fields for the Elasticsearch output type.

func NewElasticsearchConfig

func NewElasticsearchConfig() ElasticsearchConfig

NewElasticsearchConfig creates a new ElasticsearchConfig with default values.

type Files

type Files struct {
	// contains filtered or unexported fields
}

Files is a benthos writer.Type implementation that writes message parts each to their own file.

func NewFiles

func NewFiles(
	conf FilesConfig,
	log log.Modular,
	stats metrics.Type,
) (*Files, error)

NewFiles creates a new file based writer.Type.

func (*Files) CloseAsync

func (f *Files) CloseAsync()

CloseAsync begins cleaning up resources used by this reader asynchronously.

func (*Files) Connect

func (f *Files) Connect() error

Connect is a noop.

func (*Files) ConnectWithContext added in v3.8.0

func (f *Files) ConnectWithContext(ctx context.Context) error

ConnectWithContext is a noop.

func (*Files) WaitForClose

func (f *Files) WaitForClose(time.Duration) error

WaitForClose will block until either the reader is closed or a specified timeout occurs.

func (*Files) Write

func (f *Files) Write(msg types.Message) error

Write attempts to write message contents to a directory as files.

func (*Files) WriteWithContext added in v3.8.0

func (f *Files) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write message contents to a directory as files.

type FilesConfig

type FilesConfig struct {
	Path string `json:"path" yaml:"path"`
}

FilesConfig contains configuration fields for the files output type.

func NewFilesConfig

func NewFilesConfig() FilesConfig

NewFilesConfig creates a new Config with default values.

type GCPPubSub

type GCPPubSub struct {
	// contains filtered or unexported fields
}

GCPPubSub is a benthos writer.Type implementation that writes messages to a GCP Pub/Sub topic.

func NewGCPPubSub

func NewGCPPubSub(
	conf GCPPubSubConfig,
	log log.Modular,
	stats metrics.Type,
) (*GCPPubSub, error)

NewGCPPubSub creates a new GCP Cloud Pub/Sub writer.Type.

func (*GCPPubSub) CloseAsync

func (c *GCPPubSub) CloseAsync()

CloseAsync begins cleaning up resources used by this reader asynchronously.

func (*GCPPubSub) Connect

func (c *GCPPubSub) Connect() error

Connect attempts to establish a connection to the target GCP Pub/Sub topic.

func (*GCPPubSub) ConnectWithContext added in v3.8.0

func (c *GCPPubSub) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to the target GCP Pub/Sub topic.

func (*GCPPubSub) WaitForClose

func (c *GCPPubSub) WaitForClose(time.Duration) error

WaitForClose will block until either the reader is closed or a specified timeout occurs.

func (*GCPPubSub) Write

func (c *GCPPubSub) Write(msg types.Message) error

Write attempts to write message contents to a target topic.

func (*GCPPubSub) WriteWithContext added in v3.8.0

func (c *GCPPubSub) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write message contents to a target topic.

type GCPPubSubConfig

type GCPPubSubConfig struct {
	ProjectID      string          `json:"project" yaml:"project"`
	TopicID        string          `json:"topic" yaml:"topic"`
	MaxInFlight    int             `json:"max_in_flight" yaml:"max_in_flight"`
	PublishTimeout string          `json:"publish_timeout" yaml:"publish_timeout"`
	Metadata       output.Metadata `json:"metadata" yaml:"metadata"`
}

GCPPubSubConfig contains configuration fields for the output GCPPubSub type.

func NewGCPPubSubConfig

func NewGCPPubSubConfig() GCPPubSubConfig

NewGCPPubSubConfig creates a new Config with default values.

type HDFS

type HDFS struct {
	// contains filtered or unexported fields
}

HDFS is a benthos writer.Type implementation that writes messages to a HDFS directory.

func NewHDFS

func NewHDFS(
	conf HDFSConfig,
	log log.Modular,
	stats metrics.Type,
) (*HDFS, error)

NewHDFS creates a new HDFS writer.Type.

func (*HDFS) CloseAsync

func (h *HDFS) CloseAsync()

CloseAsync begins cleaning up resources used by this reader asynchronously.

func (*HDFS) Connect

func (h *HDFS) Connect() error

Connect attempts to establish a connection to the target HDFS host.

func (*HDFS) ConnectWithContext added in v3.8.0

func (h *HDFS) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to the target HDFS host.

func (*HDFS) WaitForClose

func (h *HDFS) WaitForClose(time.Duration) error

WaitForClose will block until either the reader is closed or a specified timeout occurs.

func (*HDFS) Write

func (h *HDFS) Write(msg types.Message) error

Write attempts to write message contents to a target HDFS directory as files.

func (*HDFS) WriteWithContext added in v3.8.0

func (h *HDFS) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write message contents to a target HDFS directory as files.

type HDFSConfig

type HDFSConfig struct {
	Hosts       []string           `json:"hosts" yaml:"hosts"`
	User        string             `json:"user" yaml:"user"`
	Directory   string             `json:"directory" yaml:"directory"`
	Path        string             `json:"path" yaml:"path"`
	MaxInFlight int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching    batch.PolicyConfig `json:"batching" yaml:"batching"`
}

HDFSConfig contains configuration fields for the HDFS output type.

func NewHDFSConfig

func NewHDFSConfig() HDFSConfig

NewHDFSConfig creates a new Config with default values.

type HTTPClient

type HTTPClient struct {
	// contains filtered or unexported fields
}

HTTPClient is an output type that sends messages as HTTP requests to a target server endpoint.

func NewHTTPClient

func NewHTTPClient(
	conf HTTPClientConfig,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
) (*HTTPClient, error)

NewHTTPClient creates a new HTTPClient writer type.

func (*HTTPClient) CloseAsync

func (h *HTTPClient) CloseAsync()

CloseAsync shuts down the HTTPClient output and stops processing messages.

func (*HTTPClient) Connect

func (h *HTTPClient) Connect() error

Connect does nothing.

func (*HTTPClient) ConnectWithContext added in v3.8.0

func (h *HTTPClient) ConnectWithContext(ctx context.Context) error

ConnectWithContext does nothing.

func (*HTTPClient) WaitForClose

func (h *HTTPClient) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the HTTPClient output has closed down.

func (*HTTPClient) Write

func (h *HTTPClient) Write(msg types.Message) error

Write attempts to send a message to an HTTP server, this attempt may include retries, and if all retries fail an error is returned.

func (*HTTPClient) WriteWithContext added in v3.8.0

func (h *HTTPClient) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to send a message to an HTTP server, this attempt may include retries, and if all retries fail an error is returned.

type HTTPClientConfig

type HTTPClientConfig struct {
	client.Config     `json:",inline" yaml:",inline"`
	BatchAsMultipart  bool               `json:"batch_as_multipart" yaml:"batch_as_multipart"`
	MaxInFlight       int                `json:"max_in_flight" yaml:"max_in_flight"`
	PropagateResponse bool               `json:"propagate_response" yaml:"propagate_response"`
	Batching          batch.PolicyConfig `json:"batching" yaml:"batching"`
}

HTTPClientConfig contains configuration fields for the HTTPClient output type.

func NewHTTPClientConfig

func NewHTTPClientConfig() HTTPClientConfig

NewHTTPClientConfig creates a new HTTPClientConfig with default values.

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

Kafka is a writer type that writes messages into kafka.

func NewKafka

func NewKafka(conf KafkaConfig, mgr types.Manager, log log.Modular, stats metrics.Type) (*Kafka, error)

NewKafka creates a new Kafka writer type.

func (*Kafka) CloseAsync

func (k *Kafka) CloseAsync()

CloseAsync shuts down the Kafka writer and stops processing messages.

func (*Kafka) Connect

func (k *Kafka) Connect() error

Connect attempts to establish a connection to a Kafka broker.

func (*Kafka) ConnectWithContext added in v3.8.0

func (k *Kafka) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to a Kafka broker.

func (*Kafka) WaitForClose

func (k *Kafka) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the Kafka writer has closed down.

func (*Kafka) Write

func (k *Kafka) Write(msg types.Message) error

Write will attempt to write a message to Kafka, wait for acknowledgement, and returns an error if applicable.

func (*Kafka) WriteWithContext added in v3.8.0

func (k *Kafka) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext will attempt to write a message to Kafka, wait for acknowledgement, and returns an error if applicable.

type KafkaConfig

type KafkaConfig struct {
	Addresses        []string    `json:"addresses" yaml:"addresses"`
	ClientID         string      `json:"client_id" yaml:"client_id"`
	Key              string      `json:"key" yaml:"key"`
	Partitioner      string      `json:"partitioner" yaml:"partitioner"`
	Partition        string      `json:"partition" yaml:"partition"`
	Topic            string      `json:"topic" yaml:"topic"`
	Compression      string      `json:"compression" yaml:"compression"`
	MaxMsgBytes      int         `json:"max_msg_bytes" yaml:"max_msg_bytes"`
	Timeout          string      `json:"timeout" yaml:"timeout"`
	AckReplicas      bool        `json:"ack_replicas" yaml:"ack_replicas"`
	TargetVersion    string      `json:"target_version" yaml:"target_version"`
	TLS              btls.Config `json:"tls" yaml:"tls"`
	SASL             sasl.Config `json:"sasl" yaml:"sasl"`
	MaxInFlight      int         `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config   `json:",inline" yaml:",inline"`
	RetryAsBatch     bool               `json:"retry_as_batch" yaml:"retry_as_batch"`
	Batching         batch.PolicyConfig `json:"batching" yaml:"batching"`
	StaticHeaders    map[string]string  `json:"static_headers" yaml:"static_headers"`
	Metadata         output.Metadata    `json:"metadata" yaml:"metadata"`
	InjectTracingMap string             `json:"inject_tracing_map" yaml:"inject_tracing_map"`

	// TODO: V4 remove this.
	RoundRobinPartitions bool `json:"round_robin_partitions" yaml:"round_robin_partitions"`
}

KafkaConfig contains configuration fields for the Kafka output type.

func NewKafkaConfig

func NewKafkaConfig() KafkaConfig

NewKafkaConfig creates a new KafkaConfig with default values.

type Kinesis

type Kinesis struct {
	// contains filtered or unexported fields
}

Kinesis is a benthos writer.Type implementation that writes messages to an Amazon Kinesis stream.

func NewKinesis

func NewKinesis(
	conf KinesisConfig,
	log log.Modular,
	stats metrics.Type,
) (*Kinesis, error)

NewKinesis creates a new Amazon Kinesis writer.Type.

func (*Kinesis) CloseAsync

func (a *Kinesis) CloseAsync()

CloseAsync begins cleaning up resources used by this reader asynchronously.

func (*Kinesis) Connect

func (a *Kinesis) Connect() error

Connect creates a new Kinesis client and ensures that the target Kinesis stream exists.

func (*Kinesis) ConnectWithContext added in v3.8.0

func (a *Kinesis) ConnectWithContext(ctx context.Context) error

ConnectWithContext creates a new Kinesis client and ensures that the target Kinesis stream exists.

func (*Kinesis) WaitForClose

func (a *Kinesis) WaitForClose(time.Duration) error

WaitForClose will block until either the reader is closed or a specified timeout occurs.

func (*Kinesis) Write

func (a *Kinesis) Write(msg types.Message) error

Write attempts to write message contents to a target Kinesis stream in batches of 500. If throttling is detected, failed messages are retried according to the configurable backoff settings.

func (*Kinesis) WriteWithContext added in v3.8.0

func (a *Kinesis) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write message contents to a target Kinesis stream in batches of 500. If throttling is detected, failed messages are retried according to the configurable backoff settings.

type KinesisConfig

type KinesisConfig struct {
	Stream         string `json:"stream" yaml:"stream"`
	HashKey        string `json:"hash_key" yaml:"hash_key"`
	PartitionKey   string `json:"partition_key" yaml:"partition_key"`
	MaxInFlight    int    `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config `json:",inline" yaml:",inline"`
	Batching       batch.PolicyConfig `json:"batching" yaml:"batching"`
	// contains filtered or unexported fields
}

KinesisConfig contains configuration fields for the Kinesis output type.

func NewKinesisConfig

func NewKinesisConfig() KinesisConfig

NewKinesisConfig creates a new Config with default values.

type KinesisFirehose added in v3.1.0

type KinesisFirehose struct {
	// contains filtered or unexported fields
}

KinesisFirehose is a benthos writer.Type implementation that writes messages to an Amazon Kinesis Firehose destination.

func NewKinesisFirehose added in v3.1.0

func NewKinesisFirehose(
	conf KinesisFirehoseConfig,
	log log.Modular,
	stats metrics.Type,
) (*KinesisFirehose, error)

NewKinesisFirehose creates a new Amazon Kinesis Firehose writer.Type.

func (*KinesisFirehose) CloseAsync added in v3.1.0

func (a *KinesisFirehose) CloseAsync()

CloseAsync begins cleaning up resources used by this reader asynchronously.

func (*KinesisFirehose) Connect added in v3.1.0

func (a *KinesisFirehose) Connect() error

Connect creates a new Kinesis Firehose client and ensures that the target Kinesis Firehose delivery stream.

func (*KinesisFirehose) ConnectWithContext added in v3.8.0

func (a *KinesisFirehose) ConnectWithContext(ctx context.Context) error

ConnectWithContext creates a new Kinesis Firehose client and ensures that the target Kinesis Firehose delivery stream.

func (*KinesisFirehose) WaitForClose added in v3.1.0

func (a *KinesisFirehose) WaitForClose(time.Duration) error

WaitForClose will block until either the reader is closed or a specified timeout occurs.

func (*KinesisFirehose) Write added in v3.1.0

func (a *KinesisFirehose) Write(msg types.Message) error

Write attempts to write message contents to a target Kinesis Firehose delivery stream in batches of 500. If throttling is detected, failed messages are retried according to the configurable backoff settings.

func (*KinesisFirehose) WriteWithContext added in v3.8.0

func (a *KinesisFirehose) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write message contents to a target Kinesis Firehose delivery stream in batches of 500. If throttling is detected, failed messages are retried according to the configurable backoff settings.

type KinesisFirehoseConfig added in v3.1.0

type KinesisFirehoseConfig struct {
	Stream         string `json:"stream" yaml:"stream"`
	MaxInFlight    int    `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config `json:",inline" yaml:",inline"`
	Batching       batch.PolicyConfig `json:"batching" yaml:"batching"`
	// contains filtered or unexported fields
}

KinesisFirehoseConfig contains configuration fields for the KinesisFirehose output type.

func NewKinesisFirehoseConfig added in v3.1.0

func NewKinesisFirehoseConfig() KinesisFirehoseConfig

NewKinesisFirehoseConfig creates a new Config with default values.

type MQTT

type MQTT struct {
	// contains filtered or unexported fields
}

MQTT is an output type that serves MQTT messages.

func NewMQTT

func NewMQTT(
	conf MQTTConfig,
	log log.Modular,
	stats metrics.Type,
) (*MQTT, error)

NewMQTT creates a new MQTT output type.

func (*MQTT) CloseAsync

func (m *MQTT) CloseAsync()

CloseAsync shuts down the MQTT output and stops processing messages.

func (*MQTT) Connect

func (m *MQTT) Connect() error

Connect establishes a connection to an MQTT server.

func (*MQTT) ConnectWithContext added in v3.8.0

func (m *MQTT) ConnectWithContext(ctx context.Context) error

ConnectWithContext establishes a connection to an MQTT server.

func (*MQTT) WaitForClose

func (m *MQTT) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the MQTT output has closed down.

func (*MQTT) Write

func (m *MQTT) Write(msg types.Message) error

Write attempts to write a message by pushing it to an MQTT broker.

func (*MQTT) WriteWithContext added in v3.8.0

func (m *MQTT) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write a message by pushing it to an MQTT broker.

type MQTTConfig

type MQTTConfig struct {
	URLs        []string      `json:"urls" yaml:"urls"`
	QoS         uint8         `json:"qos" yaml:"qos"`
	Retained    bool          `json:"retained" yaml:"retained"`
	Topic       string        `json:"topic" yaml:"topic"`
	ClientID    string        `json:"client_id" yaml:"client_id"`
	Will        mqttconf.Will `json:"will" yaml:"will"`
	User        string        `json:"user" yaml:"user"`
	Password    string        `json:"password" yaml:"password"`
	KeepAlive   int64         `json:"keepalive" yaml:"keepalive"`
	MaxInFlight int           `json:"max_in_flight" yaml:"max_in_flight"`
	TLS         tls.Config    `json:"tls" yaml:"tls"`
}

MQTTConfig contains configuration fields for the MQTT output type.

func NewMQTTConfig

func NewMQTTConfig() MQTTConfig

NewMQTTConfig creates a new MQTTConfig with default values.

type NATS

type NATS struct {
	// contains filtered or unexported fields
}

NATS is an output type that serves NATS messages.

func NewNATS

func NewNATS(conf NATSConfig, log log.Modular, stats metrics.Type) (*NATS, error)

NewNATS creates a new NATS output type.

func (*NATS) CloseAsync

func (n *NATS) CloseAsync()

CloseAsync shuts down the MQTT output and stops processing messages.

func (*NATS) Connect

func (n *NATS) Connect() error

Connect attempts to establish a connection to NATS servers.

func (*NATS) ConnectWithContext added in v3.8.0

func (n *NATS) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to NATS servers.

func (*NATS) WaitForClose

func (n *NATS) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the NATS output has closed down.

func (*NATS) Write

func (n *NATS) Write(msg types.Message) error

Write attempts to write a message.

func (*NATS) WriteWithContext added in v3.8.0

func (n *NATS) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write a message.

type NATSConfig

type NATSConfig struct {
	URLs        []string    `json:"urls" yaml:"urls"`
	Subject     string      `json:"subject" yaml:"subject"`
	MaxInFlight int         `json:"max_in_flight" yaml:"max_in_flight"`
	TLS         btls.Config `json:"tls" yaml:"tls"`
	Auth        auth.Config `json:"auth" yaml:"auth"`
}

NATSConfig contains configuration fields for the NATS output type.

func NewNATSConfig

func NewNATSConfig() NATSConfig

NewNATSConfig creates a new NATSConfig with default values.

type NATSStream

type NATSStream struct {
	// contains filtered or unexported fields
}

NATSStream is an output type that serves NATS messages.

func NewNATSStream

func NewNATSStream(conf NATSStreamConfig, log log.Modular, stats metrics.Type) (*NATSStream, error)

NewNATSStream creates a new NATS Stream output type.

func (*NATSStream) CloseAsync

func (n *NATSStream) CloseAsync()

CloseAsync shuts down the MQTT output and stops processing messages.

func (*NATSStream) Connect

func (n *NATSStream) Connect() error

Connect attempts to establish a connection to NATS servers.

func (*NATSStream) ConnectWithContext added in v3.8.0

func (n *NATSStream) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to NATS servers.

func (*NATSStream) WaitForClose

func (n *NATSStream) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the NATS output has closed down.

func (*NATSStream) Write

func (n *NATSStream) Write(msg types.Message) error

Write attempts to write a message.

func (*NATSStream) WriteWithContext added in v3.8.0

func (n *NATSStream) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write a message.

type NATSStreamConfig

type NATSStreamConfig struct {
	URLs        []string    `json:"urls" yaml:"urls"`
	ClusterID   string      `json:"cluster_id" yaml:"cluster_id"`
	ClientID    string      `json:"client_id" yaml:"client_id"`
	Subject     string      `json:"subject" yaml:"subject"`
	MaxInFlight int         `json:"max_in_flight" yaml:"max_in_flight"`
	TLS         btls.Config `json:"tls" yaml:"tls"`
	Auth        auth.Config `json:"auth" yaml:"auth"`
}

NATSStreamConfig contains configuration fields for the NATSStream output type.

func NewNATSStreamConfig

func NewNATSStreamConfig() NATSStreamConfig

NewNATSStreamConfig creates a new NATSStreamConfig with default values.

type NSQ

type NSQ struct {
	// contains filtered or unexported fields
}

NSQ is an output type that serves NSQ messages.

func NewNSQ

func NewNSQ(conf NSQConfig, log log.Modular, stats metrics.Type) (*NSQ, error)

NewNSQ creates a new NSQ output type.

func (*NSQ) CloseAsync

func (n *NSQ) CloseAsync()

CloseAsync shuts down the NSQ output and stops processing messages.

func (*NSQ) Connect

func (n *NSQ) Connect() error

Connect attempts to establish a connection to NSQ servers.

func (*NSQ) ConnectWithContext added in v3.8.0

func (n *NSQ) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to NSQ servers.

func (*NSQ) WaitForClose

func (n *NSQ) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the NSQ output has closed down.

func (*NSQ) Write

func (n *NSQ) Write(msg types.Message) error

Write attempts to write a message.

func (*NSQ) WriteWithContext added in v3.8.0

func (n *NSQ) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write a message.

type NSQConfig

type NSQConfig struct {
	Address     string      `json:"nsqd_tcp_address" yaml:"nsqd_tcp_address"`
	Topic       string      `json:"topic" yaml:"topic"`
	UserAgent   string      `json:"user_agent" yaml:"user_agent"`
	TLS         btls.Config `json:"tls" yaml:"tls"`
	MaxInFlight int         `json:"max_in_flight" yaml:"max_in_flight"`
}

NSQConfig contains configuration fields for the NSQ output type.

func NewNSQConfig

func NewNSQConfig() NSQConfig

NewNSQConfig creates a new NSQConfig with default values.

type Nanomsg

type Nanomsg struct {
	// contains filtered or unexported fields
}

Nanomsg is an output type that serves Nanomsg messages.

func NewNanomsg

func NewNanomsg(conf NanomsgConfig, log log.Modular, stats metrics.Type) (*Nanomsg, error)

NewNanomsg creates a new Nanomsg output type.

func (*Nanomsg) CloseAsync

func (s *Nanomsg) CloseAsync()

CloseAsync shuts down the Nanomsg output and stops processing messages.

func (*Nanomsg) Connect

func (s *Nanomsg) Connect() error

Connect establishes a connection to a nanomsg socket.

func (*Nanomsg) ConnectWithContext added in v3.8.0

func (s *Nanomsg) ConnectWithContext(ctx context.Context) error

ConnectWithContext establishes a connection to a nanomsg socket.

func (*Nanomsg) WaitForClose

func (s *Nanomsg) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the Nanomsg output has closed down.

func (*Nanomsg) Write

func (s *Nanomsg) Write(msg types.Message) error

Write attempts to write a message by pushing it to a nanomsg socket.

func (*Nanomsg) WriteWithContext added in v3.8.0

func (s *Nanomsg) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write a message by pushing it to a nanomsg socket.

type NanomsgConfig

type NanomsgConfig struct {
	URLs        []string `json:"urls" yaml:"urls"`
	Bind        bool     `json:"bind" yaml:"bind"`
	SocketType  string   `json:"socket_type" yaml:"socket_type"`
	PollTimeout string   `json:"poll_timeout" yaml:"poll_timeout"`
	MaxInFlight int      `json:"max_in_flight" yaml:"max_in_flight"`
}

NanomsgConfig contains configuration fields for the Nanomsg output type.

func NewNanomsgConfig

func NewNanomsgConfig() NanomsgConfig

NewNanomsgConfig creates a new NanomsgConfig with default values.

type OptionalAWSConfig

type OptionalAWSConfig struct {
	Enabled     bool `json:"enabled" yaml:"enabled"`
	sess.Config `json:",inline" yaml:",inline"`
}

OptionalAWSConfig contains config fields for AWS authentication with an enable flag.

type RedisHash

type RedisHash struct {
	// contains filtered or unexported fields
}

RedisHash is an output type that writes hash objects to Redis using the HMSET command.

func NewRedisHash

func NewRedisHash(
	conf RedisHashConfig,
	log log.Modular,
	stats metrics.Type,
) (*RedisHash, error)

NewRedisHash creates a new RedisHash output type.

func (*RedisHash) CloseAsync

func (r *RedisHash) CloseAsync()

CloseAsync shuts down the RedisHash output and stops processing messages.

func (*RedisHash) Connect

func (r *RedisHash) Connect() error

Connect establishes a connection to an RedisHash server.

func (*RedisHash) ConnectWithContext added in v3.8.0

func (r *RedisHash) ConnectWithContext(ctx context.Context) error

ConnectWithContext establishes a connection to an RedisHash server.

func (*RedisHash) WaitForClose

func (r *RedisHash) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the RedisHash output has closed down.

func (*RedisHash) Write

func (r *RedisHash) Write(msg types.Message) error

Write attempts to write a message to Redis by setting it using the HMSET command.

func (*RedisHash) WriteWithContext added in v3.8.0

func (r *RedisHash) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write a message to Redis by setting it using the HMSET command.

type RedisHashConfig

type RedisHashConfig struct {
	bredis.Config  `json:",inline" yaml:",inline"`
	Key            string            `json:"key" yaml:"key"`
	WalkMetadata   bool              `json:"walk_metadata" yaml:"walk_metadata"`
	WalkJSONObject bool              `json:"walk_json_object" yaml:"walk_json_object"`
	Fields         map[string]string `json:"fields" yaml:"fields"`
	MaxInFlight    int               `json:"max_in_flight" yaml:"max_in_flight"`
}

RedisHashConfig contains configuration fields for the RedisHash output type.

func NewRedisHashConfig

func NewRedisHashConfig() RedisHashConfig

NewRedisHashConfig creates a new RedisHashConfig with default values.

type RedisList

type RedisList struct {
	// contains filtered or unexported fields
}

RedisList is an output type that serves RedisList messages.

func NewRedisList

func NewRedisList(
	conf RedisListConfig,
	log log.Modular,
	stats metrics.Type,
) (*RedisList, error)

NewRedisList creates a new RedisList output type.

func (*RedisList) CloseAsync

func (r *RedisList) CloseAsync()

CloseAsync shuts down the RedisList output and stops processing messages.

func (*RedisList) Connect

func (r *RedisList) Connect() error

Connect establishes a connection to an RedisList server.

func (*RedisList) ConnectWithContext added in v3.8.0

func (r *RedisList) ConnectWithContext(ctx context.Context) error

ConnectWithContext establishes a connection to an RedisList server.

func (*RedisList) WaitForClose

func (r *RedisList) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the RedisList output has closed down.

func (*RedisList) Write

func (r *RedisList) Write(msg types.Message) error

Write attempts to write a message by pushing it to the end of a Redis list.

func (*RedisList) WriteWithContext added in v3.8.0

func (r *RedisList) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write a message by pushing it to the end of a Redis list.

type RedisListConfig

type RedisListConfig struct {
	bredis.Config `json:",inline" yaml:",inline"`
	Key           string             `json:"key" yaml:"key"`
	MaxInFlight   int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching      batch.PolicyConfig `json:"batching" yaml:"batching"`
}

RedisListConfig contains configuration fields for the RedisList output type.

func NewRedisListConfig

func NewRedisListConfig() RedisListConfig

NewRedisListConfig creates a new RedisListConfig with default values.

type RedisPubSub

type RedisPubSub struct {
	// contains filtered or unexported fields
}

RedisPubSub is an output type that serves RedisPubSub messages.

func NewRedisPubSub

func NewRedisPubSub(
	conf RedisPubSubConfig,
	log log.Modular,
	stats metrics.Type,
) (*RedisPubSub, error)

NewRedisPubSub creates a new RedisPubSub output type.

func (*RedisPubSub) CloseAsync

func (r *RedisPubSub) CloseAsync()

CloseAsync shuts down the RedisPubSub output and stops processing messages.

func (*RedisPubSub) Connect

func (r *RedisPubSub) Connect() error

Connect establishes a connection to an RedisPubSub server.

func (*RedisPubSub) ConnectWithContext added in v3.8.0

func (r *RedisPubSub) ConnectWithContext(ctx context.Context) error

ConnectWithContext establishes a connection to an RedisPubSub server.

func (*RedisPubSub) WaitForClose

func (r *RedisPubSub) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the RedisPubSub output has closed down.

func (*RedisPubSub) Write

func (r *RedisPubSub) Write(msg types.Message) error

Write attempts to write a message by pushing it to a Redis pub/sub topic.

func (*RedisPubSub) WriteWithContext added in v3.8.0

func (r *RedisPubSub) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write a message by pushing it to a Redis pub/sub topic.

type RedisPubSubConfig

type RedisPubSubConfig struct {
	bredis.Config `json:",inline" yaml:",inline"`
	Channel       string             `json:"channel" yaml:"channel"`
	MaxInFlight   int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching      batch.PolicyConfig `json:"batching" yaml:"batching"`
}

RedisPubSubConfig contains configuration fields for the RedisPubSub output type.

func NewRedisPubSubConfig

func NewRedisPubSubConfig() RedisPubSubConfig

NewRedisPubSubConfig creates a new RedisPubSubConfig with default values.

type RedisStreams

type RedisStreams struct {
	// contains filtered or unexported fields
}

RedisStreams is an output type that serves RedisStreams messages.

func NewRedisStreams

func NewRedisStreams(
	conf RedisStreamsConfig,
	log log.Modular,
	stats metrics.Type,
) (*RedisStreams, error)

NewRedisStreams creates a new RedisStreams output type.

func (*RedisStreams) CloseAsync

func (r *RedisStreams) CloseAsync()

CloseAsync shuts down the RedisStreams output and stops processing messages.

func (*RedisStreams) Connect

func (r *RedisStreams) Connect() error

Connect establishes a connection to an RedisStreams server.

func (*RedisStreams) ConnectWithContext added in v3.8.0

func (r *RedisStreams) ConnectWithContext(ctx context.Context) error

ConnectWithContext establishes a connection to an RedisStreams server.

func (*RedisStreams) WaitForClose

func (r *RedisStreams) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the RedisStreams output has closed down.

func (*RedisStreams) Write

func (r *RedisStreams) Write(msg types.Message) error

Write attempts to write a message by pushing it to a Redis stream.

func (*RedisStreams) WriteWithContext added in v3.8.0

func (r *RedisStreams) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write a message by pushing it to a Redis stream.

type RedisStreamsConfig

type RedisStreamsConfig struct {
	bredis.Config `json:",inline" yaml:",inline"`
	Stream        string             `json:"stream" yaml:"stream"`
	BodyKey       string             `json:"body_key" yaml:"body_key"`
	MaxLenApprox  int64              `json:"max_length" yaml:"max_length"`
	MaxInFlight   int                `json:"max_in_flight" yaml:"max_in_flight"`
	Metadata      output.Metadata    `json:"metadata" yaml:"metadata"`
	Batching      batch.PolicyConfig `json:"batching" yaml:"batching"`
}

RedisStreamsConfig contains configuration fields for the RedisStreams output type.

func NewRedisStreamsConfig

func NewRedisStreamsConfig() RedisStreamsConfig

NewRedisStreamsConfig creates a new RedisStreamsConfig with default values.

type SNS

type SNS struct {
	// contains filtered or unexported fields
}

SNS is a benthos writer.Type implementation that writes messages to an Amazon SNS queue.

func NewSNS

func NewSNS(conf SNSConfig, log log.Modular, stats metrics.Type) (*SNS, error)

NewSNS creates a new Amazon SNS writer.Type.

func (*SNS) CloseAsync

func (a *SNS) CloseAsync()

CloseAsync begins cleaning up resources used by this reader asynchronously.

func (*SNS) Connect

func (a *SNS) Connect() error

Connect attempts to establish a connection to the target SNS queue.

func (*SNS) ConnectWithContext added in v3.8.0

func (a *SNS) ConnectWithContext(ctx context.Context) error

ConnectWithContext attempts to establish a connection to the target SNS queue.

func (*SNS) WaitForClose

func (a *SNS) WaitForClose(time.Duration) error

WaitForClose will block until either the reader is closed or a specified timeout occurs.

func (*SNS) Write

func (a *SNS) Write(msg types.Message) error

Write attempts to write message contents to a target SNS.

func (*SNS) WriteWithContext added in v3.8.0

func (a *SNS) WriteWithContext(wctx context.Context, msg types.Message) error

WriteWithContext attempts to write message contents to a target SNS.

type SNSConfig

type SNSConfig struct {
	TopicArn string `json:"topic_arn" yaml:"topic_arn"`

	Timeout     string `json:"timeout" yaml:"timeout"`
	MaxInFlight int    `json:"max_in_flight" yaml:"max_in_flight"`
	// contains filtered or unexported fields
}

SNSConfig contains configuration fields for the output SNS type.

func NewSNSConfig

func NewSNSConfig() SNSConfig

NewSNSConfig creates a new Config with default values.

type Socket added in v3.9.0

type Socket struct {
	// contains filtered or unexported fields
}

Socket is an output type that sends messages as a continuous steam of line delimied messages over socket.

func NewSocket added in v3.9.0

func NewSocket(
	conf SocketConfig,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
) (*Socket, error)

NewSocket creates a new Socket writer type.

func (*Socket) CloseAsync added in v3.9.0

func (s *Socket) CloseAsync()

CloseAsync shuts down the socket output and stops processing messages.

func (*Socket) Connect added in v3.9.0

func (s *Socket) Connect() error

Connect establises a connection to the target socket server.

func (*Socket) ConnectWithContext added in v3.43.0

func (s *Socket) ConnectWithContext(ctx context.Context) error

ConnectWithContext establises a connection to the target socket server.

func (*Socket) WaitForClose added in v3.9.0

func (s *Socket) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the socket output has closed down.

func (*Socket) Write added in v3.9.0

func (s *Socket) Write(msg types.Message) error

Write attempts to write a message.

func (*Socket) WriteWithContext added in v3.43.0

func (s *Socket) WriteWithContext(ctx context.Context, msg types.Message) error

WriteWithContext attempts to write a message.

type SocketConfig added in v3.9.0

type SocketConfig struct {
	Network string `json:"network" yaml:"network"`
	Address string `json:"address" yaml:"address"`
	Codec   string `json:"codec" yaml:"codec"`
}

SocketConfig contains configuration fields for the Socket output type.

func NewSocketConfig added in v3.9.0

func NewSocketConfig() SocketConfig

NewSocketConfig creates a new SocketConfig with default values.

type TCP

type TCP struct {
	// contains filtered or unexported fields
}

TCP is an output type that sends messages as a continuous steam of line delimied messages over TCP.

func NewTCP

func NewTCP(
	conf TCPConfig,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
) (*TCP, error)

NewTCP creates a new TCP writer type.

func (*TCP) CloseAsync

func (t *TCP) CloseAsync()

CloseAsync shuts down the TCP output and stops processing messages.

func (*TCP) Connect

func (t *TCP) Connect() error

Connect does nothing.

func (*TCP) WaitForClose

func (t *TCP) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the TCP output has closed down.

func (*TCP) Write

func (t *TCP) Write(msg types.Message) error

Write attempts to write a message.

type TCPConfig

type TCPConfig struct {
	Address string `json:"address" yaml:"address"`
}

TCPConfig contains configuration fields for the TCP output type.

func NewTCPConfig

func NewTCPConfig() TCPConfig

NewTCPConfig creates a new TCPConfig with default values.

type Type

type Type interface {
	// Connect attempts to establish a connection to the sink, if unsuccessful
	// returns an error. If the attempt is successful (or not necessary) returns
	// nil.
	Connect() error

	// Write should block until either the message is sent (and acknowledged) to
	// a sink, or a transport specific error has occurred, or the Type is
	// closed.
	Write(msg types.Message) error

	types.Closable
}

Type is a type that writes Benthos messages to a third party sink. If the protocol supports a form of acknowledgement then it will be returned by the call to Write.

type UDP

type UDP struct {
	// contains filtered or unexported fields
}

UDP is an output type that sends messages as a continuous steam of line delimied messages over UDP.

func NewUDP

func NewUDP(
	conf UDPConfig,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
) (*UDP, error)

NewUDP creates a new UDP writer type.

func (*UDP) CloseAsync

func (t *UDP) CloseAsync()

CloseAsync shuts down the UDP output and stops processing messages.

func (*UDP) Connect

func (t *UDP) Connect() error

Connect does nothing.

func (*UDP) WaitForClose

func (t *UDP) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the UDP output has closed down.

func (*UDP) Write

func (t *UDP) Write(msg types.Message) error

Write attempts to write a message.

type UDPConfig

type UDPConfig struct {
	Address string `json:"address" yaml:"address"`
}

UDPConfig contains configuration fields for the UDP output type.

func NewUDPConfig

func NewUDPConfig() UDPConfig

NewUDPConfig creates a new UDPConfig with default values.

type Websocket

type Websocket struct {
	// contains filtered or unexported fields
}

Websocket is an output type that serves Websocket messages.

func NewWebsocket

func NewWebsocket(
	conf WebsocketConfig,
	log log.Modular,
	stats metrics.Type,
) (*Websocket, error)

NewWebsocket creates a new Websocket output type.

func (*Websocket) CloseAsync

func (w *Websocket) CloseAsync()

CloseAsync shuts down the Websocket output and stops processing messages.

func (*Websocket) Connect

func (w *Websocket) Connect() error

Connect establishes a connection to an Websocket server.

func (*Websocket) WaitForClose

func (w *Websocket) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the Websocket output has closed down.

func (*Websocket) Write

func (w *Websocket) Write(msg types.Message) error

Write attempts to write a message by pushing it to an Websocket broker.

type WebsocketConfig

type WebsocketConfig struct {
	URL         string `json:"url" yaml:"url"`
	auth.Config `json:",inline" yaml:",inline"`
}

WebsocketConfig contains configuration fields for the Websocket output type.

func NewWebsocketConfig

func NewWebsocketConfig() WebsocketConfig

NewWebsocketConfig creates a new WebsocketConfig with default values.

type ZMQ4Config

type ZMQ4Config struct {
	URLs          []string `json:"urls" yaml:"urls"`
	Bind          bool     `json:"bind" yaml:"bind"`
	SocketType    string   `json:"socket_type" yaml:"socket_type"`
	HighWaterMark int      `json:"high_water_mark" yaml:"high_water_mark"`
	PollTimeout   string   `json:"poll_timeout" yaml:"poll_timeout"`
}

ZMQ4Config contains configuration fields for the ZMQ4 output type.

func NewZMQ4Config

func NewZMQ4Config() *ZMQ4Config

NewZMQ4Config creates a new ZMQ4Config with default values.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL