Documentation ¶
Index ¶
- func AsyncReaderWithConnBackOff(boff backoff.BackOff) func(a *AsyncReader)
- type Async
- type AsyncAckFn
- type AsyncCutOff
- type AsyncPreserver
- type AsyncReader
- type AzureBlobStorageConfig
- type AzureQueueStorageConfig
- type AzureTableStorageConfig
- type BrokerConfig
- type CSVFileConfig
- type Config
- type DynamicConfig
- type FileConfig
- type GCPCloudStorageConfig
- type GCPPubSubConfig
- type GCPPubSubSubscriptionConfig
- type GenerateConfig
- type HDFSConfig
- type HTTPServerConfig
- type HTTPServerResponseConfig
- type InprocConfig
- type KafkaBalancedGroupConfig
- type KafkaConfig
- type MQTTConfig
- type NATSStreamConfig
- type NSQConfig
- type NanomsgConfig
- type ReadUntilConfig
- type RedisPubSubConfig
- type RedisStreamsConfig
- type SFTPConfig
- type STDINConfig
- type SequenceConfig
- type SequenceShardedJoinConfig
- type SocketConfig
- type SocketServerConfig
- type SocketServerTLSConfig
- type Streamed
- type SubprocessConfig
- type WithPipeline
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AsyncReaderWithConnBackOff ¶ added in v4.16.0
func AsyncReaderWithConnBackOff(boff backoff.BackOff) func(a *AsyncReader)
AsyncReaderWithConnBackOff set the backoff used for limiting connection attempts. If the maximum number of retry attempts is reached then the input will gracefully stop.
Types ¶
type Async ¶ added in v4.1.0
type Async interface { // Connect attempts to establish a connection to the source, if // unsuccessful returns an error. If the attempt is successful (or not // necessary) returns nil. Connect(ctx context.Context) error // ReadBatch attempts to read a new message from the source. If // successful a message is returned along with a function used to // acknowledge receipt of the returned message. It's safe to process the // returned message and read the next message asynchronously. ReadBatch(ctx context.Context) (message.Batch, AsyncAckFn, error) // Close triggers the shut down of this component and blocks until // completion or context cancellation. Close(ctx context.Context) error }
Async is a type that reads Benthos messages from an external source and allows acknowledgements for a message batch to be propagated asynchronously.
type AsyncAckFn ¶ added in v4.1.0
AsyncAckFn is a function used to acknowledge receipt of a message batch. The provided response indicates whether the message batch was successfully delivered. Returns an error if the acknowledge was not propagated.
type AsyncCutOff ¶ added in v4.1.0
type AsyncCutOff struct {
// contains filtered or unexported fields
}
AsyncCutOff is a wrapper for input.Async implementations that exits from WaitForClose immediately. This is only useful when the underlying readable resource cannot be closed reliably and can block forever.
func NewAsyncCutOff ¶ added in v4.1.0
func NewAsyncCutOff(r Async) *AsyncCutOff
NewAsyncCutOff returns a new AsyncCutOff wrapper around a input.Async.
func (*AsyncCutOff) Close ¶ added in v4.6.0
func (c *AsyncCutOff) Close(ctx context.Context) error
Close triggers the asynchronous closing of the reader.
func (*AsyncCutOff) Connect ¶ added in v4.6.0
func (c *AsyncCutOff) Connect(ctx context.Context) error
Connect attempts to establish a connection to the source, if unsuccessful returns an error. If the attempt is successful (or not necessary) returns nil.
func (*AsyncCutOff) ReadBatch ¶ added in v4.6.0
func (c *AsyncCutOff) ReadBatch(ctx context.Context) (message.Batch, AsyncAckFn, error)
ReadBatch attempts to read a new message from the source.
type AsyncPreserver ¶ added in v4.1.0
type AsyncPreserver struct {
// contains filtered or unexported fields
}
AsyncPreserver is a wrapper for input.Async implementations that keeps a buffer of sent messages until they are acknowledged. If an error occurs during message propagation the contents of the buffer will be resent instead of reading new messages until it is depleted. AsyncPreserver implements input.Async.
Wrapping an input with this type is useful when your source of messages doesn't have a concept of a NoAck (like Kafka), and instead of "rejecting" messages we always intend to simply retry them until success.
func NewAsyncPreserver ¶ added in v4.1.0
func NewAsyncPreserver(r Async) *AsyncPreserver
NewAsyncPreserver returns a new AsyncPreserver wrapper around a input.Async.
func (*AsyncPreserver) Close ¶ added in v4.6.0
func (p *AsyncPreserver) Close(ctx context.Context) error
Close triggers the shut down of this component and blocks until completion or context cancellation.
func (*AsyncPreserver) Connect ¶ added in v4.6.0
func (p *AsyncPreserver) Connect(ctx context.Context) error
Connect attempts to establish a connection to the source, if unsuccessful returns an error. If the attempt is successful (or not necessary) returns nil.
func (*AsyncPreserver) ReadBatch ¶ added in v4.6.0
func (p *AsyncPreserver) ReadBatch(ctx context.Context) (message.Batch, AsyncAckFn, error)
ReadBatch attempts to read a new message from the source.
type AsyncReader ¶ added in v4.1.0
type AsyncReader struct {
// contains filtered or unexported fields
}
AsyncReader is an input implementation that reads messages from an input.Async component.
func (*AsyncReader) Connected ¶ added in v4.1.0
func (r *AsyncReader) Connected() bool
Connected returns a boolean indicating whether this input is currently connected to its target.
func (*AsyncReader) TransactionChan ¶ added in v4.1.0
func (r *AsyncReader) TransactionChan() <-chan message.Transaction
TransactionChan returns a transactions channel for consuming messages from this input type.
func (*AsyncReader) TriggerCloseNow ¶ added in v4.6.0
func (r *AsyncReader) TriggerCloseNow()
TriggerCloseNow triggers the shut down of this component but should not block the calling goroutine.
func (*AsyncReader) TriggerStopConsuming ¶ added in v4.6.0
func (r *AsyncReader) TriggerStopConsuming()
TriggerStopConsuming instructs the input to start shutting down resources once all pending messages are delivered and acknowledged. This call does not block.
func (*AsyncReader) WaitForClose ¶ added in v4.1.0
func (r *AsyncReader) WaitForClose(ctx context.Context) error
WaitForClose is a blocking call to wait until the component has finished shutting down and cleaning up resources.
type AzureBlobStorageConfig ¶ added in v4.1.0
type AzureBlobStorageConfig struct { StorageAccount string `json:"storage_account" yaml:"storage_account"` StorageAccessKey string `json:"storage_access_key" yaml:"storage_access_key"` StorageSASToken string `json:"storage_sas_token" yaml:"storage_sas_token"` StorageConnectionString string `json:"storage_connection_string" yaml:"storage_connection_string"` Container string `json:"container" yaml:"container"` Prefix string `json:"prefix" yaml:"prefix"` Codec string `json:"codec" yaml:"codec"` DeleteObjects bool `json:"delete_objects" yaml:"delete_objects"` }
AzureBlobStorageConfig contains configuration fields for the AzureBlobStorage input type.
func NewAzureBlobStorageConfig ¶ added in v4.1.0
func NewAzureBlobStorageConfig() AzureBlobStorageConfig
NewAzureBlobStorageConfig creates a new AzureBlobStorageConfig with default values.
type AzureQueueStorageConfig ¶ added in v4.1.0
type AzureQueueStorageConfig struct { StorageAccount string `json:"storage_account" yaml:"storage_account"` StorageAccessKey string `json:"storage_access_key" yaml:"storage_access_key"` StorageSASToken string `json:"storage_sas_token" yaml:"storage_sas_token"` StorageConnectionString string `json:"storage_connection_string" yaml:"storage_connection_string"` QueueName string `json:"queue_name" yaml:"queue_name"` DequeueVisibilityTimeout string `json:"dequeue_visibility_timeout" yaml:"dequeue_visibility_timeout"` MaxInFlight int32 `json:"max_in_flight" yaml:"max_in_flight"` TrackProperties bool `json:"track_properties" yaml:"track_properties"` }
AzureQueueStorageConfig contains configuration fields for the AzureQueueStorage input type.
func NewAzureQueueStorageConfig ¶ added in v4.1.0
func NewAzureQueueStorageConfig() AzureQueueStorageConfig
NewAzureQueueStorageConfig creates a new AzureQueueStorageConfig with default values.
type AzureTableStorageConfig ¶ added in v4.10.0
type AzureTableStorageConfig struct { StorageAccount string `json:"storage_account" yaml:"storage_account"` StorageAccessKey string `json:"storage_access_key" yaml:"storage_access_key"` StorageConnectionString string `json:"storage_connection_string" yaml:"storage_connection_string"` TableName string `json:"table_name" yaml:"table_name"` Filter string `json:"filter" yaml:"filter"` Select string `json:"select" yaml:"select"` PageSize int32 `json:"page_size" yaml:"page_size"` }
AzureTableStorageConfig contains configuration fields for the AzureTableStorage input type.
func NewAzureTableStorageConfig ¶ added in v4.10.0
func NewAzureTableStorageConfig() AzureTableStorageConfig
NewAzureTableStorageConfig creates a new AzureBlobStorageConfig with default values.
type BrokerConfig ¶ added in v4.1.0
type BrokerConfig struct { Copies int `json:"copies" yaml:"copies"` Inputs []Config `json:"inputs" yaml:"inputs"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
BrokerConfig contains configuration fields for the Broker input type.
func NewBrokerConfig ¶ added in v4.1.0
func NewBrokerConfig() BrokerConfig
NewBrokerConfig creates a new BrokerConfig with default values.
type CSVFileConfig ¶ added in v4.1.0
type CSVFileConfig struct { Paths []string `json:"paths" yaml:"paths"` ParseHeaderRow bool `json:"parse_header_row" yaml:"parse_header_row"` Delim string `json:"delimiter" yaml:"delimiter"` LazyQuotes bool `json:"lazy_quotes" yaml:"lazy_quotes"` BatchCount int `json:"batch_count" yaml:"batch_count"` DeleteOnFinish bool `json:"delete_on_finish" yaml:"delete_on_finish"` }
CSVFileConfig contains configuration values for the CSVFile input type.
func NewCSVFileConfig ¶ added in v4.1.0
func NewCSVFileConfig() CSVFileConfig
NewCSVFileConfig creates a new CSVFileConfig with default values.
type Config ¶ added in v4.1.0
type Config struct { Label string `json:"label" yaml:"label"` Type string `json:"type" yaml:"type"` AzureBlobStorage AzureBlobStorageConfig `json:"azure_blob_storage" yaml:"azure_blob_storage"` AzureQueueStorage AzureQueueStorageConfig `json:"azure_queue_storage" yaml:"azure_queue_storage"` AzureTableStorage AzureTableStorageConfig `json:"azure_table_storage" yaml:"azure_table_storage"` Broker BrokerConfig `json:"broker" yaml:"broker"` CSVFile CSVFileConfig `json:"csv" yaml:"csv"` Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"` File FileConfig `json:"file" yaml:"file"` GCPCloudStorage GCPCloudStorageConfig `json:"gcp_cloud_storage" yaml:"gcp_cloud_storage"` GCPPubSub GCPPubSubConfig `json:"gcp_pubsub" yaml:"gcp_pubsub"` Generate GenerateConfig `json:"generate" yaml:"generate"` HDFS HDFSConfig `json:"hdfs" yaml:"hdfs"` HTTPServer HTTPServerConfig `json:"http_server" yaml:"http_server"` Inproc InprocConfig `json:"inproc" yaml:"inproc"` Kafka KafkaConfig `json:"kafka" yaml:"kafka"` MQTT MQTTConfig `json:"mqtt" yaml:"mqtt"` Nanomsg NanomsgConfig `json:"nanomsg" yaml:"nanomsg"` NATSStream NATSStreamConfig `json:"nats_stream" yaml:"nats_stream"` NSQ NSQConfig `json:"nsq" yaml:"nsq"` Plugin any `json:"plugin,omitempty" yaml:"plugin,omitempty"` ReadUntil ReadUntilConfig `json:"read_until" yaml:"read_until"` RedisPubSub RedisPubSubConfig `json:"redis_pubsub" yaml:"redis_pubsub"` RedisStreams RedisStreamsConfig `json:"redis_streams" yaml:"redis_streams"` Resource string `json:"resource" yaml:"resource"` Sequence SequenceConfig `json:"sequence" yaml:"sequence"` SFTP SFTPConfig `json:"sftp" yaml:"sftp"` Socket SocketConfig `json:"socket" yaml:"socket"` SocketServer SocketServerConfig `json:"socket_server" yaml:"socket_server"` STDIN STDINConfig `json:"stdin" yaml:"stdin"` Subprocess SubprocessConfig `json:"subprocess" yaml:"subprocess"` Processors []processor.Config `json:"processors" yaml:"processors"` }
Config is the all encompassing configuration struct for all input types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.
type DynamicConfig ¶ added in v4.1.0
type DynamicConfig struct { Inputs map[string]Config `json:"inputs" yaml:"inputs"` Prefix string `json:"prefix" yaml:"prefix"` }
DynamicConfig contains configuration for the Dynamic input type.
func NewDynamicConfig ¶ added in v4.1.0
func NewDynamicConfig() DynamicConfig
NewDynamicConfig creates a new DynamicConfig with default values.
type FileConfig ¶ added in v4.1.0
type FileConfig struct { Paths []string `json:"paths" yaml:"paths"` Codec string `json:"codec" yaml:"codec"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` DeleteOnFinish bool `json:"delete_on_finish" yaml:"delete_on_finish"` }
FileConfig contains configuration values for the File input type.
func NewFileConfig ¶ added in v4.1.0
func NewFileConfig() FileConfig
NewFileConfig creates a new FileConfig with default values.
type GCPCloudStorageConfig ¶ added in v4.1.0
type GCPCloudStorageConfig struct { Bucket string `json:"bucket" yaml:"bucket"` Prefix string `json:"prefix" yaml:"prefix"` Codec string `json:"codec" yaml:"codec"` DeleteObjects bool `json:"delete_objects" yaml:"delete_objects"` }
GCPCloudStorageConfig contains configuration fields for the Google Cloud Storage input type.
func NewGCPCloudStorageConfig ¶ added in v4.1.0
func NewGCPCloudStorageConfig() GCPCloudStorageConfig
NewGCPCloudStorageConfig creates a new GCPCloudStorageConfig with default values.
type GCPPubSubConfig ¶ added in v4.1.0
type GCPPubSubConfig struct { ProjectID string `json:"project" yaml:"project"` SubscriptionID string `json:"subscription" yaml:"subscription"` Endpoint string `json:"endpoint" yaml:"endpoint"` MaxOutstandingMessages int `json:"max_outstanding_messages" yaml:"max_outstanding_messages"` MaxOutstandingBytes int `json:"max_outstanding_bytes" yaml:"max_outstanding_bytes"` Sync bool `json:"sync" yaml:"sync"` CreateSubscription GCPPubSubSubscriptionConfig `json:"create_subscription" yaml:"create_subscription"` }
GCPPubSubConfig contains configuration values for the input type.
func NewGCPPubSubConfig ¶ added in v4.1.0
func NewGCPPubSubConfig() GCPPubSubConfig
NewGCPPubSubConfig creates a new Config with default values.
type GCPPubSubSubscriptionConfig ¶ added in v4.11.0
type GCPPubSubSubscriptionConfig struct { Enabled bool `json:"enabled" yaml:"enabled"` TopicID string `json:"topic" yaml:"topic"` }
GCPPubSubSubscriptionConfig contains config values for subscription creation.
type GenerateConfig ¶ added in v4.1.0
type GenerateConfig struct { Mapping string `json:"mapping" yaml:"mapping"` // internal can be both duration string or cron expression Interval string `json:"interval" yaml:"interval"` Count int `json:"count" yaml:"count"` BatchSize int `json:"batch_size" yaml:"batch_size"` }
GenerateConfig contains configuration for the Bloblang input type.
func NewGenerateConfig ¶ added in v4.1.0
func NewGenerateConfig() GenerateConfig
NewGenerateConfig creates a new BloblangConfig with default values.
type HDFSConfig ¶ added in v4.1.0
type HDFSConfig struct { Hosts []string `json:"hosts" yaml:"hosts"` User string `json:"user" yaml:"user"` Directory string `json:"directory" yaml:"directory"` }
HDFSConfig contains configuration fields for the HDFS input type.
func NewHDFSConfig ¶ added in v4.1.0
func NewHDFSConfig() HDFSConfig
NewHDFSConfig creates a new Config with default values.
type HTTPServerConfig ¶ added in v4.1.0
type HTTPServerConfig struct { Address string `json:"address" yaml:"address"` Path string `json:"path" yaml:"path"` WSPath string `json:"ws_path" yaml:"ws_path"` WSWelcomeMessage string `json:"ws_welcome_message" yaml:"ws_welcome_message"` WSRateLimitMessage string `json:"ws_rate_limit_message" yaml:"ws_rate_limit_message"` AllowedVerbs []string `json:"allowed_verbs" yaml:"allowed_verbs"` Timeout string `json:"timeout" yaml:"timeout"` RateLimit string `json:"rate_limit" yaml:"rate_limit"` CertFile string `json:"cert_file" yaml:"cert_file"` KeyFile string `json:"key_file" yaml:"key_file"` CORS httpserver.CORSConfig `json:"cors" yaml:"cors"` Response HTTPServerResponseConfig `json:"sync_response" yaml:"sync_response"` }
HTTPServerConfig contains configuration for the HTTPServer input type.
func NewHTTPServerConfig ¶ added in v4.1.0
func NewHTTPServerConfig() HTTPServerConfig
NewHTTPServerConfig creates a new HTTPServerConfig with default values.
type HTTPServerResponseConfig ¶ added in v4.1.0
type HTTPServerResponseConfig struct { Status string `json:"status" yaml:"status"` Headers map[string]string `json:"headers" yaml:"headers"` ExtractMetadata metadata.IncludeFilterConfig `json:"metadata_headers" yaml:"metadata_headers"` }
HTTPServerResponseConfig provides config fields for customising the response given from successful requests.
func NewHTTPServerResponseConfig ¶ added in v4.1.0
func NewHTTPServerResponseConfig() HTTPServerResponseConfig
NewHTTPServerResponseConfig creates a new HTTPServerConfig with default values.
type InprocConfig ¶ added in v4.1.0
type InprocConfig string
InprocConfig is a configuration type for the inproc input.
func NewInprocConfig ¶ added in v4.1.0
func NewInprocConfig() InprocConfig
NewInprocConfig creates a new inproc input config.
type KafkaBalancedGroupConfig ¶ added in v4.1.0
type KafkaBalancedGroupConfig struct { SessionTimeout string `json:"session_timeout" yaml:"session_timeout"` HeartbeatInterval string `json:"heartbeat_interval" yaml:"heartbeat_interval"` RebalanceTimeout string `json:"rebalance_timeout" yaml:"rebalance_timeout"` }
KafkaBalancedGroupConfig contains config fields for Kafka consumer groups.
func NewKafkaBalancedGroupConfig ¶ added in v4.1.0
func NewKafkaBalancedGroupConfig() KafkaBalancedGroupConfig
NewKafkaBalancedGroupConfig returns a KafkaBalancedGroupConfig with default values.
type KafkaConfig ¶ added in v4.1.0
type KafkaConfig struct { Addresses []string `json:"addresses" yaml:"addresses"` Topics []string `json:"topics" yaml:"topics"` ClientID string `json:"client_id" yaml:"client_id"` RackID string `json:"rack_id" yaml:"rack_id"` ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"` Group KafkaBalancedGroupConfig `json:"group" yaml:"group"` CommitPeriod string `json:"commit_period" yaml:"commit_period"` CheckpointLimit int `json:"checkpoint_limit" yaml:"checkpoint_limit"` ExtractTracingMap string `json:"extract_tracing_map" yaml:"extract_tracing_map"` MaxProcessingPeriod string `json:"max_processing_period" yaml:"max_processing_period"` FetchBufferCap int `json:"fetch_buffer_cap" yaml:"fetch_buffer_cap"` StartFromOldest bool `json:"start_from_oldest" yaml:"start_from_oldest"` TargetVersion string `json:"target_version" yaml:"target_version"` TLS btls.Config `json:"tls" yaml:"tls"` SASL sasl.Config `json:"sasl" yaml:"sasl"` MultiHeader bool `json:"multi_header" yaml:"multi_header"` Batching batchconfig.Config `json:"batching" yaml:"batching"` }
KafkaConfig contains configuration fields for the Kafka input type.
func NewKafkaConfig ¶ added in v4.1.0
func NewKafkaConfig() KafkaConfig
NewKafkaConfig creates a new KafkaConfig with default values.
type MQTTConfig ¶ added in v4.1.0
type MQTTConfig struct { URLs []string `json:"urls" yaml:"urls"` QoS uint8 `json:"qos" yaml:"qos"` Topics []string `json:"topics" yaml:"topics"` ClientID string `json:"client_id" yaml:"client_id"` DynamicClientIDSuffix string `json:"dynamic_client_id_suffix" yaml:"dynamic_client_id_suffix"` Will mqttconf.Will `json:"will" yaml:"will"` CleanSession bool `json:"clean_session" yaml:"clean_session"` User string `json:"user" yaml:"user"` Password string `json:"password" yaml:"password"` ConnectTimeout string `json:"connect_timeout" yaml:"connect_timeout"` KeepAlive int64 `json:"keepalive" yaml:"keepalive"` TLS tls.Config `json:"tls" yaml:"tls"` }
MQTTConfig contains configuration fields for the MQTT input type.
func NewMQTTConfig ¶ added in v4.1.0
func NewMQTTConfig() MQTTConfig
NewMQTTConfig creates a new MQTTConfig with default values.
type NATSStreamConfig ¶ added in v4.1.0
type NATSStreamConfig struct { URLs []string `json:"urls" yaml:"urls"` ClusterID string `json:"cluster_id" yaml:"cluster_id"` ClientID string `json:"client_id" yaml:"client_id"` QueueID string `json:"queue" yaml:"queue"` DurableName string `json:"durable_name" yaml:"durable_name"` UnsubOnClose bool `json:"unsubscribe_on_close" yaml:"unsubscribe_on_close"` StartFromOldest bool `json:"start_from_oldest" yaml:"start_from_oldest"` Subject string `json:"subject" yaml:"subject"` MaxInflight int `json:"max_inflight" yaml:"max_inflight"` AckWait string `json:"ack_wait" yaml:"ack_wait"` TLS btls.Config `json:"tls" yaml:"tls"` Auth auth.Config `json:"auth" yaml:"auth"` }
NATSStreamConfig contains configuration fields for the NATSStream input type.
func NewNATSStreamConfig ¶ added in v4.1.0
func NewNATSStreamConfig() NATSStreamConfig
NewNATSStreamConfig creates a new NATSStreamConfig with default values.
type NSQConfig ¶ added in v4.1.0
type NSQConfig struct { Addresses []string `json:"nsqd_tcp_addresses" yaml:"nsqd_tcp_addresses"` LookupAddresses []string `json:"lookupd_http_addresses" yaml:"lookupd_http_addresses"` Topic string `json:"topic" yaml:"topic"` Channel string `json:"channel" yaml:"channel"` UserAgent string `json:"user_agent" yaml:"user_agent"` TLS btls.Config `json:"tls" yaml:"tls"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` MaxAttempts uint16 `json:"max_attempts" yaml:"max_attempts"` }
NSQConfig contains configuration fields for the NSQ input type.
func NewNSQConfig ¶ added in v4.1.0
func NewNSQConfig() NSQConfig
NewNSQConfig creates a new NSQConfig with default values.
type NanomsgConfig ¶ added in v4.1.0
type NanomsgConfig struct { URLs []string `json:"urls" yaml:"urls"` Bind bool `json:"bind" yaml:"bind"` SocketType string `json:"socket_type" yaml:"socket_type"` SubFilters []string `json:"sub_filters" yaml:"sub_filters"` PollTimeout string `json:"poll_timeout" yaml:"poll_timeout"` }
NanomsgConfig contains configuration fields for the nanomsg input type.
func NewNanomsgConfig ¶ added in v4.1.0
func NewNanomsgConfig() NanomsgConfig
NewNanomsgConfig creates a new NanomsgConfig with default values.
type ReadUntilConfig ¶ added in v4.1.0
type ReadUntilConfig struct { Input *Config `json:"input" yaml:"input"` Restart bool `json:"restart_input" yaml:"restart_input"` Check string `json:"check" yaml:"check"` }
ReadUntilConfig contains configuration values for the ReadUntil input type.
func NewReadUntilConfig ¶ added in v4.1.0
func NewReadUntilConfig() ReadUntilConfig
NewReadUntilConfig creates a new ReadUntilConfig with default values.
func (ReadUntilConfig) MarshalJSON ¶ added in v4.1.0
func (r ReadUntilConfig) MarshalJSON() ([]byte, error)
MarshalJSON prints an empty object instead of nil.
func (ReadUntilConfig) MarshalYAML ¶ added in v4.1.0
func (r ReadUntilConfig) MarshalYAML() (any, error)
MarshalYAML prints an empty object instead of nil.
type RedisPubSubConfig ¶ added in v4.1.0
type RedisPubSubConfig struct { bredis.Config `json:",inline" yaml:",inline"` Channels []string `json:"channels" yaml:"channels"` UsePatterns bool `json:"use_patterns" yaml:"use_patterns"` }
RedisPubSubConfig contains configuration fields for the RedisPubSub input type.
func NewRedisPubSubConfig ¶ added in v4.1.0
func NewRedisPubSubConfig() RedisPubSubConfig
NewRedisPubSubConfig creates a new RedisPubSubConfig with default values.
type RedisStreamsConfig ¶ added in v4.1.0
type RedisStreamsConfig struct { bredis.Config `json:",inline" yaml:",inline"` BodyKey string `json:"body_key" yaml:"body_key"` Streams []string `json:"streams" yaml:"streams"` CreateStreams bool `json:"create_streams" yaml:"create_streams"` ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"` ClientID string `json:"client_id" yaml:"client_id"` Limit int64 `json:"limit" yaml:"limit"` StartFromOldest bool `json:"start_from_oldest" yaml:"start_from_oldest"` CommitPeriod string `json:"commit_period" yaml:"commit_period"` Timeout string `json:"timeout" yaml:"timeout"` }
RedisStreamsConfig contains configuration fields for the RedisStreams input type.
func NewRedisStreamsConfig ¶ added in v4.1.0
func NewRedisStreamsConfig() RedisStreamsConfig
NewRedisStreamsConfig creates a new RedisStreamsConfig with default values.
type SFTPConfig ¶ added in v4.1.0
type SFTPConfig struct { Address string `json:"address" yaml:"address"` Credentials sftpSetup.Credentials `json:"credentials" yaml:"credentials"` Paths []string `json:"paths" yaml:"paths"` Codec string `json:"codec" yaml:"codec"` DeleteOnFinish bool `json:"delete_on_finish" yaml:"delete_on_finish"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` Watcher watcherConfig `json:"watcher" yaml:"watcher"` }
SFTPConfig contains configuration fields for the SFTP input type.
func NewSFTPConfig ¶ added in v4.1.0
func NewSFTPConfig() SFTPConfig
NewSFTPConfig creates a new SFTPConfig with default values.
type STDINConfig ¶ added in v4.1.0
type STDINConfig struct { Codec string `json:"codec" yaml:"codec"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` }
STDINConfig contains config fields for the STDIN input type.
func NewSTDINConfig ¶ added in v4.1.0
func NewSTDINConfig() STDINConfig
NewSTDINConfig creates a STDINConfig populated with default values.
type SequenceConfig ¶ added in v4.1.0
type SequenceConfig struct { ShardedJoin SequenceShardedJoinConfig `json:"sharded_join" yaml:"sharded_join"` Inputs []Config `json:"inputs" yaml:"inputs"` }
SequenceConfig contains configuration values for the Sequence input type.
func NewSequenceConfig ¶ added in v4.1.0
func NewSequenceConfig() SequenceConfig
NewSequenceConfig creates a new SequenceConfig with default values.
type SequenceShardedJoinConfig ¶ added in v4.1.0
type SequenceShardedJoinConfig struct { Type string `json:"type" yaml:"type"` IDPath string `json:"id_path" yaml:"id_path"` Iterations int `json:"iterations" yaml:"iterations"` MergeStrategy string `json:"merge_strategy" yaml:"merge_strategy"` }
SequenceShardedJoinConfig describes an optional mechanism for performing sharded joins of structured data resulting from the input sequence. This is a way to merge the structured fields of fragmented datasets within memory even when the overall size of the data surpasses the memory available on the machine.
When configured the sequence of inputs will be consumed multiple times according to the number of iterations, and each iteration will process an entirely different set of messages by sharding them by the ID field.
Each message must be structured (JSON or otherwise processed into a structured form) and the fields will be aggregated with those of other messages sharing the ID. At the end of each iteration the joined messages are flushed downstream before the next iteration begins.
func NewSequenceShardedJoinConfig ¶ added in v4.1.0
func NewSequenceShardedJoinConfig() SequenceShardedJoinConfig
NewSequenceShardedJoinConfig creates a new sequence sharding configuration with default values.
type SocketConfig ¶ added in v4.1.0
type SocketConfig struct { Network string `json:"network" yaml:"network"` Address string `json:"address" yaml:"address"` Codec string `json:"codec" yaml:"codec"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` }
SocketConfig contains configuration values for the Socket input type.
func NewSocketConfig ¶ added in v4.1.0
func NewSocketConfig() SocketConfig
NewSocketConfig creates a new SocketConfig with default values.
type SocketServerConfig ¶ added in v4.1.0
type SocketServerConfig struct { Network string `json:"network" yaml:"network"` Address string `json:"address" yaml:"address"` Codec string `json:"codec" yaml:"codec"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` TLS SocketServerTLSConfig `json:"tls" yaml:"tls"` }
SocketServerConfig contains configuration for the SocketServer input type.
func NewSocketServerConfig ¶ added in v4.1.0
func NewSocketServerConfig() SocketServerConfig
NewSocketServerConfig creates a new SocketServerConfig with default values.
type SocketServerTLSConfig ¶ added in v4.12.0
type SocketServerTLSConfig struct { CertFile string `json:"cert_file" yaml:"cert_file"` KeyFile string `json:"key_file" yaml:"key_file"` SelfSigned bool `json:"self_signed" yaml:"self_signed"` }
SocketServerTLSConfig contains config for TLS.
type Streamed ¶
type Streamed interface { // TransactionChan returns a channel used for consuming transactions from // this type. Every transaction received must be resolved before another // transaction will be sent. TransactionChan() <-chan message.Transaction // Connected returns a boolean indicating whether this input is currently // connected to its target. Connected() bool // TriggerStopConsuming instructs the input to start shutting down resources // once all pending messages are delivered and acknowledged. This call does // not block. TriggerStopConsuming() // TriggerCloseNow triggers the shut down of this component but should not // block the calling goroutine. TriggerCloseNow() // WaitForClose is a blocking call to wait until the component has finished // shutting down and cleaning up resources. WaitForClose(ctx context.Context) error }
Streamed is a common interface implemented by inputs and provides channel based streaming APIs.
func NewAsyncReader ¶ added in v4.1.0
func NewAsyncReader( typeStr string, r Async, mgr component.Observability, opts ...func(a *AsyncReader), ) (Streamed, error)
NewAsyncReader creates a new AsyncReader input type.
func WrapWithPipelines ¶ added in v4.1.0
func WrapWithPipelines(in Streamed, pipeConstructors ...iprocessor.PipelineConstructorFunc) (Streamed, error)
WrapWithPipelines wraps an input with a variadic number of pipelines.
type SubprocessConfig ¶ added in v4.1.0
type SubprocessConfig struct { Name string `json:"name" yaml:"name"` Args []string `json:"args" yaml:"args"` Codec string `json:"codec" yaml:"codec"` RestartOnExit bool `json:"restart_on_exit" yaml:"restart_on_exit"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` }
SubprocessConfig contains configuration for the Subprocess input type.
func NewSubprocessConfig ¶ added in v4.1.0
func NewSubprocessConfig() SubprocessConfig
NewSubprocessConfig creates a new SubprocessConfig with default values.
type WithPipeline ¶ added in v4.1.0
type WithPipeline struct {
// contains filtered or unexported fields
}
WithPipeline is a type that wraps both an input type and a pipeline type by routing the input through the pipeline, and implements the input.Type interface in order to act like an ordinary input.
func WrapWithPipeline ¶ added in v4.1.0
func WrapWithPipeline(in Streamed, pipeConstructor iprocessor.PipelineConstructorFunc) (*WithPipeline, error)
WrapWithPipeline routes an input directly into a processing pipeline and returns a type that manages both and acts like an ordinary input.
func (*WithPipeline) Connected ¶ added in v4.1.0
func (i *WithPipeline) Connected() bool
Connected returns a boolean indicating whether this input is currently connected to its target.
func (*WithPipeline) TransactionChan ¶ added in v4.1.0
func (i *WithPipeline) TransactionChan() <-chan message.Transaction
TransactionChan returns the channel used for consuming transactions from this input.
func (*WithPipeline) TriggerCloseNow ¶ added in v4.6.0
func (i *WithPipeline) TriggerCloseNow()
TriggerCloseNow triggers the shut down of this component but should not block the calling goroutine.
func (*WithPipeline) TriggerStopConsuming ¶ added in v4.6.0
func (i *WithPipeline) TriggerStopConsuming()
TriggerStopConsuming instructs the input to start shutting down resources once all pending messages are delivered and acknowledged. This call does not block.
func (*WithPipeline) WaitForClose ¶ added in v4.1.0
func (i *WithPipeline) WaitForClose(ctx context.Context) error
WaitForClose is a blocking call to wait until the component has finished shutting down and cleaning up resources.
Source Files ¶
- async_cut_off.go
- async_preserver.go
- async_reader.go
- config.go
- config_azure_blob_storage.go
- config_azure_queue_storage.go
- config_azure_table_storage.go
- config_broker.go
- config_csv.go
- config_dynamic.go
- config_file.go
- config_gcp_cloud_storage.go
- config_gcp_pubsub.go
- config_generate.go
- config_hdfs.go
- config_http_server.go
- config_inproc.go
- config_kafka.go
- config_mqtt.go
- config_nanomsg.go
- config_nats_stream.go
- config_nsq.go
- config_read_until.go
- config_redis_pubsub.go
- config_redis_streams.go
- config_sequence.go
- config_sftp.go
- config_socket.go
- config_socket_server.go
- config_stdin.go
- config_subprocess.go
- interface.go
- wrap_with_pipeline.go
Directories ¶
Path | Synopsis |
---|---|
Package config contains reusable config definitions and parsers for inputs defined via the public/service package.
|
Package config contains reusable config definitions and parsers for inputs defined via the public/service package. |