Documentation ¶
Index ¶
- type AMQP09BindingConfig
- type AMQP09Config
- type AMQP09QueueDeclareConfig
- type AMQP1Config
- type AWSKinesisConfig
- type AWSS3Config
- type AWSS3SQSConfig
- type AWSSQSConfig
- type Async
- type AsyncAckFn
- type AsyncCutOff
- type AsyncPreserver
- type AsyncReader
- type AzureBlobStorageConfig
- type AzureQueueStorageConfig
- type BrokerConfig
- type CSVFileConfig
- type Config
- type DynamicConfig
- type DynamoDBCheckpointConfig
- type FileConfig
- type GCPCloudStorageConfig
- type GCPPubSubConfig
- type GenerateConfig
- type HDFSConfig
- type HTTPClientConfig
- type HTTPServerConfig
- type HTTPServerResponseConfig
- type InprocConfig
- type KafkaBalancedGroupConfig
- type KafkaConfig
- type MQTTConfig
- type NATSConfig
- type NATSStreamConfig
- type NSQConfig
- type NanomsgConfig
- type ReadUntilConfig
- type RedisListConfig
- type RedisPubSubConfig
- type RedisStreamsConfig
- type SFTPConfig
- type STDINConfig
- type SequenceConfig
- type SequenceShardedJoinConfig
- type SocketConfig
- type SocketServerConfig
- type StreamConfig
- type Streamed
- type SubprocessConfig
- type WebsocketConfig
- type WithPipeline
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AMQP09BindingConfig ¶
type AMQP09BindingConfig struct { Exchange string `json:"exchange" yaml:"exchange"` RoutingKey string `json:"key" yaml:"key"` }
AMQP09BindingConfig contains fields describing a queue binding to be declared.
type AMQP09Config ¶
type AMQP09Config struct { URLs []string `json:"urls" yaml:"urls"` Queue string `json:"queue" yaml:"queue"` QueueDeclare AMQP09QueueDeclareConfig `json:"queue_declare" yaml:"queue_declare"` BindingsDeclare []AMQP09BindingConfig `json:"bindings_declare" yaml:"bindings_declare"` ConsumerTag string `json:"consumer_tag" yaml:"consumer_tag"` AutoAck bool `json:"auto_ack" yaml:"auto_ack"` NackRejectPatterns []string `json:"nack_reject_patterns" yaml:"nack_reject_patterns"` PrefetchCount int `json:"prefetch_count" yaml:"prefetch_count"` PrefetchSize int `json:"prefetch_size" yaml:"prefetch_size"` TLS btls.Config `json:"tls" yaml:"tls"` }
AMQP09Config contains configuration for the AMQP09 input type.
func NewAMQP09Config ¶
func NewAMQP09Config() AMQP09Config
NewAMQP09Config creates a new AMQP09Config with default values.
type AMQP09QueueDeclareConfig ¶
type AMQP09QueueDeclareConfig struct { Enabled bool `json:"enabled" yaml:"enabled"` Durable bool `json:"durable" yaml:"durable"` }
AMQP09QueueDeclareConfig contains fields indicating whether the target AMQP09 queue needs to be declared and bound to an exchange, as well as any fields specifying how to accomplish that.
type AMQP1Config ¶
type AMQP1Config struct { URL string `json:"url" yaml:"url"` SourceAddress string `json:"source_address" yaml:"source_address"` AzureRenewLock bool `json:"azure_renew_lock" yaml:"azure_renew_lock"` TLS btls.Config `json:"tls" yaml:"tls"` SASL shared.SASLConfig `json:"sasl" yaml:"sasl"` }
AMQP1Config contains configuration for the AMQP1 input type.
func NewAMQP1Config ¶
func NewAMQP1Config() AMQP1Config
NewAMQP1Config creates a new AMQP1Config with default values.
type AWSKinesisConfig ¶
type AWSKinesisConfig struct { session.Config `json:",inline" yaml:",inline"` Streams []string `json:"streams" yaml:"streams"` DynamoDB DynamoDBCheckpointConfig `json:"dynamodb" yaml:"dynamodb"` CheckpointLimit int `json:"checkpoint_limit" yaml:"checkpoint_limit"` CommitPeriod string `json:"commit_period" yaml:"commit_period"` LeasePeriod string `json:"lease_period" yaml:"lease_period"` RebalancePeriod string `json:"rebalance_period" yaml:"rebalance_period"` StartFromOldest bool `json:"start_from_oldest" yaml:"start_from_oldest"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
AWSKinesisConfig contains configuration values for the aws_kinesis input type.
func NewAWSKinesisConfig ¶
func NewAWSKinesisConfig() AWSKinesisConfig
NewAWSKinesisConfig creates a new AWSKinesisConfig with default values.
type AWSS3Config ¶
type AWSS3Config struct { sess.Config `json:",inline" yaml:",inline"` Bucket string `json:"bucket" yaml:"bucket"` Codec string `json:"codec" yaml:"codec"` Prefix string `json:"prefix" yaml:"prefix"` ForcePathStyleURLs bool `json:"force_path_style_urls" yaml:"force_path_style_urls"` DeleteObjects bool `json:"delete_objects" yaml:"delete_objects"` SQS AWSS3SQSConfig `json:"sqs" yaml:"sqs"` }
AWSS3Config contains configuration values for the aws_s3 input type.
func NewAWSS3Config ¶
func NewAWSS3Config() AWSS3Config
NewAWSS3Config creates a new AWSS3Config with default values.
type AWSS3SQSConfig ¶
type AWSS3SQSConfig struct { URL string `json:"url" yaml:"url"` Endpoint string `json:"endpoint" yaml:"endpoint"` EnvelopePath string `json:"envelope_path" yaml:"envelope_path"` KeyPath string `json:"key_path" yaml:"key_path"` BucketPath string `json:"bucket_path" yaml:"bucket_path"` DelayPeriod string `json:"delay_period" yaml:"delay_period"` MaxMessages int64 `json:"max_messages" yaml:"max_messages"` }
AWSS3SQSConfig contains configuration for hooking up the S3 input with an SQS queue.
func NewAWSS3SQSConfig ¶
func NewAWSS3SQSConfig() AWSS3SQSConfig
NewAWSS3SQSConfig creates a new AWSS3SQSConfig with default values.
type AWSSQSConfig ¶
type AWSSQSConfig struct { sess.Config `json:",inline" yaml:",inline"` URL string `json:"url" yaml:"url"` DeleteMessage bool `json:"delete_message" yaml:"delete_message"` ResetVisibility bool `json:"reset_visibility" yaml:"reset_visibility"` MaxNumberOfMessages int `json:"max_number_of_messages" yaml:"max_number_of_messages"` }
AWSSQSConfig contains configuration values for the aws_sqs input type.
func NewAWSSQSConfig ¶
func NewAWSSQSConfig() AWSSQSConfig
NewAWSSQSConfig creates a new AWSSQSConfig with default values.
type Async ¶
type Async interface { // ConnectWithContext attempts to establish a connection to the source, if // unsuccessful returns an error. If the attempt is successful (or not // necessary) returns nil. ConnectWithContext(ctx context.Context) error // ReadWithContext attempts to read a new message from the source. If // successful a message is returned along with a function used to // acknowledge receipt of the returned message. It's safe to process the // returned message and read the next message asynchronously. ReadWithContext(ctx context.Context) (*message.Batch, AsyncAckFn, error) // CloseAsync triggers the shut down of this component but should not block // the calling goroutine. CloseAsync() // WaitForClose is a blocking call to wait until the component has finished // shutting down and cleaning up resources. WaitForClose(timeout time.Duration) error }
Async is a type that reads Benthos messages from an external source and allows acknowledgements for a message batch to be propagated asynchronously.
type AsyncAckFn ¶
AsyncAckFn is a function used to acknowledge receipt of a message batch. The provided response indicates whether the message batch was successfully delivered. Returns an error if the acknowledgement was not propagated.
type AsyncCutOff ¶
type AsyncCutOff struct {
// contains filtered or unexported fields
}
AsyncCutOff is a wrapper for input.Async implementations that exits from WaitForClose immediately. This is only useful when the underlying readable resource cannot be closed reliably and can block forever.
func NewAsyncCutOff ¶
func NewAsyncCutOff(r Async) *AsyncCutOff
NewAsyncCutOff returns a new AsyncCutOff wrapper around an input.Async.
func (*AsyncCutOff) CloseAsync ¶
func (c *AsyncCutOff) CloseAsync()
CloseAsync triggers the asynchronous closing of the reader.
func (*AsyncCutOff) ConnectWithContext ¶
func (c *AsyncCutOff) ConnectWithContext(ctx context.Context) error
ConnectWithContext attempts to establish a connection to the source, if unsuccessful returns an error. If the attempt is successful (or not necessary) returns nil.
func (*AsyncCutOff) ReadWithContext ¶
func (c *AsyncCutOff) ReadWithContext(ctx context.Context) (*message.Batch, AsyncAckFn, error)
ReadWithContext attempts to read a new message from the source.
func (*AsyncCutOff) WaitForClose ¶
func (c *AsyncCutOff) WaitForClose(tout time.Duration) error
WaitForClose blocks until either the reader is finished closing or a timeout occurs.
type AsyncPreserver ¶
type AsyncPreserver struct {
// contains filtered or unexported fields
}
AsyncPreserver is a wrapper for input.Async implementations that keeps a buffer of sent messages until they are acknowledged. If an error occurs during message propagation the contents of the buffer will be resent instead of reading new messages until it is depleted. AsyncPreserver implements input.Async.
Wrapping an input with this type is useful when your source of messages doesn't have a concept of a NoAck (like Kafka), and instead of "rejecting" messages we always intend to simply retry them until success.
func NewAsyncPreserver ¶
func NewAsyncPreserver(r Async) *AsyncPreserver
NewAsyncPreserver returns a new AsyncPreserver wrapper around an input.Async.
func (*AsyncPreserver) CloseAsync ¶
func (p *AsyncPreserver) CloseAsync()
CloseAsync triggers the asynchronous closing of the reader.
func (*AsyncPreserver) ConnectWithContext ¶
func (p *AsyncPreserver) ConnectWithContext(ctx context.Context) error
ConnectWithContext attempts to establish a connection to the source, if unsuccessful returns an error. If the attempt is successful (or not necessary) returns nil.
func (*AsyncPreserver) ReadWithContext ¶
func (p *AsyncPreserver) ReadWithContext(ctx context.Context) (*message.Batch, AsyncAckFn, error)
ReadWithContext attempts to read a new message from the source.
func (*AsyncPreserver) WaitForClose ¶
func (p *AsyncPreserver) WaitForClose(tout time.Duration) error
WaitForClose blocks until either the reader is finished closing or a timeout occurs.
type AsyncReader ¶
type AsyncReader struct {
// contains filtered or unexported fields
}
AsyncReader is an input implementation that reads messages from an input.Async component.
func (*AsyncReader) CloseAsync ¶
func (r *AsyncReader) CloseAsync()
CloseAsync shuts down the AsyncReader input and stops processing requests.
func (*AsyncReader) Connected ¶
func (r *AsyncReader) Connected() bool
Connected returns a boolean indicating whether this input is currently connected to its target.
func (*AsyncReader) TransactionChan ¶
func (r *AsyncReader) TransactionChan() <-chan message.Transaction
TransactionChan returns a transactions channel for consuming messages from this input type.
func (*AsyncReader) WaitForClose ¶
func (r *AsyncReader) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the AsyncReader input has closed down.
type AzureBlobStorageConfig ¶
type AzureBlobStorageConfig struct { StorageAccount string `json:"storage_account" yaml:"storage_account"` StorageAccessKey string `json:"storage_access_key" yaml:"storage_access_key"` StorageSASToken string `json:"storage_sas_token" yaml:"storage_sas_token"` StorageConnectionString string `json:"storage_connection_string" yaml:"storage_connection_string"` Container string `json:"container" yaml:"container"` Prefix string `json:"prefix" yaml:"prefix"` Codec string `json:"codec" yaml:"codec"` DeleteObjects bool `json:"delete_objects" yaml:"delete_objects"` }
AzureBlobStorageConfig contains configuration fields for the AzureBlobStorage input type.
func NewAzureBlobStorageConfig ¶
func NewAzureBlobStorageConfig() AzureBlobStorageConfig
NewAzureBlobStorageConfig creates a new AzureBlobStorageConfig with default values.
type AzureQueueStorageConfig ¶
type AzureQueueStorageConfig struct { StorageAccount string `json:"storage_account" yaml:"storage_account"` StorageAccessKey string `json:"storage_access_key" yaml:"storage_access_key"` StorageSASToken string `json:"storage_sas_token" yaml:"storage_sas_token"` StorageConnectionString string `json:"storage_connection_string" yaml:"storage_connection_string"` QueueName string `json:"queue_name" yaml:"queue_name"` DequeueVisibilityTimeout string `json:"dequeue_visibility_timeout" yaml:"dequeue_visibility_timeout"` MaxInFlight int32 `json:"max_in_flight" yaml:"max_in_flight"` TrackProperties bool `json:"track_properties" yaml:"track_properties"` }
AzureQueueStorageConfig contains configuration fields for the AzureQueueStorage input type.
func NewAzureQueueStorageConfig ¶
func NewAzureQueueStorageConfig() AzureQueueStorageConfig
NewAzureQueueStorageConfig creates a new AzureQueueStorageConfig with default values.
type BrokerConfig ¶
type BrokerConfig struct { Copies int `json:"copies" yaml:"copies"` Inputs []Config `json:"inputs" yaml:"inputs"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
BrokerConfig contains configuration fields for the Broker input type.
func NewBrokerConfig ¶
func NewBrokerConfig() BrokerConfig
NewBrokerConfig creates a new BrokerConfig with default values.
type CSVFileConfig ¶
type CSVFileConfig struct { Paths []string `json:"paths" yaml:"paths"` ParseHeaderRow bool `json:"parse_header_row" yaml:"parse_header_row"` Delim string `json:"delimiter" yaml:"delimiter"` LazyQuotes bool `json:"lazy_quotes" yaml:"lazy_quotes"` BatchCount int `json:"batch_count" yaml:"batch_count"` }
CSVFileConfig contains configuration values for the CSVFile input type.
func NewCSVFileConfig ¶
func NewCSVFileConfig() CSVFileConfig
NewCSVFileConfig creates a new CSVFileConfig with default values.
type Config ¶
type Config struct { Label string `json:"label" yaml:"label"` Type string `json:"type" yaml:"type"` AMQP09 AMQP09Config `json:"amqp_0_9" yaml:"amqp_0_9"` AMQP1 AMQP1Config `json:"amqp_1" yaml:"amqp_1"` AWSKinesis AWSKinesisConfig `json:"aws_kinesis" yaml:"aws_kinesis"` AWSS3 AWSS3Config `json:"aws_s3" yaml:"aws_s3"` AWSSQS AWSSQSConfig `json:"aws_sqs" yaml:"aws_sqs"` AzureBlobStorage AzureBlobStorageConfig `json:"azure_blob_storage" yaml:"azure_blob_storage"` AzureQueueStorage AzureQueueStorageConfig `json:"azure_queue_storage" yaml:"azure_queue_storage"` Broker BrokerConfig `json:"broker" yaml:"broker"` CSVFile CSVFileConfig `json:"csv" yaml:"csv"` Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"` File FileConfig `json:"file" yaml:"file"` GCPCloudStorage GCPCloudStorageConfig `json:"gcp_cloud_storage" yaml:"gcp_cloud_storage"` GCPPubSub GCPPubSubConfig `json:"gcp_pubsub" yaml:"gcp_pubsub"` Generate GenerateConfig `json:"generate" yaml:"generate"` HDFS HDFSConfig `json:"hdfs" yaml:"hdfs"` HTTPClient HTTPClientConfig `json:"http_client" yaml:"http_client"` HTTPServer HTTPServerConfig `json:"http_server" yaml:"http_server"` Inproc InprocConfig `json:"inproc" yaml:"inproc"` Kafka KafkaConfig `json:"kafka" yaml:"kafka"` MQTT MQTTConfig `json:"mqtt" yaml:"mqtt"` Nanomsg NanomsgConfig `json:"nanomsg" yaml:"nanomsg"` NATS NATSConfig `json:"nats" yaml:"nats"` NATSStream NATSStreamConfig `json:"nats_stream" yaml:"nats_stream"` NSQ NSQConfig `json:"nsq" yaml:"nsq"` Plugin interface{} `json:"plugin,omitempty" yaml:"plugin,omitempty"` ReadUntil ReadUntilConfig `json:"read_until" yaml:"read_until"` RedisList RedisListConfig `json:"redis_list" yaml:"redis_list"` RedisPubSub RedisPubSubConfig `json:"redis_pubsub" yaml:"redis_pubsub"` RedisStreams RedisStreamsConfig `json:"redis_streams" yaml:"redis_streams"` Resource string `json:"resource" yaml:"resource"` Sequence SequenceConfig `json:"sequence" yaml:"sequence"` SFTP SFTPConfig `json:"sftp" yaml:"sftp"` Socket SocketConfig 
`json:"socket" yaml:"socket"` SocketServer SocketServerConfig `json:"socket_server" yaml:"socket_server"` STDIN STDINConfig `json:"stdin" yaml:"stdin"` Subprocess SubprocessConfig `json:"subprocess" yaml:"subprocess"` Websocket WebsocketConfig `json:"websocket" yaml:"websocket"` Processors []processor.Config `json:"processors" yaml:"processors"` }
Config is the all-encompassing configuration struct for all input types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl
type DynamicConfig ¶
type DynamicConfig struct { Inputs map[string]Config `json:"inputs" yaml:"inputs"` Prefix string `json:"prefix" yaml:"prefix"` }
DynamicConfig contains configuration for the Dynamic input type.
func NewDynamicConfig ¶
func NewDynamicConfig() DynamicConfig
NewDynamicConfig creates a new DynamicConfig with default values.
type DynamoDBCheckpointConfig ¶
type DynamoDBCheckpointConfig struct { Table string `json:"table" yaml:"table"` Create bool `json:"create" yaml:"create"` ReadCapacityUnits int64 `json:"read_capacity_units" yaml:"read_capacity_units"` WriteCapacityUnits int64 `json:"write_capacity_units" yaml:"write_capacity_units"` BillingMode string `json:"billing_mode" yaml:"billing_mode"` }
DynamoDBCheckpointConfig contains configuration parameters for a DynamoDB based checkpoint store for Kinesis.
func NewDynamoDBCheckpointConfig ¶
func NewDynamoDBCheckpointConfig() DynamoDBCheckpointConfig
NewDynamoDBCheckpointConfig returns a DynamoDBCheckpointConfig struct with default values.
type FileConfig ¶
type FileConfig struct { Paths []string `json:"paths" yaml:"paths"` Codec string `json:"codec" yaml:"codec"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` DeleteOnFinish bool `json:"delete_on_finish" yaml:"delete_on_finish"` }
FileConfig contains configuration values for the File input type.
func NewFileConfig ¶
func NewFileConfig() FileConfig
NewFileConfig creates a new FileConfig with default values.
type GCPCloudStorageConfig ¶
type GCPCloudStorageConfig struct { Bucket string `json:"bucket" yaml:"bucket"` Prefix string `json:"prefix" yaml:"prefix"` Codec string `json:"codec" yaml:"codec"` DeleteObjects bool `json:"delete_objects" yaml:"delete_objects"` }
GCPCloudStorageConfig contains configuration fields for the Google Cloud Storage input type.
func NewGCPCloudStorageConfig ¶
func NewGCPCloudStorageConfig() GCPCloudStorageConfig
NewGCPCloudStorageConfig creates a new GCPCloudStorageConfig with default values.
type GCPPubSubConfig ¶
type GCPPubSubConfig struct { ProjectID string `json:"project" yaml:"project"` SubscriptionID string `json:"subscription" yaml:"subscription"` MaxOutstandingMessages int `json:"max_outstanding_messages" yaml:"max_outstanding_messages"` MaxOutstandingBytes int `json:"max_outstanding_bytes" yaml:"max_outstanding_bytes"` Sync bool `json:"sync" yaml:"sync"` }
GCPPubSubConfig contains configuration values for the gcp_pubsub input type.
func NewGCPPubSubConfig ¶
func NewGCPPubSubConfig() GCPPubSubConfig
NewGCPPubSubConfig creates a new GCPPubSubConfig with default values.
type GenerateConfig ¶
type GenerateConfig struct { Mapping string `json:"mapping" yaml:"mapping"` // interval can be either a duration string or a cron expression Interval string `json:"interval" yaml:"interval"` Count int `json:"count" yaml:"count"` }
GenerateConfig contains configuration for the Generate input type.
func NewGenerateConfig ¶
func NewGenerateConfig() GenerateConfig
NewGenerateConfig creates a new GenerateConfig with default values.
type HDFSConfig ¶
type HDFSConfig struct { Hosts []string `json:"hosts" yaml:"hosts"` User string `json:"user" yaml:"user"` Directory string `json:"directory" yaml:"directory"` }
HDFSConfig contains configuration fields for the HDFS input type.
func NewHDFSConfig ¶
func NewHDFSConfig() HDFSConfig
NewHDFSConfig creates a new HDFSConfig with default values.
type HTTPClientConfig ¶
type HTTPClientConfig struct { ihttpdocs.Config `json:",inline" yaml:",inline"` Payload string `json:"payload" yaml:"payload"` DropEmptyBodies bool `json:"drop_empty_bodies" yaml:"drop_empty_bodies"` Stream StreamConfig `json:"stream" yaml:"stream"` }
HTTPClientConfig contains configuration for the HTTPClient input type.
func NewHTTPClientConfig ¶
func NewHTTPClientConfig() HTTPClientConfig
NewHTTPClientConfig creates a new HTTPClientConfig with default values.
type HTTPServerConfig ¶
type HTTPServerConfig struct { Address string `json:"address" yaml:"address"` Path string `json:"path" yaml:"path"` WSPath string `json:"ws_path" yaml:"ws_path"` WSWelcomeMessage string `json:"ws_welcome_message" yaml:"ws_welcome_message"` WSRateLimitMessage string `json:"ws_rate_limit_message" yaml:"ws_rate_limit_message"` AllowedVerbs []string `json:"allowed_verbs" yaml:"allowed_verbs"` Timeout string `json:"timeout" yaml:"timeout"` RateLimit string `json:"rate_limit" yaml:"rate_limit"` CertFile string `json:"cert_file" yaml:"cert_file"` KeyFile string `json:"key_file" yaml:"key_file"` CORS httpdocs.ServerCORS `json:"cors" yaml:"cors"` Response HTTPServerResponseConfig `json:"sync_response" yaml:"sync_response"` }
HTTPServerConfig contains configuration for the HTTPServer input type.
func NewHTTPServerConfig ¶
func NewHTTPServerConfig() HTTPServerConfig
NewHTTPServerConfig creates a new HTTPServerConfig with default values.
type HTTPServerResponseConfig ¶
type HTTPServerResponseConfig struct { Status string `json:"status" yaml:"status"` Headers map[string]string `json:"headers" yaml:"headers"` ExtractMetadata imetadata.IncludeFilterConfig `json:"metadata_headers" yaml:"metadata_headers"` }
HTTPServerResponseConfig provides config fields for customising the response given from successful requests.
func NewHTTPServerResponseConfig ¶
func NewHTTPServerResponseConfig() HTTPServerResponseConfig
NewHTTPServerResponseConfig creates a new HTTPServerResponseConfig with default values.
type InprocConfig ¶
type InprocConfig string
InprocConfig is a configuration type for the inproc input.
func NewInprocConfig ¶
func NewInprocConfig() InprocConfig
NewInprocConfig creates a new inproc input config.
type KafkaBalancedGroupConfig ¶
type KafkaBalancedGroupConfig struct { SessionTimeout string `json:"session_timeout" yaml:"session_timeout"` HeartbeatInterval string `json:"heartbeat_interval" yaml:"heartbeat_interval"` RebalanceTimeout string `json:"rebalance_timeout" yaml:"rebalance_timeout"` }
KafkaBalancedGroupConfig contains config fields for Kafka consumer groups.
func NewKafkaBalancedGroupConfig ¶
func NewKafkaBalancedGroupConfig() KafkaBalancedGroupConfig
NewKafkaBalancedGroupConfig returns a KafkaBalancedGroupConfig with default values.
type KafkaConfig ¶
type KafkaConfig struct { Addresses []string `json:"addresses" yaml:"addresses"` Topics []string `json:"topics" yaml:"topics"` ClientID string `json:"client_id" yaml:"client_id"` RackID string `json:"rack_id" yaml:"rack_id"` ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"` Group KafkaBalancedGroupConfig `json:"group" yaml:"group"` CommitPeriod string `json:"commit_period" yaml:"commit_period"` CheckpointLimit int `json:"checkpoint_limit" yaml:"checkpoint_limit"` ExtractTracingMap string `json:"extract_tracing_map" yaml:"extract_tracing_map"` MaxProcessingPeriod string `json:"max_processing_period" yaml:"max_processing_period"` FetchBufferCap int `json:"fetch_buffer_cap" yaml:"fetch_buffer_cap"` StartFromOldest bool `json:"start_from_oldest" yaml:"start_from_oldest"` TargetVersion string `json:"target_version" yaml:"target_version"` TLS btls.Config `json:"tls" yaml:"tls"` SASL sasl.Config `json:"sasl" yaml:"sasl"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
KafkaConfig contains configuration fields for the Kafka input type.
func NewKafkaConfig ¶
func NewKafkaConfig() KafkaConfig
NewKafkaConfig creates a new KafkaConfig with default values.
type MQTTConfig ¶
type MQTTConfig struct { URLs []string `json:"urls" yaml:"urls"` QoS uint8 `json:"qos" yaml:"qos"` Topics []string `json:"topics" yaml:"topics"` ClientID string `json:"client_id" yaml:"client_id"` DynamicClientIDSuffix string `json:"dynamic_client_id_suffix" yaml:"dynamic_client_id_suffix"` Will mqttconf.Will `json:"will" yaml:"will"` CleanSession bool `json:"clean_session" yaml:"clean_session"` User string `json:"user" yaml:"user"` Password string `json:"password" yaml:"password"` ConnectTimeout string `json:"connect_timeout" yaml:"connect_timeout"` KeepAlive int64 `json:"keepalive" yaml:"keepalive"` TLS tls.Config `json:"tls" yaml:"tls"` }
MQTTConfig contains configuration fields for the MQTT input type.
func NewMQTTConfig ¶
func NewMQTTConfig() MQTTConfig
NewMQTTConfig creates a new MQTTConfig with default values.
type NATSConfig ¶
type NATSConfig struct { URLs []string `json:"urls" yaml:"urls"` Subject string `json:"subject" yaml:"subject"` QueueID string `json:"queue" yaml:"queue"` PrefetchCount int `json:"prefetch_count" yaml:"prefetch_count"` TLS btls.Config `json:"tls" yaml:"tls"` Auth auth.Config `json:"auth" yaml:"auth"` }
NATSConfig contains configuration fields for the NATS input type.
func NewNATSConfig ¶
func NewNATSConfig() NATSConfig
NewNATSConfig creates a new NATSConfig with default values.
type NATSStreamConfig ¶
type NATSStreamConfig struct { URLs []string `json:"urls" yaml:"urls"` ClusterID string `json:"cluster_id" yaml:"cluster_id"` ClientID string `json:"client_id" yaml:"client_id"` QueueID string `json:"queue" yaml:"queue"` DurableName string `json:"durable_name" yaml:"durable_name"` UnsubOnClose bool `json:"unsubscribe_on_close" yaml:"unsubscribe_on_close"` StartFromOldest bool `json:"start_from_oldest" yaml:"start_from_oldest"` Subject string `json:"subject" yaml:"subject"` MaxInflight int `json:"max_inflight" yaml:"max_inflight"` AckWait string `json:"ack_wait" yaml:"ack_wait"` TLS btls.Config `json:"tls" yaml:"tls"` Auth auth.Config `json:"auth" yaml:"auth"` }
NATSStreamConfig contains configuration fields for the NATSStream input type.
func NewNATSStreamConfig ¶
func NewNATSStreamConfig() NATSStreamConfig
NewNATSStreamConfig creates a new NATSStreamConfig with default values.
type NSQConfig ¶
type NSQConfig struct { Addresses []string `json:"nsqd_tcp_addresses" yaml:"nsqd_tcp_addresses"` LookupAddresses []string `json:"lookupd_http_addresses" yaml:"lookupd_http_addresses"` Topic string `json:"topic" yaml:"topic"` Channel string `json:"channel" yaml:"channel"` UserAgent string `json:"user_agent" yaml:"user_agent"` TLS btls.Config `json:"tls" yaml:"tls"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
NSQConfig contains configuration fields for the NSQ input type.
func NewNSQConfig ¶
func NewNSQConfig() NSQConfig
NewNSQConfig creates a new NSQConfig with default values.
type NanomsgConfig ¶
type NanomsgConfig struct { URLs []string `json:"urls" yaml:"urls"` Bind bool `json:"bind" yaml:"bind"` SocketType string `json:"socket_type" yaml:"socket_type"` SubFilters []string `json:"sub_filters" yaml:"sub_filters"` PollTimeout string `json:"poll_timeout" yaml:"poll_timeout"` }
NanomsgConfig contains configuration fields for the nanomsg input type.
func NewNanomsgConfig ¶
func NewNanomsgConfig() NanomsgConfig
NewNanomsgConfig creates a new NanomsgConfig with default values.
type ReadUntilConfig ¶
type ReadUntilConfig struct { Input *Config `json:"input" yaml:"input"` Restart bool `json:"restart_input" yaml:"restart_input"` Check string `json:"check" yaml:"check"` }
ReadUntilConfig contains configuration values for the ReadUntil input type.
func NewReadUntilConfig ¶
func NewReadUntilConfig() ReadUntilConfig
NewReadUntilConfig creates a new ReadUntilConfig with default values.
func (ReadUntilConfig) MarshalJSON ¶
func (r ReadUntilConfig) MarshalJSON() ([]byte, error)
MarshalJSON prints an empty object instead of nil.
func (ReadUntilConfig) MarshalYAML ¶
func (r ReadUntilConfig) MarshalYAML() (interface{}, error)
MarshalYAML prints an empty object instead of nil.
type RedisListConfig ¶
type RedisListConfig struct { bredis.Config `json:",inline" yaml:",inline"` Key string `json:"key" yaml:"key"` Timeout string `json:"timeout" yaml:"timeout"` }
RedisListConfig contains configuration fields for the RedisList input type.
func NewRedisListConfig ¶
func NewRedisListConfig() RedisListConfig
NewRedisListConfig creates a new RedisListConfig with default values.
type RedisPubSubConfig ¶
type RedisPubSubConfig struct { bredis.Config `json:",inline" yaml:",inline"` Channels []string `json:"channels" yaml:"channels"` UsePatterns bool `json:"use_patterns" yaml:"use_patterns"` }
RedisPubSubConfig contains configuration fields for the RedisPubSub input type.
func NewRedisPubSubConfig ¶
func NewRedisPubSubConfig() RedisPubSubConfig
NewRedisPubSubConfig creates a new RedisPubSubConfig with default values.
type RedisStreamsConfig ¶
type RedisStreamsConfig struct { bredis.Config `json:",inline" yaml:",inline"` BodyKey string `json:"body_key" yaml:"body_key"` Streams []string `json:"streams" yaml:"streams"` CreateStreams bool `json:"create_streams" yaml:"create_streams"` ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"` ClientID string `json:"client_id" yaml:"client_id"` Limit int64 `json:"limit" yaml:"limit"` StartFromOldest bool `json:"start_from_oldest" yaml:"start_from_oldest"` CommitPeriod string `json:"commit_period" yaml:"commit_period"` Timeout string `json:"timeout" yaml:"timeout"` }
RedisStreamsConfig contains configuration fields for the RedisStreams input type.
func NewRedisStreamsConfig ¶
func NewRedisStreamsConfig() RedisStreamsConfig
NewRedisStreamsConfig creates a new RedisStreamsConfig with default values.
type SFTPConfig ¶
type SFTPConfig struct { Address string `json:"address" yaml:"address"` Credentials sftpSetup.Credentials `json:"credentials" yaml:"credentials"` Paths []string `json:"paths" yaml:"paths"` Codec string `json:"codec" yaml:"codec"` DeleteOnFinish bool `json:"delete_on_finish" yaml:"delete_on_finish"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` Watcher watcherConfig `json:"watcher" yaml:"watcher"` }
SFTPConfig contains configuration fields for the SFTP input type.
func NewSFTPConfig ¶
func NewSFTPConfig() SFTPConfig
NewSFTPConfig creates a new SFTPConfig with default values.
type STDINConfig ¶
type STDINConfig struct { Codec string `json:"codec" yaml:"codec"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` }
STDINConfig contains config fields for the STDIN input type.
func NewSTDINConfig ¶
func NewSTDINConfig() STDINConfig
NewSTDINConfig creates a STDINConfig populated with default values.
type SequenceConfig ¶
type SequenceConfig struct { ShardedJoin SequenceShardedJoinConfig `json:"sharded_join" yaml:"sharded_join"` Inputs []Config `json:"inputs" yaml:"inputs"` }
SequenceConfig contains configuration values for the Sequence input type.
func NewSequenceConfig ¶
func NewSequenceConfig() SequenceConfig
NewSequenceConfig creates a new SequenceConfig with default values.
type SequenceShardedJoinConfig ¶
type SequenceShardedJoinConfig struct { Type string `json:"type" yaml:"type"` IDPath string `json:"id_path" yaml:"id_path"` Iterations int `json:"iterations" yaml:"iterations"` MergeStrategy string `json:"merge_strategy" yaml:"merge_strategy"` }
SequenceShardedJoinConfig describes an optional mechanism for performing sharded joins of structured data resulting from the input sequence. This is a way to merge the structured fields of fragmented datasets within memory even when the overall size of the data surpasses the memory available on the machine.
When configured the sequence of inputs will be consumed multiple times according to the number of iterations, and each iteration will process an entirely different set of messages by sharding them by the ID field.
Each message must be structured (JSON or otherwise processed into a structured form) and the fields will be aggregated with those of other messages sharing the ID. At the end of each iteration the joined messages are flushed downstream before the next iteration begins.
func NewSequenceShardedJoinConfig ¶
func NewSequenceShardedJoinConfig() SequenceShardedJoinConfig
NewSequenceShardedJoinConfig creates a new sequence sharding configuration with default values.
type SocketConfig ¶
type SocketConfig struct { Network string `json:"network" yaml:"network"` Address string `json:"address" yaml:"address"` Codec string `json:"codec" yaml:"codec"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` }
SocketConfig contains configuration values for the Socket input type.
func NewSocketConfig ¶
func NewSocketConfig() SocketConfig
NewSocketConfig creates a new SocketConfig with default values.
type SocketServerConfig ¶
type SocketServerConfig struct { Network string `json:"network" yaml:"network"` Address string `json:"address" yaml:"address"` Codec string `json:"codec" yaml:"codec"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` }
SocketServerConfig contains configuration for the SocketServer input type.
func NewSocketServerConfig ¶
func NewSocketServerConfig() SocketServerConfig
NewSocketServerConfig creates a new SocketServerConfig with default values.
type StreamConfig ¶
type StreamConfig struct { Enabled bool `json:"enabled" yaml:"enabled"` Reconnect bool `json:"reconnect" yaml:"reconnect"` Codec string `json:"codec" yaml:"codec"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` }
StreamConfig contains fields for specifying consumption behaviour when the body of a request is a constant stream of bytes.
type Streamed ¶
type Streamed interface {
	// TransactionChan returns a channel used for consuming transactions from
	// this type. Every transaction received must be resolved before another
	// transaction will be sent.
	TransactionChan() <-chan message.Transaction

	// Connected returns a boolean indicating whether this input is currently
	// connected to its target.
	Connected() bool

	// CloseAsync triggers the shut down of this component but should not block
	// the calling goroutine.
	CloseAsync()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(timeout time.Duration) error
}
Streamed is a common interface implemented by inputs that provides channel-based streaming APIs.
func NewAsyncReader ¶
func NewAsyncReader( typeStr string, allowSkipAcks bool, r Async, mgr component.Observability, ) (Streamed, error)
NewAsyncReader creates a new AsyncReader input type.
func WrapWithPipelines ¶
func WrapWithPipelines(in Streamed, pipeConstructors ...iprocessor.PipelineConstructorFunc) (Streamed, error)
WrapWithPipelines wraps an input with a variadic number of pipelines.
type SubprocessConfig ¶
type SubprocessConfig struct { Name string `json:"name" yaml:"name"` Args []string `json:"args" yaml:"args"` Codec string `json:"codec" yaml:"codec"` RestartOnExit bool `json:"restart_on_exit" yaml:"restart_on_exit"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` }
SubprocessConfig contains configuration for the Subprocess input type.
func NewSubprocessConfig ¶
func NewSubprocessConfig() SubprocessConfig
NewSubprocessConfig creates a new SubprocessConfig with default values.
type WebsocketConfig ¶
type WebsocketConfig struct { URL string `json:"url" yaml:"url"` OpenMsg string `json:"open_message" yaml:"open_message"` auth.Config `json:",inline" yaml:",inline"` TLS btls.Config `json:"tls" yaml:"tls"` }
WebsocketConfig contains configuration fields for the Websocket input type.
func NewWebsocketConfig ¶
func NewWebsocketConfig() WebsocketConfig
NewWebsocketConfig creates a new WebsocketConfig with default values.
type WithPipeline ¶
type WithPipeline struct {
// contains filtered or unexported fields
}
WithPipeline is a type that wraps both an input type and a pipeline type by routing the input through the pipeline, and implements the Streamed interface in order to act like an ordinary input.
func WrapWithPipeline ¶
func WrapWithPipeline(in Streamed, pipeConstructor iprocessor.PipelineConstructorFunc) (*WithPipeline, error)
WrapWithPipeline routes an input directly into a processing pipeline and returns a type that manages both and acts like an ordinary input.
func (*WithPipeline) CloseAsync ¶
func (i *WithPipeline) CloseAsync()
CloseAsync triggers a closure of this object but does not block.
func (*WithPipeline) Connected ¶
func (i *WithPipeline) Connected() bool
Connected returns a boolean indicating whether this input is currently connected to its target.
func (*WithPipeline) TransactionChan ¶
func (i *WithPipeline) TransactionChan() <-chan message.Transaction
TransactionChan returns the channel used for consuming transactions from this input.
func (*WithPipeline) WaitForClose ¶
func (i *WithPipeline) WaitForClose(timeout time.Duration) error
WaitForClose is a blocking call to wait until the object has finished closing down and cleaning up resources.
Source Files ¶
- async_cut_off.go
- async_preserver.go
- async_reader.go
- config.go
- config_amqp_0_9.go
- config_amqp_1.go
- config_aws_kinesis.go
- config_aws_s3.go
- config_aws_sqs.go
- config_azure_blob_storage.go
- config_azure_queue_storage.go
- config_broker.go
- config_csv.go
- config_dynamic.go
- config_file.go
- config_gcp_cloud_storage.go
- config_gcp_pubsub.go
- config_generate.go
- config_hdfs.go
- config_http_client.go
- config_http_server.go
- config_inproc.go
- config_kafka.go
- config_mqtt.go
- config_nanomsg.go
- config_nats.go
- config_nats_stream.go
- config_nsq.go
- config_read_until.go
- config_redis_list.go
- config_redis_pubsub.go
- config_redis_streams.go
- config_sequence.go
- config_sftp.go
- config_socket.go
- config_socket_server.go
- config_stdin.go
- config_subprocess.go
- config_websocket.go
- interface.go
- wrap_with_pipeline.go