output

package
v4.13.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2023 License: MIT Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// GCPCloudStorageErrorIfExistsCollisionMode - error-if-exists.
	GCPCloudStorageErrorIfExistsCollisionMode = "error-if-exists"

	// GCPCloudStorageAppendCollisionMode - append.
	GCPCloudStorageAppendCollisionMode = "append"

	// GCPCloudStorageIgnoreCollisionMode - ignore.
	GCPCloudStorageIgnoreCollisionMode = "ignore"

	// GCPCloudStorageOverwriteCollisionMode - overwrite.
	GCPCloudStorageOverwriteCollisionMode = "overwrite"
)

Variables

View Source
var InjectTracingSpanMappingDocs = docs.FieldBloblang(
	"inject_tracing_map",
	"EXPERIMENTAL: A [Bloblang mapping](/docs/guides/bloblang/about) used to inject an object containing tracing propagation information into outbound messages. The specification of the injected fields will match the format used by the service wide tracer.",
	`meta = meta().merge(this)`,
	`root.meta.span = this`,
).AtVersion("3.45.0").Advanced()

InjectTracingSpanMappingDocs returns a field spec describing an inject tracing span mapping.

Functions

func Description

func Description(async, batches bool, content string) string

Description appends standard feature descriptions to an output description based on various features of the output.

func IterateBatchedSend added in v4.1.0

func IterateBatchedSend(msg message.Batch, fn func(int, *message.Part) error) error

IterateBatchedSend executes a closure fn on each message of a batch, where the closure is expected to attempt a send and return an error. If an error is returned then it is added to a batch error in order to support index specific error handling.

However, if a fatal error is returned such as a connection loss or shut down then it is returned immediately.

Types

type AMQP1Config added in v4.1.0

type AMQP1Config struct {
	URL                          string                       `json:"url" yaml:"url"`
	TargetAddress                string                       `json:"target_address" yaml:"target_address"`
	MaxInFlight                  int                          `json:"max_in_flight" yaml:"max_in_flight"`
	TLS                          btls.Config                  `json:"tls" yaml:"tls"`
	SASL                         shared.SASLConfig            `json:"sasl" yaml:"sasl"`
	Metadata                     metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	ApplicationPropertiesMapping string                       `json:"application_properties_map" yaml:"application_properties_map"`
}

AMQP1Config contains configuration fields for the AMQP1 output type.

func NewAMQP1Config added in v4.1.0

func NewAMQP1Config() AMQP1Config

NewAMQP1Config creates a new AMQP1Config with default values.

type AMQPConfig added in v4.1.0

type AMQPConfig struct {
	URLs            []string                     `json:"urls" yaml:"urls"`
	MaxInFlight     int                          `json:"max_in_flight" yaml:"max_in_flight"`
	Exchange        string                       `json:"exchange" yaml:"exchange"`
	ExchangeDeclare AMQPExchangeDeclareConfig    `json:"exchange_declare" yaml:"exchange_declare"`
	BindingKey      string                       `json:"key" yaml:"key"`
	Type            string                       `json:"type" yaml:"type"`
	ContentType     string                       `json:"content_type" yaml:"content_type"`
	ContentEncoding string                       `json:"content_encoding" yaml:"content_encoding"`
	Metadata        metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	Priority        string                       `json:"priority" yaml:"priority"`
	Persistent      bool                         `json:"persistent" yaml:"persistent"`
	Mandatory       bool                         `json:"mandatory" yaml:"mandatory"`
	Immediate       bool                         `json:"immediate" yaml:"immediate"`
	TLS             btls.Config                  `json:"tls" yaml:"tls"`
	Timeout         string                       `json:"timeout" yaml:"timeout"`
}

AMQPConfig contains configuration fields for the AMQP output type.

func NewAMQPConfig added in v4.1.0

func NewAMQPConfig() AMQPConfig

NewAMQPConfig creates a new AMQPConfig with default values.

type AMQPExchangeDeclareConfig added in v4.1.0

type AMQPExchangeDeclareConfig struct {
	Enabled bool   `json:"enabled" yaml:"enabled"`
	Type    string `json:"type" yaml:"type"`
	Durable bool   `json:"durable" yaml:"durable"`
}

AMQPExchangeDeclareConfig contains fields indicating whether the target AMQP exchange needs to be declared, as well as any fields specifying how to accomplish that.

type AmazonS3Config added in v4.1.0

type AmazonS3Config struct {
	sess.Config             `json:",inline" yaml:",inline"`
	Bucket                  string                       `json:"bucket" yaml:"bucket"`
	ForcePathStyleURLs      bool                         `json:"force_path_style_urls" yaml:"force_path_style_urls"`
	Path                    string                       `json:"path" yaml:"path"`
	Tags                    map[string]string            `json:"tags" yaml:"tags"`
	ContentType             string                       `json:"content_type" yaml:"content_type"`
	ContentEncoding         string                       `json:"content_encoding" yaml:"content_encoding"`
	CacheControl            string                       `json:"cache_control" yaml:"cache_control"`
	ContentDisposition      string                       `json:"content_disposition" yaml:"content_disposition"`
	ContentLanguage         string                       `json:"content_language" yaml:"content_language"`
	WebsiteRedirectLocation string                       `json:"website_redirect_location" yaml:"website_redirect_location"`
	Metadata                metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	StorageClass            string                       `json:"storage_class" yaml:"storage_class"`
	Timeout                 string                       `json:"timeout" yaml:"timeout"`
	KMSKeyID                string                       `json:"kms_key_id" yaml:"kms_key_id"`
	ServerSideEncryption    string                       `json:"server_side_encryption" yaml:"server_side_encryption"`
	MaxInFlight             int                          `json:"max_in_flight" yaml:"max_in_flight"`
	Batching                batchconfig.Config           `json:"batching" yaml:"batching"`
}

AmazonS3Config contains configuration fields for the AmazonS3 output type.

func NewAmazonS3Config added in v4.1.0

func NewAmazonS3Config() AmazonS3Config

NewAmazonS3Config creates a new Config with default values.

type AmazonSQSConfig added in v4.1.0

type AmazonSQSConfig struct {
	SessionConfig          `json:",inline" yaml:",inline"`
	URL                    string                       `json:"url" yaml:"url"`
	MessageGroupID         string                       `json:"message_group_id" yaml:"message_group_id"`
	MessageDeduplicationID string                       `json:"message_deduplication_id" yaml:"message_deduplication_id"`
	Metadata               metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	MaxInFlight            int                          `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config         `json:",inline" yaml:",inline"`
	Batching               batchconfig.Config `json:"batching" yaml:"batching"`
}

AmazonSQSConfig contains configuration fields for the output AmazonSQS type.

func NewAmazonSQSConfig added in v4.1.0

func NewAmazonSQSConfig() AmazonSQSConfig

NewAmazonSQSConfig creates a new Config with default values.

type AsyncSink added in v4.1.0

type AsyncSink interface {
	// Connect attempts to establish a connection to the sink, if
	// unsuccessful returns an error. If the attempt is successful (or not
	// necessary) returns nil.
	Connect(ctx context.Context) error

	// WriteBatch should block until either the message is sent (and
	// acknowledged) to a sink, or a transport specific error has occurred, or
	// the Type is closed.
	WriteBatch(ctx context.Context, msg message.Batch) error

	// Close is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	Close(ctx context.Context) error
}

AsyncSink is a type that writes Benthos messages to a third party sink. If the protocol supports a form of acknowledgement then it will be returned by the call to Write.

type AsyncWriter added in v4.1.0

type AsyncWriter struct {
	// contains filtered or unexported fields
}

AsyncWriter is an output type that writes messages to a writer.Type.

func (*AsyncWriter) Connected added in v4.1.0

func (w *AsyncWriter) Connected() bool

Connected returns a boolean indicating whether this output is currently connected to its target.

func (*AsyncWriter) Consume added in v4.1.0

func (w *AsyncWriter) Consume(ts <-chan message.Transaction) error

Consume assigns a messages channel for the output to read.

func (*AsyncWriter) SetInjectTracingMap added in v4.1.0

func (w *AsyncWriter) SetInjectTracingMap(exec *mapping.Executor)

SetInjectTracingMap sets a mapping to be used for injecting tracing events into messages.

func (*AsyncWriter) TriggerCloseNow added in v4.6.0

func (w *AsyncWriter) TriggerCloseNow()

TriggerCloseNow shuts down the output and stops processing messages.

func (*AsyncWriter) WaitForClose added in v4.1.0

func (w *AsyncWriter) WaitForClose(ctx context.Context) error

WaitForClose blocks until the File output has closed down.

type AzureBlobStorageConfig added in v4.1.0

type AzureBlobStorageConfig struct {
	StorageAccount          string `json:"storage_account" yaml:"storage_account"`
	StorageAccessKey        string `json:"storage_access_key" yaml:"storage_access_key"`
	StorageSASToken         string `json:"storage_sas_token" yaml:"storage_sas_token"`
	StorageConnectionString string `json:"storage_connection_string" yaml:"storage_connection_string"`
	Container               string `json:"container" yaml:"container"`
	Path                    string `json:"path" yaml:"path"`
	BlobType                string `json:"blob_type" yaml:"blob_type"`
	PublicAccessLevel       string `json:"public_access_level" yaml:"public_access_level"`
	MaxInFlight             int    `json:"max_in_flight" yaml:"max_in_flight"`
}

AzureBlobStorageConfig contains configuration fields for the AzureBlobStorage output type.

func NewAzureBlobStorageConfig added in v4.1.0

func NewAzureBlobStorageConfig() AzureBlobStorageConfig

NewAzureBlobStorageConfig creates a new Config with default values.

type AzureQueueStorageConfig added in v4.1.0

type AzureQueueStorageConfig struct {
	StorageAccount          string             `json:"storage_account" yaml:"storage_account"`
	StorageAccessKey        string             `json:"storage_access_key" yaml:"storage_access_key"`
	StorageConnectionString string             `json:"storage_connection_string" yaml:"storage_connection_string"`
	QueueName               string             `json:"queue_name" yaml:"queue_name"`
	TTL                     string             `json:"ttl" yaml:"ttl"`
	MaxInFlight             int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching                batchconfig.Config `json:"batching" yaml:"batching"`
}

AzureQueueStorageConfig contains configuration fields for the output Azure Queue Storage type.

func NewAzureQueueStorageConfig added in v4.1.0

func NewAzureQueueStorageConfig() AzureQueueStorageConfig

NewAzureQueueStorageConfig creates a new Config with default values.

type AzureTableStorageConfig added in v4.1.0

type AzureTableStorageConfig struct {
	StorageAccount          string             `json:"storage_account" yaml:"storage_account"`
	StorageAccessKey        string             `json:"storage_access_key" yaml:"storage_access_key"`
	StorageConnectionString string             `json:"storage_connection_string" yaml:"storage_connection_string"`
	TableName               string             `json:"table_name" yaml:"table_name"`
	PartitionKey            string             `json:"partition_key" yaml:"partition_key"`
	RowKey                  string             `json:"row_key" yaml:"row_key"`
	Properties              map[string]string  `json:"properties" yaml:"properties"`
	InsertType              string             `json:"insert_type" yaml:"insert_type"`
	TransactionType         string             `json:"transaction_type" yaml:"transaction_type"`
	Timeout                 string             `json:"timeout" yaml:"timeout"`
	MaxInFlight             int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching                batchconfig.Config `json:"batching" yaml:"batching"`
}

AzureTableStorageConfig contains configuration fields for the AzureTableStorage output type.

func NewAzureTableStorageConfig added in v4.1.0

func NewAzureTableStorageConfig() AzureTableStorageConfig

NewAzureTableStorageConfig creates a new Config with default values.

type BrokerConfig added in v4.1.0

type BrokerConfig struct {
	Copies   int                `json:"copies" yaml:"copies"`
	Pattern  string             `json:"pattern" yaml:"pattern"`
	Outputs  []Config           `json:"outputs" yaml:"outputs"`
	Batching batchconfig.Config `json:"batching" yaml:"batching"`
}

BrokerConfig contains configuration fields for the Broker output type.

func NewBrokerConfig added in v4.1.0

func NewBrokerConfig() BrokerConfig

NewBrokerConfig creates a new BrokerConfig with default values.

type CacheConfig added in v4.1.0

type CacheConfig struct {
	Target      string `json:"target" yaml:"target"`
	Key         string `json:"key" yaml:"key"`
	TTL         string `json:"ttl" yaml:"ttl"`
	MaxInFlight int    `json:"max_in_flight" yaml:"max_in_flight"`
}

CacheConfig contains configuration fields for the Cache output type.

func NewCacheConfig added in v4.1.0

func NewCacheConfig() CacheConfig

NewCacheConfig creates a new Config with default values.

type CassandraConfig added in v4.1.0

type CassandraConfig struct {
	Addresses                []string              `json:"addresses" yaml:"addresses"`
	TLS                      btls.Config           `json:"tls" yaml:"tls"`
	PasswordAuthenticator    PasswordAuthenticator `json:"password_authenticator" yaml:"password_authenticator"`
	DisableInitialHostLookup bool                  `json:"disable_initial_host_lookup" yaml:"disable_initial_host_lookup"`
	Query                    string                `json:"query" yaml:"query"`
	ArgsMapping              string                `json:"args_mapping" yaml:"args_mapping"`
	Consistency              string                `json:"consistency" yaml:"consistency"`
	Timeout                  string                `json:"timeout" yaml:"timeout"`
	LoggedBatch              bool                  `json:"logged_batch" yaml:"logged_batch"`
	// TODO: V4 Remove this and replace with explicit values.
	retries.Config `json:",inline" yaml:",inline"`
	MaxInFlight    int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching       batchconfig.Config `json:"batching" yaml:"batching"`
}

CassandraConfig contains configuration fields for the Cassandra output type.

func NewCassandraConfig added in v4.1.0

func NewCassandraConfig() CassandraConfig

NewCassandraConfig creates a new CassandraConfig with default values.

type Config added in v4.1.0

type Config struct {
	Label              string                  `json:"label" yaml:"label"`
	Type               string                  `json:"type" yaml:"type"`
	AMQP09             AMQPConfig              `json:"amqp_0_9" yaml:"amqp_0_9"`
	AMQP1              AMQP1Config             `json:"amqp_1" yaml:"amqp_1"`
	AWSDynamoDB        DynamoDBConfig          `json:"aws_dynamodb" yaml:"aws_dynamodb"`
	AWSKinesis         KinesisConfig           `json:"aws_kinesis" yaml:"aws_kinesis"`
	AWSKinesisFirehose KinesisFirehoseConfig   `json:"aws_kinesis_firehose" yaml:"aws_kinesis_firehose"`
	AWSS3              AmazonS3Config          `json:"aws_s3" yaml:"aws_s3"`
	AWSSNS             SNSConfig               `json:"aws_sns" yaml:"aws_sns"`
	AWSSQS             AmazonSQSConfig         `json:"aws_sqs" yaml:"aws_sqs"`
	AzureBlobStorage   AzureBlobStorageConfig  `json:"azure_blob_storage" yaml:"azure_blob_storage"`
	AzureQueueStorage  AzureQueueStorageConfig `json:"azure_queue_storage" yaml:"azure_queue_storage"`
	AzureTableStorage  AzureTableStorageConfig `json:"azure_table_storage" yaml:"azure_table_storage"`
	Broker             BrokerConfig            `json:"broker" yaml:"broker"`
	Cache              CacheConfig             `json:"cache" yaml:"cache"`
	Cassandra          CassandraConfig         `json:"cassandra" yaml:"cassandra"`
	Drop               DropConfig              `json:"drop" yaml:"drop"`
	DropOn             DropOnConfig            `json:"drop_on" yaml:"drop_on"`
	Dynamic            DynamicConfig           `json:"dynamic" yaml:"dynamic"`
	Elasticsearch      ElasticsearchConfig     `json:"elasticsearch" yaml:"elasticsearch"`
	Fallback           TryConfig               `json:"fallback" yaml:"fallback"`
	File               FileConfig              `json:"file" yaml:"file"`
	GCPCloudStorage    GCPCloudStorageConfig   `json:"gcp_cloud_storage" yaml:"gcp_cloud_storage"`
	GCPPubSub          GCPPubSubConfig         `json:"gcp_pubsub" yaml:"gcp_pubsub"`
	HDFS               HDFSConfig              `json:"hdfs" yaml:"hdfs"`
	HTTPServer         HTTPServerConfig        `json:"http_server" yaml:"http_server"`
	Inproc             string                  `json:"inproc" yaml:"inproc"`
	Kafka              KafkaConfig             `json:"kafka" yaml:"kafka"`
	MongoDB            MongoDBConfig           `json:"mongodb" yaml:"mongodb"`
	MQTT               MQTTConfig              `json:"mqtt" yaml:"mqtt"`
	Nanomsg            NanomsgConfig           `json:"nanomsg" yaml:"nanomsg"`
	NATS               NATSConfig              `json:"nats" yaml:"nats"`
	NATSStream         NATSStreamConfig        `json:"nats_stream" yaml:"nats_stream"`
	NSQ                NSQConfig               `json:"nsq" yaml:"nsq"`
	Plugin             any                     `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	RedisHash          RedisHashConfig         `json:"redis_hash" yaml:"redis_hash"`
	RedisList          RedisListConfig         `json:"redis_list" yaml:"redis_list"`
	RedisPubSub        RedisPubSubConfig       `json:"redis_pubsub" yaml:"redis_pubsub"`
	RedisStreams       RedisStreamsConfig      `json:"redis_streams" yaml:"redis_streams"`
	Reject             string                  `json:"reject" yaml:"reject"`
	Resource           string                  `json:"resource" yaml:"resource"`
	Retry              RetryConfig             `json:"retry" yaml:"retry"`
	SFTP               SFTPConfig              `json:"sftp" yaml:"sftp"`
	STDOUT             STDOUTConfig            `json:"stdout" yaml:"stdout"`
	Subprocess         SubprocessConfig        `json:"subprocess" yaml:"subprocess"`
	Switch             SwitchConfig            `json:"switch" yaml:"switch"`
	SyncResponse       struct{}                `json:"sync_response" yaml:"sync_response"`
	Socket             SocketConfig            `json:"socket" yaml:"socket"`
	Processors         []processor.Config      `json:"processors" yaml:"processors"`
}

Config is the all encompassing configuration struct for all output types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

func NewConfig added in v4.1.0

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

func (*Config) UnmarshalYAML added in v4.1.0

func (conf *Config) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.

type DropConfig added in v4.1.0

type DropConfig struct{}

DropConfig contains configuration fields for the drop output type.

func NewDropConfig added in v4.1.0

func NewDropConfig() DropConfig

NewDropConfig creates a new DropConfig with default values.

type DropOnConditions added in v4.1.0

type DropOnConditions struct {
	Error        bool   `json:"error" yaml:"error"`
	BackPressure string `json:"back_pressure" yaml:"back_pressure"`
}

DropOnConditions is a config struct representing the different circumstances under which messages should be dropped.

type DropOnConfig added in v4.1.0

type DropOnConfig struct {
	DropOnConditions `json:",inline" yaml:",inline"`
	Output           *Config `json:"output" yaml:"output"`
}

DropOnConfig contains configuration values for the DropOn output type.

func NewDropOnConfig added in v4.1.0

func NewDropOnConfig() DropOnConfig

NewDropOnConfig creates a new DropOnConfig with default values.

func (DropOnConfig) MarshalJSON added in v4.1.0

func (d DropOnConfig) MarshalJSON() ([]byte, error)

MarshalJSON prints an empty object instead of nil.

func (DropOnConfig) MarshalYAML added in v4.1.0

func (d DropOnConfig) MarshalYAML() (any, error)

MarshalYAML prints an empty object instead of nil.

type DynamicConfig added in v4.1.0

type DynamicConfig struct {
	Outputs map[string]Config `json:"outputs" yaml:"outputs"`
	Prefix  string            `json:"prefix" yaml:"prefix"`
}

DynamicConfig contains configuration fields for the Dynamic output type.

func NewDynamicConfig added in v4.1.0

func NewDynamicConfig() DynamicConfig

NewDynamicConfig creates a new DynamicConfig with default values.

type DynamoDBConfig added in v4.1.0

type DynamoDBConfig struct {
	SessionConfig  `json:",inline" yaml:",inline"`
	Table          string            `json:"table" yaml:"table"`
	StringColumns  map[string]string `json:"string_columns" yaml:"string_columns"`
	JSONMapColumns map[string]string `json:"json_map_columns" yaml:"json_map_columns"`
	TTL            string            `json:"ttl" yaml:"ttl"`
	TTLKey         string            `json:"ttl_key" yaml:"ttl_key"`
	MaxInFlight    int               `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config `json:",inline" yaml:",inline"`
	Batching       batchconfig.Config `json:"batching" yaml:"batching"`
}

DynamoDBConfig contains config fields for the DynamoDB output type.

func NewDynamoDBConfig added in v4.1.0

func NewDynamoDBConfig() DynamoDBConfig

NewDynamoDBConfig creates a DynamoDBConfig populated with default values.

type ElasticsearchAuthConfig added in v4.12.0

type ElasticsearchAuthConfig struct {
	Enabled  bool   `json:"enabled" yaml:"enabled"`
	Username string `json:"username" yaml:"username"`
	Password string `json:"password" yaml:"password"`
}

ElasticsearchAuthConfig contains basic authentication fields.

type ElasticsearchConfig added in v4.1.0

type ElasticsearchConfig struct {
	URLs            []string                `json:"urls" yaml:"urls"`
	Sniff           bool                    `json:"sniff" yaml:"sniff"`
	Healthcheck     bool                    `json:"healthcheck" yaml:"healthcheck"`
	ID              string                  `json:"id" yaml:"id"`
	Action          string                  `json:"action" yaml:"action"`
	Index           string                  `json:"index" yaml:"index"`
	Pipeline        string                  `json:"pipeline" yaml:"pipeline"`
	Routing         string                  `json:"routing" yaml:"routing"`
	Type            string                  `json:"type" yaml:"type"`
	Timeout         string                  `json:"timeout" yaml:"timeout"`
	TLS             btls.Config             `json:"tls" yaml:"tls"`
	Auth            ElasticsearchAuthConfig `json:"basic_auth" yaml:"basic_auth"`
	AWS             OptionalAWSConfig       `json:"aws" yaml:"aws"`
	GzipCompression bool                    `json:"gzip_compression" yaml:"gzip_compression"`
	MaxInFlight     int                     `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config  `json:",inline" yaml:",inline"`
	Batching        batchconfig.Config `json:"batching" yaml:"batching"`
}

ElasticsearchConfig contains configuration fields for the Elasticsearch output type.

func NewElasticsearchConfig added in v4.1.0

func NewElasticsearchConfig() ElasticsearchConfig

NewElasticsearchConfig creates a new ElasticsearchConfig with default values.

type FileConfig added in v4.1.0

type FileConfig struct {
	Path  string `json:"path" yaml:"path"`
	Codec string `json:"codec" yaml:"codec"`
}

FileConfig contains configuration fields for the file based output type.

func NewFileConfig added in v4.1.0

func NewFileConfig() FileConfig

NewFileConfig creates a new FileConfig with default values.

type GCPCloudStorageConfig added in v4.1.0

type GCPCloudStorageConfig struct {
	Bucket          string             `json:"bucket" yaml:"bucket"`
	Path            string             `json:"path" yaml:"path"`
	ContentType     string             `json:"content_type" yaml:"content_type"`
	ContentEncoding string             `json:"content_encoding" yaml:"content_encoding"`
	ChunkSize       int                `json:"chunk_size" yaml:"chunk_size"`
	MaxInFlight     int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching        batchconfig.Config `json:"batching" yaml:"batching"`
	CollisionMode   string             `json:"collision_mode" yaml:"collision_mode"`
}

GCPCloudStorageConfig contains configuration fields for the GCP Cloud Storage output type.

func NewGCPCloudStorageConfig added in v4.1.0

func NewGCPCloudStorageConfig() GCPCloudStorageConfig

NewGCPCloudStorageConfig creates a new Config with default values.

type GCPPubSubConfig added in v4.1.0

type GCPPubSubConfig struct {
	ProjectID      string                       `json:"project" yaml:"project"`
	TopicID        string                       `json:"topic" yaml:"topic"`
	MaxInFlight    int                          `json:"max_in_flight" yaml:"max_in_flight"`
	PublishTimeout string                       `json:"publish_timeout" yaml:"publish_timeout"`
	Metadata       metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	OrderingKey    string                       `json:"ordering_key" yaml:"ordering_key"`
	Endpoint       string                       `json:"endpoint" yaml:"endpoint"`
	FlowControl    GCPPubSubFlowControlConfig   `json:"flow_control" yaml:"flow_control"`
}

GCPPubSubConfig contains configuration fields for the output GCPPubSub type.

func NewGCPPubSubConfig added in v4.1.0

func NewGCPPubSubConfig() GCPPubSubConfig

NewGCPPubSubConfig creates a new Config with default values.

type GCPPubSubFlowControlConfig added in v4.12.0

type GCPPubSubFlowControlConfig struct {
	MaxOutstandingMessages int    `json:"max_outstanding_messages" yaml:"max_outstanding_messages"`
	MaxOutstandingBytes    int    `json:"max_outstanding_bytes" yaml:"max_outstanding_bytes"`
	LimitExceededBehavior  string `json:"limit_exceeded_behavior" yaml:"limit_exceeded_behavior"`
}

GCPPubSubFlowControlConfig configures a flow control policy of the PubSub client. This affects the internal buffering mechanism that the client manages for each topic when publishing messages.

func NewGCPPubSubFlowControlConfig added in v4.12.0

func NewGCPPubSubFlowControlConfig() GCPPubSubFlowControlConfig

NewGCPPubSubFlowControlConfig creates a new flow control policy with default values.

type HDFSConfig added in v4.1.0

type HDFSConfig struct {
	Hosts       []string           `json:"hosts" yaml:"hosts"`
	User        string             `json:"user" yaml:"user"`
	Directory   string             `json:"directory" yaml:"directory"`
	Path        string             `json:"path" yaml:"path"`
	MaxInFlight int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching    batchconfig.Config `json:"batching" yaml:"batching"`
}

HDFSConfig contains configuration fields for the HDFS output type.

func NewHDFSConfig added in v4.1.0

func NewHDFSConfig() HDFSConfig

NewHDFSConfig creates a new Config with default values.

type HTTPServerConfig added in v4.1.0

type HTTPServerConfig struct {
	Address      string                `json:"address" yaml:"address"`
	Path         string                `json:"path" yaml:"path"`
	StreamPath   string                `json:"stream_path" yaml:"stream_path"`
	WSPath       string                `json:"ws_path" yaml:"ws_path"`
	AllowedVerbs []string              `json:"allowed_verbs" yaml:"allowed_verbs"`
	Timeout      string                `json:"timeout" yaml:"timeout"`
	CertFile     string                `json:"cert_file" yaml:"cert_file"`
	KeyFile      string                `json:"key_file" yaml:"key_file"`
	CORS         httpserver.CORSConfig `json:"cors" yaml:"cors"`
}

HTTPServerConfig contains configuration fields for the HTTPServer output type.

func NewHTTPServerConfig added in v4.1.0

func NewHTTPServerConfig() HTTPServerConfig

NewHTTPServerConfig creates a new HTTPServerConfig with default values.

type KafkaConfig added in v4.1.0

type KafkaConfig struct {
	Addresses        []string    `json:"addresses" yaml:"addresses"`
	ClientID         string      `json:"client_id" yaml:"client_id"`
	RackID           string      `json:"rack_id" yaml:"rack_id"`
	Key              string      `json:"key" yaml:"key"`
	Partitioner      string      `json:"partitioner" yaml:"partitioner"`
	Partition        string      `json:"partition" yaml:"partition"`
	Topic            string      `json:"topic" yaml:"topic"`
	Compression      string      `json:"compression" yaml:"compression"`
	MaxMsgBytes      int         `json:"max_msg_bytes" yaml:"max_msg_bytes"`
	Timeout          string      `json:"timeout" yaml:"timeout"`
	AckReplicas      bool        `json:"ack_replicas" yaml:"ack_replicas"`
	TargetVersion    string      `json:"target_version" yaml:"target_version"`
	TLS              btls.Config `json:"tls" yaml:"tls"`
	SASL             sasl.Config `json:"sasl" yaml:"sasl"`
	MaxInFlight      int         `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config   `json:",inline" yaml:",inline"`
	RetryAsBatch     bool                         `json:"retry_as_batch" yaml:"retry_as_batch"`
	Batching         batchconfig.Config           `json:"batching" yaml:"batching"`
	StaticHeaders    map[string]string            `json:"static_headers" yaml:"static_headers"`
	Metadata         metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	InjectTracingMap string                       `json:"inject_tracing_map" yaml:"inject_tracing_map"`
}

KafkaConfig contains configuration fields for the Kafka output type.

func NewKafkaConfig added in v4.1.0

func NewKafkaConfig() KafkaConfig

NewKafkaConfig creates a new KafkaConfig with default values.

type KinesisConfig added in v4.1.0

type KinesisConfig struct {
	SessionConfig  `json:",inline" yaml:",inline"`
	Stream         string `json:"stream" yaml:"stream"`
	HashKey        string `json:"hash_key" yaml:"hash_key"`
	PartitionKey   string `json:"partition_key" yaml:"partition_key"`
	MaxInFlight    int    `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config `json:",inline" yaml:",inline"`
	Batching       batchconfig.Config `json:"batching" yaml:"batching"`
}

KinesisConfig contains configuration fields for the Kinesis output type.

func NewKinesisConfig added in v4.1.0

func NewKinesisConfig() KinesisConfig

NewKinesisConfig creates a new Config with default values.

type KinesisFirehoseConfig added in v4.1.0

type KinesisFirehoseConfig struct {
	SessionConfig  `json:",inline" yaml:",inline"`
	Stream         string `json:"stream" yaml:"stream"`
	MaxInFlight    int    `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config `json:",inline" yaml:",inline"`
	Batching       batchconfig.Config `json:"batching" yaml:"batching"`
}

KinesisFirehoseConfig contains configuration fields for the KinesisFirehose output type.

func NewKinesisFirehoseConfig added in v4.1.0

func NewKinesisFirehoseConfig() KinesisFirehoseConfig

NewKinesisFirehoseConfig creates a new Config with default values.

type MQTTConfig added in v4.1.0

type MQTTConfig struct {
	URLs                  []string      `json:"urls" yaml:"urls"`
	QoS                   uint8         `json:"qos" yaml:"qos"`
	Retained              bool          `json:"retained" yaml:"retained"`
	RetainedInterpolated  string        `json:"retained_interpolated" yaml:"retained_interpolated"`
	Topic                 string        `json:"topic" yaml:"topic"`
	ClientID              string        `json:"client_id" yaml:"client_id"`
	DynamicClientIDSuffix string        `json:"dynamic_client_id_suffix" yaml:"dynamic_client_id_suffix"`
	Will                  mqttconf.Will `json:"will" yaml:"will"`
	User                  string        `json:"user" yaml:"user"`
	Password              string        `json:"password" yaml:"password"`
	ConnectTimeout        string        `json:"connect_timeout" yaml:"connect_timeout"`
	WriteTimeout          string        `json:"write_timeout" yaml:"write_timeout"`
	KeepAlive             int64         `json:"keepalive" yaml:"keepalive"`
	MaxInFlight           int           `json:"max_in_flight" yaml:"max_in_flight"`
	TLS                   tls.Config    `json:"tls" yaml:"tls"`
}

MQTTConfig contains configuration fields for the MQTT output type.

func NewMQTTConfig added in v4.1.0

func NewMQTTConfig() MQTTConfig

NewMQTTConfig creates a new MQTTConfig with default values.

type MongoDBConfig added in v4.1.0

type MongoDBConfig struct {
	MongoConfig client.Config `json:",inline" yaml:",inline"`

	Operation    string              `json:"operation" yaml:"operation"`
	WriteConcern client.WriteConcern `json:"write_concern" yaml:"write_concern"`

	FilterMap   string `json:"filter_map" yaml:"filter_map"`
	DocumentMap string `json:"document_map" yaml:"document_map"`
	HintMap     string `json:"hint_map" yaml:"hint_map"`

	// DeleteEmptyValue bool `json:"delete_empty_value" yaml:"delete_empty_value"`
	Upsert      bool               `json:"upsert" yaml:"upsert"`
	MaxInFlight int                `json:"max_in_flight" yaml:"max_in_flight"`
	RetryConfig retries.Config     `json:",inline" yaml:",inline"`
	Batching    batchconfig.Config `json:"batching" yaml:"batching"`
}

MongoDBConfig contains config fields for the MongoDB output type.

func NewMongoDBConfig added in v4.1.0

func NewMongoDBConfig() MongoDBConfig

NewMongoDBConfig creates a MongoDB populated with default values.

type NATSConfig added in v4.1.0

type NATSConfig struct {
	URLs        []string          `json:"urls" yaml:"urls"`
	Subject     string            `json:"subject" yaml:"subject"`
	Headers     map[string]string `json:"headers" yaml:"headers"`
	MaxInFlight int               `json:"max_in_flight" yaml:"max_in_flight"`
	TLS         btls.Config       `json:"tls" yaml:"tls"`
	Auth        auth.Config       `json:"auth" yaml:"auth"`
}

NATSConfig contains configuration fields for the NATS output type.

func NewNATSConfig added in v4.1.0

func NewNATSConfig() NATSConfig

NewNATSConfig creates a new NATSConfig with default values.

type NATSStreamConfig added in v4.1.0

type NATSStreamConfig struct {
	URLs        []string    `json:"urls" yaml:"urls"`
	ClusterID   string      `json:"cluster_id" yaml:"cluster_id"`
	ClientID    string      `json:"client_id" yaml:"client_id"`
	Subject     string      `json:"subject" yaml:"subject"`
	MaxInFlight int         `json:"max_in_flight" yaml:"max_in_flight"`
	TLS         btls.Config `json:"tls" yaml:"tls"`
	Auth        auth.Config `json:"auth" yaml:"auth"`
}

NATSStreamConfig contains configuration fields for the NATSStream output type.

func NewNATSStreamConfig added in v4.1.0

func NewNATSStreamConfig() NATSStreamConfig

NewNATSStreamConfig creates a new NATSStreamConfig with default values.

type NSQConfig added in v4.1.0

type NSQConfig struct {
	Address     string      `json:"nsqd_tcp_address" yaml:"nsqd_tcp_address"`
	Topic       string      `json:"topic" yaml:"topic"`
	UserAgent   string      `json:"user_agent" yaml:"user_agent"`
	TLS         btls.Config `json:"tls" yaml:"tls"`
	MaxInFlight int         `json:"max_in_flight" yaml:"max_in_flight"`
}

NSQConfig contains configuration fields for the NSQ output type.

func NewNSQConfig added in v4.1.0

func NewNSQConfig() NSQConfig

NewNSQConfig creates a new NSQConfig with default values.

type NanomsgConfig added in v4.1.0

type NanomsgConfig struct {
	URLs        []string `json:"urls" yaml:"urls"`
	Bind        bool     `json:"bind" yaml:"bind"`
	SocketType  string   `json:"socket_type" yaml:"socket_type"`
	PollTimeout string   `json:"poll_timeout" yaml:"poll_timeout"`
	MaxInFlight int      `json:"max_in_flight" yaml:"max_in_flight"`
}

NanomsgConfig contains configuration fields for the Nanomsg output type.

func NewNanomsgConfig added in v4.1.0

func NewNanomsgConfig() NanomsgConfig

NewNanomsgConfig creates a new NanomsgConfig with default values.

type OptionalAWSConfig added in v4.1.0

type OptionalAWSConfig struct {
	Enabled     bool `json:"enabled" yaml:"enabled"`
	sess.Config `json:",inline" yaml:",inline"`
}

OptionalAWSConfig contains config fields for AWS authentication with an enable flag.

type PasswordAuthenticator added in v4.1.0

type PasswordAuthenticator struct {
	Enabled  bool   `json:"enabled" yaml:"enabled"`
	Username string `json:"username" yaml:"username"`
	Password string `json:"password" yaml:"password"`
}

PasswordAuthenticator contains the fields that will be used to authenticate with the Cassandra cluster.

type RedisHashConfig added in v4.1.0

type RedisHashConfig struct {
	bredis.Config  `json:",inline" yaml:",inline"`
	Key            string            `json:"key" yaml:"key"`
	WalkMetadata   bool              `json:"walk_metadata" yaml:"walk_metadata"`
	WalkJSONObject bool              `json:"walk_json_object" yaml:"walk_json_object"`
	Fields         map[string]string `json:"fields" yaml:"fields"`
	MaxInFlight    int               `json:"max_in_flight" yaml:"max_in_flight"`
}

RedisHashConfig contains configuration fields for the RedisHash output type.

func NewRedisHashConfig added in v4.1.0

func NewRedisHashConfig() RedisHashConfig

NewRedisHashConfig creates a new RedisHashConfig with default values.

type RedisListConfig added in v4.1.0

type RedisListConfig struct {
	bredis.Config `json:",inline" yaml:",inline"`
	Key           string             `json:"key" yaml:"key"`
	MaxInFlight   int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching      batchconfig.Config `json:"batching" yaml:"batching"`
}

RedisListConfig contains configuration fields for the RedisList output type.

func NewRedisListConfig added in v4.1.0

func NewRedisListConfig() RedisListConfig

NewRedisListConfig creates a new RedisListConfig with default values.

type RedisPubSubConfig added in v4.1.0

type RedisPubSubConfig struct {
	bredis.Config `json:",inline" yaml:",inline"`
	Channel       string             `json:"channel" yaml:"channel"`
	MaxInFlight   int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching      batchconfig.Config `json:"batching" yaml:"batching"`
}

RedisPubSubConfig contains configuration fields for the RedisPubSub output type.

func NewRedisPubSubConfig added in v4.1.0

func NewRedisPubSubConfig() RedisPubSubConfig

NewRedisPubSubConfig creates a new RedisPubSubConfig with default values.

type RedisStreamsConfig added in v4.1.0

type RedisStreamsConfig struct {
	bredis.Config `json:",inline" yaml:",inline"`
	Stream        string                       `json:"stream" yaml:"stream"`
	BodyKey       string                       `json:"body_key" yaml:"body_key"`
	MaxLenApprox  int64                        `json:"max_length" yaml:"max_length"`
	MaxInFlight   int                          `json:"max_in_flight" yaml:"max_in_flight"`
	Metadata      metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	Batching      batchconfig.Config           `json:"batching" yaml:"batching"`
}

RedisStreamsConfig contains configuration fields for the RedisStreams output type.

func NewRedisStreamsConfig added in v4.1.0

func NewRedisStreamsConfig() RedisStreamsConfig

NewRedisStreamsConfig creates a new RedisStreamsConfig with default values.

type RetryConfig added in v4.1.0

type RetryConfig struct {
	Output         *Config `json:"output" yaml:"output"`
	retries.Config `json:",inline" yaml:",inline"`
}

RetryConfig contains configuration values for the Retry output type.

func NewRetryConfig added in v4.1.0

func NewRetryConfig() RetryConfig

NewRetryConfig creates a new RetryConfig with default values.

func (RetryConfig) MarshalJSON added in v4.1.0

func (r RetryConfig) MarshalJSON() ([]byte, error)

MarshalJSON prints an empty object instead of nil.

func (RetryConfig) MarshalYAML added in v4.1.0

func (r RetryConfig) MarshalYAML() (any, error)

MarshalYAML prints an empty object instead of nil.

type SFTPConfig added in v4.1.0

type SFTPConfig struct {
	Address     string                `json:"address" yaml:"address"`
	Path        string                `json:"path" yaml:"path"`
	Codec       string                `json:"codec" yaml:"codec"`
	Credentials sftpSetup.Credentials `json:"credentials" yaml:"credentials"`
	MaxInFlight int                   `json:"max_in_flight" yaml:"max_in_flight"`
}

SFTPConfig contains configuration fields for the SFTP output type.

func NewSFTPConfig added in v4.1.0

func NewSFTPConfig() SFTPConfig

NewSFTPConfig creates a new Config with default values.

type SNSConfig added in v4.1.0

type SNSConfig struct {
	TopicArn               string                       `json:"topic_arn" yaml:"topic_arn"`
	MessageGroupID         string                       `json:"message_group_id" yaml:"message_group_id"`
	MessageDeduplicationID string                       `json:"message_deduplication_id" yaml:"message_deduplication_id"`
	Metadata               metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	SessionConfig          `json:",inline" yaml:",inline"`
	Timeout                string `json:"timeout" yaml:"timeout"`
	MaxInFlight            int    `json:"max_in_flight" yaml:"max_in_flight"`
}

SNSConfig contains configuration fields for the output SNS type.

func NewSNSConfig added in v4.1.0

func NewSNSConfig() SNSConfig

NewSNSConfig creates a new Config with default values.

type STDOUTConfig added in v4.1.0

type STDOUTConfig struct {
	Codec string `json:"codec" yaml:"codec"`
}

STDOUTConfig contains configuration fields for the stdout based output type.

func NewSTDOUTConfig added in v4.1.0

func NewSTDOUTConfig() STDOUTConfig

NewSTDOUTConfig creates a new STDOUTConfig with default values.

type SessionConfig added in v4.1.0

type SessionConfig struct {
	sess.Config `json:",inline" yaml:",inline"`
}

SessionConfig hides a general AWS session config struct.

type SocketConfig added in v4.1.0

type SocketConfig struct {
	Network string `json:"network" yaml:"network"`
	Address string `json:"address" yaml:"address"`
	Codec   string `json:"codec" yaml:"codec"`
}

SocketConfig contains configuration fields for the Socket output type.

func NewSocketConfig added in v4.1.0

func NewSocketConfig() SocketConfig

NewSocketConfig creates a new SocketConfig with default values.

type Streamed

type Streamed interface {
	// Consume starts the type receiving transactions from a Transactor.
	Consume(<-chan message.Transaction) error

	// Connected returns a boolean indicating whether this output is currently
	// connected to its target.
	Connected() bool

	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}

Streamed is a common interface implemented by outputs and provides channel based streaming APIs.

func NewAsyncWriter added in v4.1.0

func NewAsyncWriter(typeStr string, maxInflight int, w AsyncSink, mgr component.Observability) (Streamed, error)

NewAsyncWriter creates a Streamed implementation around an AsyncSink.

func OnlySinglePayloads added in v4.1.0

func OnlySinglePayloads(out Streamed) Streamed

OnlySinglePayloads expands message batches into individual payloads, respecting the max in flight of the wrapped output. This is a more efficient way of feeding messages into an output that handles its own batching mechanism internally, or does not support batching at all.

func WrapWithPipelines added in v4.1.0

func WrapWithPipelines(out Streamed, pipeConstructors ...iprocessor.PipelineConstructorFunc) (Streamed, error)

WrapWithPipelines wraps an output with a variadic number of pipelines.

type SubprocessConfig added in v4.1.0

type SubprocessConfig struct {
	Name  string   `json:"name" yaml:"name"`
	Args  []string `json:"args" yaml:"args"`
	Codec string   `json:"codec" yaml:"codec"`
}

SubprocessConfig contains configuration for the Subprocess input type.

func NewSubprocessConfig added in v4.1.0

func NewSubprocessConfig() SubprocessConfig

NewSubprocessConfig creates a new SubprocessConfig with default values.

type SwitchConfig added in v4.1.0

type SwitchConfig struct {
	RetryUntilSuccess bool               `json:"retry_until_success" yaml:"retry_until_success"`
	StrictMode        bool               `json:"strict_mode" yaml:"strict_mode"`
	Cases             []SwitchConfigCase `json:"cases" yaml:"cases"`
}

SwitchConfig contains configuration fields for the switchOutput output type.

func NewSwitchConfig added in v4.1.0

func NewSwitchConfig() SwitchConfig

NewSwitchConfig creates a new SwitchConfig with default values.

type SwitchConfigCase added in v4.1.0

type SwitchConfigCase struct {
	Check    string `json:"check" yaml:"check"`
	Continue bool   `json:"continue" yaml:"continue"`
	Output   Config `json:"output" yaml:"output"`
}

SwitchConfigCase contains configuration fields per output of a switch type.

func NewSwitchConfigCase added in v4.1.0

func NewSwitchConfigCase() SwitchConfigCase

NewSwitchConfigCase creates a new switch output config with default values.

type Sync

type Sync interface {
	// WriteTransaction attempts to write a transaction to an output.
	WriteTransaction(context.Context, message.Transaction) error

	// Connected returns a boolean indicating whether this output is currently
	// connected to its target.
	Connected() bool

	// TriggerStopConsuming instructs the output to start shutting down
	// resources once all pending messages are delivered and acknowledged.
	TriggerStopConsuming()

	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}

Sync is a common interface implemented by outputs and provides synchronous based writing APIs.

type TryConfig added in v4.1.0

type TryConfig []Config

TryConfig contains configuration fields for the Try output type.

func NewTryConfig added in v4.1.0

func NewTryConfig() TryConfig

NewTryConfig creates a new BrokerConfig with default values.

type WithPipeline added in v4.1.0

type WithPipeline struct {
	// contains filtered or unexported fields
}

WithPipeline is a type that wraps both an output type and a pipeline type by routing the pipeline through the output, and implements the output.Type interface in order to act like an ordinary output.

func WrapWithPipeline added in v4.1.0

func WrapWithPipeline(out Streamed, pipeConstructor iprocessor.PipelineConstructorFunc) (*WithPipeline, error)

WrapWithPipeline routes a processing pipeline directly into an output and returns a type that manages both and acts like an ordinary output.

func (*WithPipeline) Connected added in v4.1.0

func (i *WithPipeline) Connected() bool

Connected returns a boolean indicating whether this output is currently connected to its target.

func (*WithPipeline) Consume added in v4.1.0

func (i *WithPipeline) Consume(tsChan <-chan message.Transaction) error

Consume starts the type listening to a message channel from a producer.

func (*WithPipeline) TriggerCloseNow added in v4.6.0

func (i *WithPipeline) TriggerCloseNow()

TriggerCloseNow triggers a closure of this object but does not block.

func (*WithPipeline) WaitForClose added in v4.1.0

func (i *WithPipeline) WaitForClose(ctx context.Context) error

WaitForClose is a blocking call to wait until the object has finished closing down and cleaning up resources.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL