Documentation ¶
Index ¶
- type AMQP09BindingConfig
- type AMQP09Config
- type AMQP09QueueDeclareConfig
- type AMQP1Config
- type AWSKinesisConfig
- type AWSS3Config
- type AWSS3SQSConfig
- type AWSSQSConfig
- type Async
- type AsyncAckFn
- type AsyncCutOff
- type AsyncPreserver
- type AsyncReader
- type AzureBlobStorageConfig
- type AzureQueueStorageConfig
- type BrokerConfig
- type CSVFileConfig
- type Config
- type DynamicConfig
- type DynamoDBCheckpointConfig
- type FileConfig
- type GCPCloudStorageConfig
- type GCPPubSubConfig
- type GenerateConfig
- type HDFSConfig
- type HTTPClientConfig
- type HTTPServerConfig
- type HTTPServerResponseConfig
- type InprocConfig
- type KafkaBalancedGroupConfig
- type KafkaConfig
- type MQTTConfig
- type NATSConfig
- type NATSStreamConfig
- type NSQConfig
- type NanomsgConfig
- type ReadUntilConfig
- type RedisListConfig
- type RedisPubSubConfig
- type RedisStreamsConfig
- type SFTPConfig
- type STDINConfig
- type SequenceConfig
- type SequenceShardedJoinConfig
- type SocketConfig
- type SocketServerConfig
- type StreamConfig
- type Streamed
- type SubprocessConfig
- type WebsocketConfig
- type WithPipeline
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AMQP09BindingConfig ¶ added in v4.1.0
type AMQP09BindingConfig struct { Exchange string `json:"exchange" yaml:"exchange"` RoutingKey string `json:"key" yaml:"key"` }
AMQP09BindingConfig contains fields describing a queue binding to be declared.
type AMQP09Config ¶ added in v4.1.0
type AMQP09Config struct { URLs []string `json:"urls" yaml:"urls"` Queue string `json:"queue" yaml:"queue"` QueueDeclare AMQP09QueueDeclareConfig `json:"queue_declare" yaml:"queue_declare"` BindingsDeclare []AMQP09BindingConfig `json:"bindings_declare" yaml:"bindings_declare"` ConsumerTag string `json:"consumer_tag" yaml:"consumer_tag"` AutoAck bool `json:"auto_ack" yaml:"auto_ack"` NackRejectPatterns []string `json:"nack_reject_patterns" yaml:"nack_reject_patterns"` PrefetchCount int `json:"prefetch_count" yaml:"prefetch_count"` PrefetchSize int `json:"prefetch_size" yaml:"prefetch_size"` TLS btls.Config `json:"tls" yaml:"tls"` }
AMQP09Config contains configuration for the AMQP09 input type.
func NewAMQP09Config ¶ added in v4.1.0
func NewAMQP09Config() AMQP09Config
NewAMQP09Config creates a new AMQP09Config with default values.
type AMQP09QueueDeclareConfig ¶ added in v4.1.0
type AMQP09QueueDeclareConfig struct { Enabled bool `json:"enabled" yaml:"enabled"` Durable bool `json:"durable" yaml:"durable"` }
AMQP09QueueDeclareConfig contains fields indicating whether the target AMQP09 queue needs to be declared and bound to an exchange, as well as any fields specifying how to accomplish that.
type AMQP1Config ¶ added in v4.1.0
type AMQP1Config struct { URL string `json:"url" yaml:"url"` SourceAddress string `json:"source_address" yaml:"source_address"` AzureRenewLock bool `json:"azure_renew_lock" yaml:"azure_renew_lock"` TLS btls.Config `json:"tls" yaml:"tls"` SASL shared.SASLConfig `json:"sasl" yaml:"sasl"` }
AMQP1Config contains configuration for the AMQP1 input type.
func NewAMQP1Config ¶ added in v4.1.0
func NewAMQP1Config() AMQP1Config
NewAMQP1Config creates a new AMQP1Config with default values.
type AWSKinesisConfig ¶ added in v4.1.0
type AWSKinesisConfig struct { session.Config `json:",inline" yaml:",inline"` Streams []string `json:"streams" yaml:"streams"` DynamoDB DynamoDBCheckpointConfig `json:"dynamodb" yaml:"dynamodb"` CheckpointLimit int `json:"checkpoint_limit" yaml:"checkpoint_limit"` CommitPeriod string `json:"commit_period" yaml:"commit_period"` LeasePeriod string `json:"lease_period" yaml:"lease_period"` RebalancePeriod string `json:"rebalance_period" yaml:"rebalance_period"` StartFromOldest bool `json:"start_from_oldest" yaml:"start_from_oldest"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
AWSKinesisConfig is configuration values for the input type.
func NewAWSKinesisConfig ¶ added in v4.1.0
func NewAWSKinesisConfig() AWSKinesisConfig
NewAWSKinesisConfig creates a new Config with default values.
type AWSS3Config ¶ added in v4.1.0
type AWSS3Config struct { sess.Config `json:",inline" yaml:",inline"` Bucket string `json:"bucket" yaml:"bucket"` Codec string `json:"codec" yaml:"codec"` Prefix string `json:"prefix" yaml:"prefix"` ForcePathStyleURLs bool `json:"force_path_style_urls" yaml:"force_path_style_urls"` DeleteObjects bool `json:"delete_objects" yaml:"delete_objects"` SQS AWSS3SQSConfig `json:"sqs" yaml:"sqs"` }
AWSS3Config contains configuration values for the aws_s3 input type.
func NewAWSS3Config ¶ added in v4.1.0
func NewAWSS3Config() AWSS3Config
NewAWSS3Config creates a new AWSS3Config with default values.
type AWSS3SQSConfig ¶ added in v4.1.0
type AWSS3SQSConfig struct { URL string `json:"url" yaml:"url"` Endpoint string `json:"endpoint" yaml:"endpoint"` EnvelopePath string `json:"envelope_path" yaml:"envelope_path"` KeyPath string `json:"key_path" yaml:"key_path"` BucketPath string `json:"bucket_path" yaml:"bucket_path"` DelayPeriod string `json:"delay_period" yaml:"delay_period"` MaxMessages int64 `json:"max_messages" yaml:"max_messages"` }
AWSS3SQSConfig contains configuration for hooking up the S3 input with an SQS queue.
func NewAWSS3SQSConfig ¶ added in v4.1.0
func NewAWSS3SQSConfig() AWSS3SQSConfig
NewAWSS3SQSConfig creates a new AWSS3SQSConfig with default values.
type AWSSQSConfig ¶ added in v4.1.0
type AWSSQSConfig struct { sess.Config `json:",inline" yaml:",inline"` URL string `json:"url" yaml:"url"` DeleteMessage bool `json:"delete_message" yaml:"delete_message"` ResetVisibility bool `json:"reset_visibility" yaml:"reset_visibility"` MaxNumberOfMessages int `json:"max_number_of_messages" yaml:"max_number_of_messages"` }
AWSSQSConfig contains configuration values for the input type.
func NewAWSSQSConfig ¶ added in v4.1.0
func NewAWSSQSConfig() AWSSQSConfig
NewAWSSQSConfig creates a new Config with default values.
type Async ¶ added in v4.1.0
type Async interface { // ConnectWithContext attempts to establish a connection to the source, if // unsuccessful returns an error. If the attempt is successful (or not // necessary) returns nil. ConnectWithContext(ctx context.Context) error // ReadWithContext attempts to read a new message from the source. If // successful a message is returned along with a function used to // acknowledge receipt of the returned message. It's safe to process the // returned message and read the next message asynchronously. ReadWithContext(ctx context.Context) (*message.Batch, AsyncAckFn, error) // CloseAsync triggers the shut down of this component but should not block // the calling goroutine. CloseAsync() // WaitForClose is a blocking call to wait until the component has finished // shutting down and cleaning up resources. WaitForClose(timeout time.Duration) error }
Async is a type that reads Benthos messages from an external source and allows acknowledgements for a message batch to be propagated asynchronously.
type AsyncAckFn ¶ added in v4.1.0
AsyncAckFn is a function used to acknowledge receipt of a message batch. The provided response indicates whether the message batch was successfully delivered. Returns an error if the acknowledge was not propagated.
type AsyncCutOff ¶ added in v4.1.0
type AsyncCutOff struct {
// contains filtered or unexported fields
}
AsyncCutOff is a wrapper for input.Async implementations that exits from WaitForClose immediately. This is only useful when the underlying readable resource cannot be closed reliably and can block forever.
func NewAsyncCutOff ¶ added in v4.1.0
func NewAsyncCutOff(r Async) *AsyncCutOff
NewAsyncCutOff returns a new AsyncCutOff wrapper around a input.Async.
func (*AsyncCutOff) CloseAsync ¶ added in v4.1.0
func (c *AsyncCutOff) CloseAsync()
CloseAsync triggers the asynchronous closing of the reader.
func (*AsyncCutOff) ConnectWithContext ¶ added in v4.1.0
func (c *AsyncCutOff) ConnectWithContext(ctx context.Context) error
ConnectWithContext attempts to establish a connection to the source, if unsuccessful returns an error. If the attempt is successful (or not necessary) returns nil.
func (*AsyncCutOff) ReadWithContext ¶ added in v4.1.0
func (c *AsyncCutOff) ReadWithContext(ctx context.Context) (*message.Batch, AsyncAckFn, error)
ReadWithContext attempts to read a new message from the source.
func (*AsyncCutOff) WaitForClose ¶ added in v4.1.0
func (c *AsyncCutOff) WaitForClose(tout time.Duration) error
WaitForClose blocks until either the reader is finished closing or a timeout occurs.
type AsyncPreserver ¶ added in v4.1.0
type AsyncPreserver struct {
// contains filtered or unexported fields
}
AsyncPreserver is a wrapper for input.Async implementations that keeps a buffer of sent messages until they are acknowledged. If an error occurs during message propagation the contents of the buffer will be resent instead of reading new messages until it is depleted. AsyncPreserver implements input.Async.
Wrapping an input with this type is useful when your source of messages doesn't have a concept of a NoAck (like Kafka), and instead of "rejecting" messages we always intend to simply retry them until success.
func NewAsyncPreserver ¶ added in v4.1.0
func NewAsyncPreserver(r Async) *AsyncPreserver
NewAsyncPreserver returns a new AsyncPreserver wrapper around a input.Async.
func (*AsyncPreserver) CloseAsync ¶ added in v4.1.0
func (p *AsyncPreserver) CloseAsync()
CloseAsync triggers the asynchronous closing of the reader.
func (*AsyncPreserver) ConnectWithContext ¶ added in v4.1.0
func (p *AsyncPreserver) ConnectWithContext(ctx context.Context) error
ConnectWithContext attempts to establish a connection to the source, if unsuccessful returns an error. If the attempt is successful (or not necessary) returns nil.
func (*AsyncPreserver) ReadWithContext ¶ added in v4.1.0
func (p *AsyncPreserver) ReadWithContext(ctx context.Context) (*message.Batch, AsyncAckFn, error)
ReadWithContext attempts to read a new message from the source.
func (*AsyncPreserver) WaitForClose ¶ added in v4.1.0
func (p *AsyncPreserver) WaitForClose(tout time.Duration) error
WaitForClose blocks until either the reader is finished closing or a timeout occurs.
type AsyncReader ¶ added in v4.1.0
type AsyncReader struct {
// contains filtered or unexported fields
}
AsyncReader is an input implementation that reads messages from an input.Async component.
func (*AsyncReader) CloseAsync ¶ added in v4.1.0
func (r *AsyncReader) CloseAsync()
CloseAsync shuts down the AsyncReader input and stops processing requests.
func (*AsyncReader) Connected ¶ added in v4.1.0
func (r *AsyncReader) Connected() bool
Connected returns a boolean indicating whether this input is currently connected to its target.
func (*AsyncReader) TransactionChan ¶ added in v4.1.0
func (r *AsyncReader) TransactionChan() <-chan message.Transaction
TransactionChan returns a transactions channel for consuming messages from this input type.
func (*AsyncReader) WaitForClose ¶ added in v4.1.0
func (r *AsyncReader) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the AsyncReader input has closed down.
type AzureBlobStorageConfig ¶ added in v4.1.0
type AzureBlobStorageConfig struct { StorageAccount string `json:"storage_account" yaml:"storage_account"` StorageAccessKey string `json:"storage_access_key" yaml:"storage_access_key"` StorageSASToken string `json:"storage_sas_token" yaml:"storage_sas_token"` StorageConnectionString string `json:"storage_connection_string" yaml:"storage_connection_string"` Container string `json:"container" yaml:"container"` Prefix string `json:"prefix" yaml:"prefix"` Codec string `json:"codec" yaml:"codec"` DeleteObjects bool `json:"delete_objects" yaml:"delete_objects"` }
AzureBlobStorageConfig contains configuration fields for the AzureBlobStorage input type.
func NewAzureBlobStorageConfig ¶ added in v4.1.0
func NewAzureBlobStorageConfig() AzureBlobStorageConfig
NewAzureBlobStorageConfig creates a new AzureBlobStorageConfig with default values.
type AzureQueueStorageConfig ¶ added in v4.1.0
type AzureQueueStorageConfig struct { StorageAccount string `json:"storage_account" yaml:"storage_account"` StorageAccessKey string `json:"storage_access_key" yaml:"storage_access_key"` StorageSASToken string `json:"storage_sas_token" yaml:"storage_sas_token"` StorageConnectionString string `json:"storage_connection_string" yaml:"storage_connection_string"` QueueName string `json:"queue_name" yaml:"queue_name"` DequeueVisibilityTimeout string `json:"dequeue_visibility_timeout" yaml:"dequeue_visibility_timeout"` MaxInFlight int32 `json:"max_in_flight" yaml:"max_in_flight"` TrackProperties bool `json:"track_properties" yaml:"track_properties"` }
AzureQueueStorageConfig contains configuration fields for the AzureQueueStorage input type.
func NewAzureQueueStorageConfig ¶ added in v4.1.0
func NewAzureQueueStorageConfig() AzureQueueStorageConfig
NewAzureQueueStorageConfig creates a new AzureQueueStorageConfig with default values.
type BrokerConfig ¶ added in v4.1.0
type BrokerConfig struct { Copies int `json:"copies" yaml:"copies"` Inputs []Config `json:"inputs" yaml:"inputs"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
BrokerConfig contains configuration fields for the Broker input type.
func NewBrokerConfig ¶ added in v4.1.0
func NewBrokerConfig() BrokerConfig
NewBrokerConfig creates a new BrokerConfig with default values.
type CSVFileConfig ¶ added in v4.1.0
type CSVFileConfig struct { Paths []string `json:"paths" yaml:"paths"` ParseHeaderRow bool `json:"parse_header_row" yaml:"parse_header_row"` Delim string `json:"delimiter" yaml:"delimiter"` LazyQuotes bool `json:"lazy_quotes" yaml:"lazy_quotes"` BatchCount int `json:"batch_count" yaml:"batch_count"` }
CSVFileConfig contains configuration values for the CSVFile input type.
func NewCSVFileConfig ¶ added in v4.1.0
func NewCSVFileConfig() CSVFileConfig
NewCSVFileConfig creates a new CSVFileConfig with default values.
type Config ¶ added in v4.1.0
type Config struct { Label string `json:"label" yaml:"label"` Type string `json:"type" yaml:"type"` AMQP09 AMQP09Config `json:"amqp_0_9" yaml:"amqp_0_9"` AMQP1 AMQP1Config `json:"amqp_1" yaml:"amqp_1"` AWSKinesis AWSKinesisConfig `json:"aws_kinesis" yaml:"aws_kinesis"` AWSS3 AWSS3Config `json:"aws_s3" yaml:"aws_s3"` AWSSQS AWSSQSConfig `json:"aws_sqs" yaml:"aws_sqs"` AzureBlobStorage AzureBlobStorageConfig `json:"azure_blob_storage" yaml:"azure_blob_storage"` AzureQueueStorage AzureQueueStorageConfig `json:"azure_queue_storage" yaml:"azure_queue_storage"` Broker BrokerConfig `json:"broker" yaml:"broker"` CSVFile CSVFileConfig `json:"csv" yaml:"csv"` Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"` File FileConfig `json:"file" yaml:"file"` GCPCloudStorage GCPCloudStorageConfig `json:"gcp_cloud_storage" yaml:"gcp_cloud_storage"` GCPPubSub GCPPubSubConfig `json:"gcp_pubsub" yaml:"gcp_pubsub"` Generate GenerateConfig `json:"generate" yaml:"generate"` HDFS HDFSConfig `json:"hdfs" yaml:"hdfs"` HTTPClient HTTPClientConfig `json:"http_client" yaml:"http_client"` HTTPServer HTTPServerConfig `json:"http_server" yaml:"http_server"` Inproc InprocConfig `json:"inproc" yaml:"inproc"` Kafka KafkaConfig `json:"kafka" yaml:"kafka"` MQTT MQTTConfig `json:"mqtt" yaml:"mqtt"` Nanomsg NanomsgConfig `json:"nanomsg" yaml:"nanomsg"` NATS NATSConfig `json:"nats" yaml:"nats"` NATSStream NATSStreamConfig `json:"nats_stream" yaml:"nats_stream"` NSQ NSQConfig `json:"nsq" yaml:"nsq"` Plugin interface{} `json:"plugin,omitempty" yaml:"plugin,omitempty"` ReadUntil ReadUntilConfig `json:"read_until" yaml:"read_until"` RedisList RedisListConfig `json:"redis_list" yaml:"redis_list"` RedisPubSub RedisPubSubConfig `json:"redis_pubsub" yaml:"redis_pubsub"` RedisStreams RedisStreamsConfig `json:"redis_streams" yaml:"redis_streams"` Resource string `json:"resource" yaml:"resource"` Sequence SequenceConfig `json:"sequence" yaml:"sequence"` SFTP SFTPConfig `json:"sftp" yaml:"sftp"` Socket SocketConfig `json:"socket" yaml:"socket"` SocketServer SocketServerConfig `json:"socket_server" yaml:"socket_server"` STDIN STDINConfig `json:"stdin" yaml:"stdin"` Subprocess SubprocessConfig `json:"subprocess" yaml:"subprocess"` Websocket WebsocketConfig `json:"websocket" yaml:"websocket"` Processors []processor.Config `json:"processors" yaml:"processors"` }
Config is the all encompassing configuration struct for all input types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl
type DynamicConfig ¶ added in v4.1.0
type DynamicConfig struct { Inputs map[string]Config `json:"inputs" yaml:"inputs"` Prefix string `json:"prefix" yaml:"prefix"` }
DynamicConfig contains configuration for the Dynamic input type.
func NewDynamicConfig ¶ added in v4.1.0
func NewDynamicConfig() DynamicConfig
NewDynamicConfig creates a new DynamicConfig with default values.
type DynamoDBCheckpointConfig ¶ added in v4.1.0
type DynamoDBCheckpointConfig struct { Table string `json:"table" yaml:"table"` Create bool `json:"create" yaml:"create"` ReadCapacityUnits int64 `json:"read_capacity_units" yaml:"read_capacity_units"` WriteCapacityUnits int64 `json:"write_capacity_units" yaml:"write_capacity_units"` BillingMode string `json:"billing_mode" yaml:"billing_mode"` }
DynamoDBCheckpointConfig contains configuration parameters for a DynamoDB based checkpoint store for Kinesis.
func NewDynamoDBCheckpointConfig ¶ added in v4.1.0
func NewDynamoDBCheckpointConfig() DynamoDBCheckpointConfig
NewDynamoDBCheckpointConfig returns a DynamoDBCheckpoint config struct with default values.
type FileConfig ¶ added in v4.1.0
type FileConfig struct { Paths []string `json:"paths" yaml:"paths"` Codec string `json:"codec" yaml:"codec"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` DeleteOnFinish bool `json:"delete_on_finish" yaml:"delete_on_finish"` }
FileConfig contains configuration values for the File input type.
func NewFileConfig ¶ added in v4.1.0
func NewFileConfig() FileConfig
NewFileConfig creates a new FileConfig with default values.
type GCPCloudStorageConfig ¶ added in v4.1.0
type GCPCloudStorageConfig struct { Bucket string `json:"bucket" yaml:"bucket"` Prefix string `json:"prefix" yaml:"prefix"` Codec string `json:"codec" yaml:"codec"` DeleteObjects bool `json:"delete_objects" yaml:"delete_objects"` }
GCPCloudStorageConfig contains configuration fields for the Google Cloud Storage input type.
func NewGCPCloudStorageConfig ¶ added in v4.1.0
func NewGCPCloudStorageConfig() GCPCloudStorageConfig
NewGCPCloudStorageConfig creates a new GCPCloudStorageConfig with default values.
type GCPPubSubConfig ¶ added in v4.1.0
type GCPPubSubConfig struct { ProjectID string `json:"project" yaml:"project"` SubscriptionID string `json:"subscription" yaml:"subscription"` MaxOutstandingMessages int `json:"max_outstanding_messages" yaml:"max_outstanding_messages"` MaxOutstandingBytes int `json:"max_outstanding_bytes" yaml:"max_outstanding_bytes"` Sync bool `json:"sync" yaml:"sync"` }
GCPPubSubConfig contains configuration values for the input type.
func NewGCPPubSubConfig ¶ added in v4.1.0
func NewGCPPubSubConfig() GCPPubSubConfig
NewGCPPubSubConfig creates a new Config with default values.
type GenerateConfig ¶ added in v4.1.0
type GenerateConfig struct { Mapping string `json:"mapping" yaml:"mapping"` // internal can be both duration string or cron expression Interval string `json:"interval" yaml:"interval"` Count int `json:"count" yaml:"count"` }
GenerateConfig contains configuration for the Bloblang input type.
func NewGenerateConfig ¶ added in v4.1.0
func NewGenerateConfig() GenerateConfig
NewGenerateConfig creates a new BloblangConfig with default values.
type HDFSConfig ¶ added in v4.1.0
type HDFSConfig struct { Hosts []string `json:"hosts" yaml:"hosts"` User string `json:"user" yaml:"user"` Directory string `json:"directory" yaml:"directory"` }
HDFSConfig contains configuration fields for the HDFS input type.
func NewHDFSConfig ¶ added in v4.1.0
func NewHDFSConfig() HDFSConfig
NewHDFSConfig creates a new Config with default values.
type HTTPClientConfig ¶ added in v4.1.0
type HTTPClientConfig struct { ihttpdocs.Config `json:",inline" yaml:",inline"` Payload string `json:"payload" yaml:"payload"` DropEmptyBodies bool `json:"drop_empty_bodies" yaml:"drop_empty_bodies"` Stream StreamConfig `json:"stream" yaml:"stream"` }
HTTPClientConfig contains configuration for the HTTPClient output type.
func NewHTTPClientConfig ¶ added in v4.1.0
func NewHTTPClientConfig() HTTPClientConfig
NewHTTPClientConfig creates a new HTTPClientConfig with default values.
type HTTPServerConfig ¶ added in v4.1.0
type HTTPServerConfig struct { Address string `json:"address" yaml:"address"` Path string `json:"path" yaml:"path"` WSPath string `json:"ws_path" yaml:"ws_path"` WSWelcomeMessage string `json:"ws_welcome_message" yaml:"ws_welcome_message"` WSRateLimitMessage string `json:"ws_rate_limit_message" yaml:"ws_rate_limit_message"` AllowedVerbs []string `json:"allowed_verbs" yaml:"allowed_verbs"` Timeout string `json:"timeout" yaml:"timeout"` RateLimit string `json:"rate_limit" yaml:"rate_limit"` CertFile string `json:"cert_file" yaml:"cert_file"` KeyFile string `json:"key_file" yaml:"key_file"` CORS httpdocs.ServerCORS `json:"cors" yaml:"cors"` Response HTTPServerResponseConfig `json:"sync_response" yaml:"sync_response"` }
HTTPServerConfig contains configuration for the HTTPServer input type.
func NewHTTPServerConfig ¶ added in v4.1.0
func NewHTTPServerConfig() HTTPServerConfig
NewHTTPServerConfig creates a new HTTPServerConfig with default values.
type HTTPServerResponseConfig ¶ added in v4.1.0
type HTTPServerResponseConfig struct { Status string `json:"status" yaml:"status"` Headers map[string]string `json:"headers" yaml:"headers"` ExtractMetadata imetadata.IncludeFilterConfig `json:"metadata_headers" yaml:"metadata_headers"` }
HTTPServerResponseConfig provides config fields for customising the response given from successful requests.
func NewHTTPServerResponseConfig ¶ added in v4.1.0
func NewHTTPServerResponseConfig() HTTPServerResponseConfig
NewHTTPServerResponseConfig creates a new HTTPServerConfig with default values.
type InprocConfig ¶ added in v4.1.0
type InprocConfig string
InprocConfig is a configuration type for the inproc input.
func NewInprocConfig ¶ added in v4.1.0
func NewInprocConfig() InprocConfig
NewInprocConfig creates a new inproc input config.
type KafkaBalancedGroupConfig ¶ added in v4.1.0
type KafkaBalancedGroupConfig struct { SessionTimeout string `json:"session_timeout" yaml:"session_timeout"` HeartbeatInterval string `json:"heartbeat_interval" yaml:"heartbeat_interval"` RebalanceTimeout string `json:"rebalance_timeout" yaml:"rebalance_timeout"` }
KafkaBalancedGroupConfig contains config fields for Kafka consumer groups.
func NewKafkaBalancedGroupConfig ¶ added in v4.1.0
func NewKafkaBalancedGroupConfig() KafkaBalancedGroupConfig
NewKafkaBalancedGroupConfig returns a KafkaBalancedGroupConfig with default values.
type KafkaConfig ¶ added in v4.1.0
type KafkaConfig struct { Addresses []string `json:"addresses" yaml:"addresses"` Topics []string `json:"topics" yaml:"topics"` ClientID string `json:"client_id" yaml:"client_id"` RackID string `json:"rack_id" yaml:"rack_id"` ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"` Group KafkaBalancedGroupConfig `json:"group" yaml:"group"` CommitPeriod string `json:"commit_period" yaml:"commit_period"` CheckpointLimit int `json:"checkpoint_limit" yaml:"checkpoint_limit"` ExtractTracingMap string `json:"extract_tracing_map" yaml:"extract_tracing_map"` MaxProcessingPeriod string `json:"max_processing_period" yaml:"max_processing_period"` FetchBufferCap int `json:"fetch_buffer_cap" yaml:"fetch_buffer_cap"` StartFromOldest bool `json:"start_from_oldest" yaml:"start_from_oldest"` TargetVersion string `json:"target_version" yaml:"target_version"` TLS btls.Config `json:"tls" yaml:"tls"` SASL sasl.Config `json:"sasl" yaml:"sasl"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
KafkaConfig contains configuration fields for the Kafka input type.
func NewKafkaConfig ¶ added in v4.1.0
func NewKafkaConfig() KafkaConfig
NewKafkaConfig creates a new KafkaConfig with default values.
type MQTTConfig ¶ added in v4.1.0
type MQTTConfig struct { URLs []string `json:"urls" yaml:"urls"` QoS uint8 `json:"qos" yaml:"qos"` Topics []string `json:"topics" yaml:"topics"` ClientID string `json:"client_id" yaml:"client_id"` DynamicClientIDSuffix string `json:"dynamic_client_id_suffix" yaml:"dynamic_client_id_suffix"` Will mqttconf.Will `json:"will" yaml:"will"` CleanSession bool `json:"clean_session" yaml:"clean_session"` User string `json:"user" yaml:"user"` Password string `json:"password" yaml:"password"` ConnectTimeout string `json:"connect_timeout" yaml:"connect_timeout"` KeepAlive int64 `json:"keepalive" yaml:"keepalive"` TLS tls.Config `json:"tls" yaml:"tls"` }
MQTTConfig contains configuration fields for the MQTT input type.
func NewMQTTConfig ¶ added in v4.1.0
func NewMQTTConfig() MQTTConfig
NewMQTTConfig creates a new MQTTConfig with default values.
type NATSConfig ¶ added in v4.1.0
type NATSConfig struct { URLs []string `json:"urls" yaml:"urls"` Subject string `json:"subject" yaml:"subject"` QueueID string `json:"queue" yaml:"queue"` PrefetchCount int `json:"prefetch_count" yaml:"prefetch_count"` TLS btls.Config `json:"tls" yaml:"tls"` Auth auth.Config `json:"auth" yaml:"auth"` }
NATSConfig contains configuration fields for the NATS input type.
func NewNATSConfig ¶ added in v4.1.0
func NewNATSConfig() NATSConfig
NewNATSConfig creates a new NATSConfig with default values.
type NATSStreamConfig ¶ added in v4.1.0
type NATSStreamConfig struct { URLs []string `json:"urls" yaml:"urls"` ClusterID string `json:"cluster_id" yaml:"cluster_id"` ClientID string `json:"client_id" yaml:"client_id"` QueueID string `json:"queue" yaml:"queue"` DurableName string `json:"durable_name" yaml:"durable_name"` UnsubOnClose bool `json:"unsubscribe_on_close" yaml:"unsubscribe_on_close"` StartFromOldest bool `json:"start_from_oldest" yaml:"start_from_oldest"` Subject string `json:"subject" yaml:"subject"` MaxInflight int `json:"max_inflight" yaml:"max_inflight"` AckWait string `json:"ack_wait" yaml:"ack_wait"` TLS btls.Config `json:"tls" yaml:"tls"` Auth auth.Config `json:"auth" yaml:"auth"` }
NATSStreamConfig contains configuration fields for the NATSStream input type.
func NewNATSStreamConfig ¶ added in v4.1.0
func NewNATSStreamConfig() NATSStreamConfig
NewNATSStreamConfig creates a new NATSStreamConfig with default values.
type NSQConfig ¶ added in v4.1.0
type NSQConfig struct { Addresses []string `json:"nsqd_tcp_addresses" yaml:"nsqd_tcp_addresses"` LookupAddresses []string `json:"lookupd_http_addresses" yaml:"lookupd_http_addresses"` Topic string `json:"topic" yaml:"topic"` Channel string `json:"channel" yaml:"channel"` UserAgent string `json:"user_agent" yaml:"user_agent"` TLS btls.Config `json:"tls" yaml:"tls"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
NSQConfig contains configuration fields for the NSQ input type.
func NewNSQConfig ¶ added in v4.1.0
func NewNSQConfig() NSQConfig
NewNSQConfig creates a new NSQConfig with default values.
type NanomsgConfig ¶ added in v4.1.0
type NanomsgConfig struct { URLs []string `json:"urls" yaml:"urls"` Bind bool `json:"bind" yaml:"bind"` SocketType string `json:"socket_type" yaml:"socket_type"` SubFilters []string `json:"sub_filters" yaml:"sub_filters"` PollTimeout string `json:"poll_timeout" yaml:"poll_timeout"` }
NanomsgConfig contains configuration fields for the nanomsg input type.
func NewNanomsgConfig ¶ added in v4.1.0
func NewNanomsgConfig() NanomsgConfig
NewNanomsgConfig creates a new NanomsgConfig with default values.
type ReadUntilConfig ¶ added in v4.1.0
type ReadUntilConfig struct { Input *Config `json:"input" yaml:"input"` Restart bool `json:"restart_input" yaml:"restart_input"` Check string `json:"check" yaml:"check"` }
ReadUntilConfig contains configuration values for the ReadUntil input type.
func NewReadUntilConfig ¶ added in v4.1.0
func NewReadUntilConfig() ReadUntilConfig
NewReadUntilConfig creates a new ReadUntilConfig with default values.
func (ReadUntilConfig) MarshalJSON ¶ added in v4.1.0
func (r ReadUntilConfig) MarshalJSON() ([]byte, error)
MarshalJSON prints an empty object instead of nil.
func (ReadUntilConfig) MarshalYAML ¶ added in v4.1.0
func (r ReadUntilConfig) MarshalYAML() (interface{}, error)
MarshalYAML prints an empty object instead of nil.
type RedisListConfig ¶ added in v4.1.0
type RedisListConfig struct { bredis.Config `json:",inline" yaml:",inline"` Key string `json:"key" yaml:"key"` Timeout string `json:"timeout" yaml:"timeout"` }
RedisListConfig contains configuration fields for the RedisList input type.
func NewRedisListConfig ¶ added in v4.1.0
func NewRedisListConfig() RedisListConfig
NewRedisListConfig creates a new RedisListConfig with default values.
type RedisPubSubConfig ¶ added in v4.1.0
type RedisPubSubConfig struct { bredis.Config `json:",inline" yaml:",inline"` Channels []string `json:"channels" yaml:"channels"` UsePatterns bool `json:"use_patterns" yaml:"use_patterns"` }
RedisPubSubConfig contains configuration fields for the RedisPubSub input type.
func NewRedisPubSubConfig ¶ added in v4.1.0
func NewRedisPubSubConfig() RedisPubSubConfig
NewRedisPubSubConfig creates a new RedisPubSubConfig with default values.
type RedisStreamsConfig ¶ added in v4.1.0
type RedisStreamsConfig struct { bredis.Config `json:",inline" yaml:",inline"` BodyKey string `json:"body_key" yaml:"body_key"` Streams []string `json:"streams" yaml:"streams"` CreateStreams bool `json:"create_streams" yaml:"create_streams"` ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"` ClientID string `json:"client_id" yaml:"client_id"` Limit int64 `json:"limit" yaml:"limit"` StartFromOldest bool `json:"start_from_oldest" yaml:"start_from_oldest"` CommitPeriod string `json:"commit_period" yaml:"commit_period"` Timeout string `json:"timeout" yaml:"timeout"` }
RedisStreamsConfig contains configuration fields for the RedisStreams input type.
func NewRedisStreamsConfig ¶ added in v4.1.0
func NewRedisStreamsConfig() RedisStreamsConfig
NewRedisStreamsConfig creates a new RedisStreamsConfig with default values.
type SFTPConfig ¶ added in v4.1.0
type SFTPConfig struct { Address string `json:"address" yaml:"address"` Credentials sftpSetup.Credentials `json:"credentials" yaml:"credentials"` Paths []string `json:"paths" yaml:"paths"` Codec string `json:"codec" yaml:"codec"` DeleteOnFinish bool `json:"delete_on_finish" yaml:"delete_on_finish"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` Watcher watcherConfig `json:"watcher" yaml:"watcher"` }
SFTPConfig contains configuration fields for the SFTP input type.
func NewSFTPConfig ¶ added in v4.1.0
func NewSFTPConfig() SFTPConfig
NewSFTPConfig creates a new SFTPConfig with default values.
type STDINConfig ¶ added in v4.1.0
type STDINConfig struct { Codec string `json:"codec" yaml:"codec"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` }
STDINConfig contains config fields for the STDIN input type.
func NewSTDINConfig ¶ added in v4.1.0
func NewSTDINConfig() STDINConfig
NewSTDINConfig creates a STDINConfig populated with default values.
type SequenceConfig ¶ added in v4.1.0
type SequenceConfig struct { ShardedJoin SequenceShardedJoinConfig `json:"sharded_join" yaml:"sharded_join"` Inputs []Config `json:"inputs" yaml:"inputs"` }
SequenceConfig contains configuration values for the Sequence input type.
func NewSequenceConfig ¶ added in v4.1.0
func NewSequenceConfig() SequenceConfig
NewSequenceConfig creates a new SequenceConfig with default values.
type SequenceShardedJoinConfig ¶ added in v4.1.0
type SequenceShardedJoinConfig struct { Type string `json:"type" yaml:"type"` IDPath string `json:"id_path" yaml:"id_path"` Iterations int `json:"iterations" yaml:"iterations"` MergeStrategy string `json:"merge_strategy" yaml:"merge_strategy"` }
SequenceShardedJoinConfig describes an optional mechanism for performing sharded joins of structured data resulting from the input sequence. This is a way to merge the structured fields of fragmented datasets within memory even when the overall size of the data surpasses the memory available on the machine.
When configured the sequence of inputs will be consumed multiple times according to the number of iterations, and each iteration will process an entirely different set of messages by sharding them by the ID field.
Each message must be structured (JSON or otherwise processed into a structured form) and the fields will be aggregated with those of other messages sharing the ID. At the end of each iteration the joined messages are flushed downstream before the next iteration begins.
func NewSequenceShardedJoinConfig ¶ added in v4.1.0
func NewSequenceShardedJoinConfig() SequenceShardedJoinConfig
NewSequenceShardedJoinConfig creates a new sequence sharding configuration with default values.
type SocketConfig ¶ added in v4.1.0
type SocketConfig struct { Network string `json:"network" yaml:"network"` Address string `json:"address" yaml:"address"` Codec string `json:"codec" yaml:"codec"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` }
SocketConfig contains configuration values for the Socket input type.
func NewSocketConfig ¶ added in v4.1.0
func NewSocketConfig() SocketConfig
NewSocketConfig creates a new SocketConfig with default values.
type SocketServerConfig ¶ added in v4.1.0
type SocketServerConfig struct { Network string `json:"network" yaml:"network"` Address string `json:"address" yaml:"address"` Codec string `json:"codec" yaml:"codec"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` }
SocketServerConfig contains configuration for the SocketServer input type.
func NewSocketServerConfig ¶ added in v4.1.0
func NewSocketServerConfig() SocketServerConfig
NewSocketServerConfig creates a new SocketServerConfig with default values.
type StreamConfig ¶ added in v4.1.0
type StreamConfig struct { Enabled bool `json:"enabled" yaml:"enabled"` Reconnect bool `json:"reconnect" yaml:"reconnect"` Codec string `json:"codec" yaml:"codec"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` }
StreamConfig contains fields for specifying consumption behaviour when the body of a request is a constant stream of bytes.
type Streamed ¶
type Streamed interface { // TransactionChan returns a channel used for consuming transactions from // this type. Every transaction received must be resolved before another // transaction will be sent. TransactionChan() <-chan message.Transaction // Connected returns a boolean indicating whether this input is currently // connected to its target. Connected() bool // CloseAsync triggers the shut down of this component but should not block // the calling goroutine. CloseAsync() // WaitForClose is a blocking call to wait until the component has finished // shutting down and cleaning up resources. WaitForClose(timeout time.Duration) error }
Streamed is a common interface implemented by inputs and provides channel based streaming APIs.
func NewAsyncReader ¶ added in v4.1.0
func NewAsyncReader( typeStr string, allowSkipAcks bool, r Async, log log.Modular, stats metrics.Type, ) (Streamed, error)
NewAsyncReader creates a new AsyncReader input type.
func WrapWithPipelines ¶ added in v4.1.0
func WrapWithPipelines(in Streamed, pipeConstructors ...iprocessor.PipelineConstructorFunc) (Streamed, error)
WrapWithPipelines wraps an input with a variadic number of pipelines.
type SubprocessConfig ¶ added in v4.1.0
type SubprocessConfig struct { Name string `json:"name" yaml:"name"` Args []string `json:"args" yaml:"args"` Codec string `json:"codec" yaml:"codec"` RestartOnExit bool `json:"restart_on_exit" yaml:"restart_on_exit"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` }
SubprocessConfig contains configuration for the Subprocess input type.
func NewSubprocessConfig ¶ added in v4.1.0
func NewSubprocessConfig() SubprocessConfig
NewSubprocessConfig creates a new SubprocessConfig with default values.
type WebsocketConfig ¶ added in v4.1.0
type WebsocketConfig struct { URL string `json:"url" yaml:"url"` OpenMsg string `json:"open_message" yaml:"open_message"` auth.Config `json:",inline" yaml:",inline"` TLS btls.Config `json:"tls" yaml:"tls"` }
WebsocketConfig contains configuration fields for the Websocket input type.
func NewWebsocketConfig ¶ added in v4.1.0
func NewWebsocketConfig() WebsocketConfig
NewWebsocketConfig creates a new WebsocketConfig with default values.
type WithPipeline ¶ added in v4.1.0
type WithPipeline struct {
// contains filtered or unexported fields
}
WithPipeline is a type that wraps both an input type and a pipeline type by routing the input through the pipeline, and implements the input.Type interface in order to act like an ordinary input.
func WrapWithPipeline ¶ added in v4.1.0
func WrapWithPipeline(in Streamed, pipeConstructor iprocessor.PipelineConstructorFunc) (*WithPipeline, error)
WrapWithPipeline routes an input directly into a processing pipeline and returns a type that manages both and acts like an ordinary input.
func (*WithPipeline) CloseAsync ¶ added in v4.1.0
func (i *WithPipeline) CloseAsync()
CloseAsync triggers a closure of this object but does not block.
func (*WithPipeline) Connected ¶ added in v4.1.0
func (i *WithPipeline) Connected() bool
Connected returns a boolean indicating whether this input is currently connected to its target.
func (*WithPipeline) TransactionChan ¶ added in v4.1.0
func (i *WithPipeline) TransactionChan() <-chan message.Transaction
TransactionChan returns the channel used for consuming transactions from this input.
func (*WithPipeline) WaitForClose ¶ added in v4.1.0
func (i *WithPipeline) WaitForClose(timeout time.Duration) error
WaitForClose is a blocking call to wait until the object has finished closing down and cleaning up resources.
Source Files ¶
- async_cut_off.go
- async_preserver.go
- async_reader.go
- config.go
- config_amqp_0_9.go
- config_amqp_1.go
- config_aws_kinesis.go
- config_aws_s3.go
- config_aws_sqs.go
- config_azure_blob_storage.go
- config_azure_queue_storage.go
- config_broker.go
- config_csv.go
- config_dynamic.go
- config_file.go
- config_gcp_cloud_storage.go
- config_gcp_pubsub.go
- config_generate.go
- config_hdfs.go
- config_http_client.go
- config_http_server.go
- config_inproc.go
- config_kafka.go
- config_mqtt.go
- config_nanomsg.go
- config_nats.go
- config_nats_stream.go
- config_nsq.go
- config_read_until.go
- config_redis_list.go
- config_redis_pubsub.go
- config_redis_streams.go
- config_sequence.go
- config_sftp.go
- config_socket.go
- config_socket_server.go
- config_stdin.go
- config_subprocess.go
- config_websocket.go
- interface.go
- wrap_with_pipeline.go