Documentation ¶
Overview ¶
Package reader defines implementations of an interface for generic message reading from various third-party sources.
Index ¶
- func OptLinesSetDelimiter(delimiter string) func(r *Lines)
- func OptLinesSetMaxBuffer(maxBuffer int) func(r *Lines)
- func OptLinesSetMultipart(multipart bool) func(r *Lines)
- type AMQP
- type AMQPConfig
- type AmazonAWSCredentialsConfig
- type AmazonS3
- type AmazonS3Config
- type AmazonSQS
- type AmazonSQSConfig
- type Kafka
- type KafkaBalanced
- type KafkaBalancedConfig
- type KafkaConfig
- type Lines
- type NATS
- type NATSConfig
- type NATSStream
- type NATSStreamConfig
- type NSQ
- type NSQConfig
- type Preserver
- type RedisList
- type RedisListConfig
- type RedisPubSub
- type RedisPubSubConfig
- type ScaleProto
- type ScaleProtoConfig
- type Type
- func NewAMQP(conf AMQPConfig, log log.Modular, stats metrics.Type) (Type, error)
- func NewNATS(conf NATSConfig, log log.Modular, stats metrics.Type) (Type, error)
- func NewNATSStream(conf NATSStreamConfig, log log.Modular, stats metrics.Type) (Type, error)
- func NewNSQ(conf NSQConfig, log log.Modular, stats metrics.Type) (Type, error)
- func NewScaleProto(conf ScaleProtoConfig, log log.Modular, stats metrics.Type) (Type, error)
Functions ¶
func OptLinesSetDelimiter ¶ added in v0.7.7
OptLinesSetDelimiter is an option func that sets the delimiter (default '\n') used to divide lines (message parts) in the stream of data.
func OptLinesSetMaxBuffer ¶ added in v0.7.7
OptLinesSetMaxBuffer is an option func that sets the maximum size of the line parsing buffers.
func OptLinesSetMultipart ¶ added in v0.7.7
OptLinesSetMultipart is an option func that sets the boolean flag indicating whether lines should be parsed as multipart or not.
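The three option funcs above follow the functional options pattern: each returns a closure that mutates the reader during construction. The sketch below illustrates that pattern with a hypothetical stand-in type, `lineReader`, because the fields of `Lines` are unexported and not shown here. The `optSet...` names and the 65536 buffer default are assumptions for illustration only; only the `"\n"` delimiter default comes from the documentation.

```go
package main

import "fmt"

// lineReader is a hypothetical stand-in for reader.Lines.
type lineReader struct {
	delimiter string
	maxBuffer int
	multipart bool
}

// optSetDelimiter mirrors the shape of OptLinesSetDelimiter.
func optSetDelimiter(d string) func(r *lineReader) {
	return func(r *lineReader) { r.delimiter = d }
}

// optSetMaxBuffer mirrors the shape of OptLinesSetMaxBuffer.
func optSetMaxBuffer(n int) func(r *lineReader) {
	return func(r *lineReader) { r.maxBuffer = n }
}

// optSetMultipart mirrors the shape of OptLinesSetMultipart.
func optSetMultipart(m bool) func(r *lineReader) {
	return func(r *lineReader) { r.multipart = m }
}

// newLineReader sets defaults first, then applies each option in order.
func newLineReader(opts ...func(r *lineReader)) *lineReader {
	r := &lineReader{
		delimiter: "\n",  // documented default
		maxBuffer: 65536, // assumed default, for illustration only
	}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

func main() {
	r := newLineReader(optSetDelimiter("\r\n"), optSetMultipart(true))
	fmt.Printf("%q %d %v\n", r.delimiter, r.maxBuffer, r.multipart)
}
```

Options that are not passed leave the corresponding default untouched, which is why the constructor can take a variadic list.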
Types ¶
type AMQP ¶ added in v0.7.7
type AMQP struct {
// contains filtered or unexported fields
}
AMQP is an input type that reads messages via the AMQP 0.9.1 protocol.
func (*AMQP) Acknowledge ¶ added in v0.7.7
Acknowledge instructs whether unacknowledged messages have been successfully propagated.
func (*AMQP) CloseAsync ¶ added in v0.7.7
func (a *AMQP) CloseAsync()
CloseAsync shuts down the AMQP input and stops processing requests.
type AMQPConfig ¶ added in v0.7.7
type AMQPConfig struct {
    URL           string `json:"url" yaml:"url"`
    Exchange      string `json:"exchange" yaml:"exchange"`
    ExchangeType  string `json:"exchange_type" yaml:"exchange_type"`
    Queue         string `json:"queue" yaml:"queue"`
    BindingKey    string `json:"key" yaml:"key"`
    ConsumerTag   string `json:"consumer_tag" yaml:"consumer_tag"`
    PrefetchCount int    `json:"prefetch_count" yaml:"prefetch_count"`
    PrefetchSize  int    `json:"prefetch_size" yaml:"prefetch_size"`
}
AMQPConfig is configuration for the AMQP input type.
func NewAMQPConfig ¶ added in v0.7.7
func NewAMQPConfig() AMQPConfig
NewAMQPConfig creates a new AMQPConfig with default values.
type AmazonAWSCredentialsConfig ¶ added in v0.7.5
type AmazonAWSCredentialsConfig struct {
    ID     string `json:"id"`
    Secret string `json:"secret"`
    Token  string `json:"token"`
}
AmazonAWSCredentialsConfig contains configuration params for AWS credentials.
type AmazonS3 ¶
type AmazonS3 struct {
// contains filtered or unexported fields
}
AmazonS3 is a benthos reader.Type implementation that reads messages from an Amazon S3 bucket.
func NewAmazonS3 ¶
NewAmazonS3 creates a new Amazon S3 bucket reader.Type.
func (*AmazonS3) Acknowledge ¶
Acknowledge confirms whether our unacknowledged messages have been successfully propagated.
func (*AmazonS3) CloseAsync ¶
func (a *AmazonS3) CloseAsync()
CloseAsync begins cleaning up resources used by this reader asynchronously.
func (*AmazonS3) Connect ¶
Connect attempts to establish a connection to the target S3 bucket and any relevant queues used to traverse the objects (SQS, etc).
type AmazonS3Config ¶
type AmazonS3Config struct {
    Region        string                     `json:"region" yaml:"region"`
    Bucket        string                     `json:"bucket" yaml:"bucket"`
    Prefix        string                     `json:"prefix" yaml:"prefix"`
    DeleteObjects bool                       `json:"delete_objects" yaml:"delete_objects"`
    SQSURL        string                     `json:"sqs_url" yaml:"sqs_url"`
    SQSBodyPath   string                     `json:"sqs_body_path" yaml:"sqs_body_path"`
    Credentials   AmazonAWSCredentialsConfig `json:"credentials" yaml:"credentials"`
    TimeoutS      int64                      `json:"timeout_s" yaml:"timeout_s"`
}
AmazonS3Config holds the configuration values for the AmazonS3 input type.
func NewAmazonS3Config ¶
func NewAmazonS3Config() AmazonS3Config
NewAmazonS3Config creates a new Config with default values.
type AmazonSQS ¶ added in v0.7.5
type AmazonSQS struct {
// contains filtered or unexported fields
}
AmazonSQS is a benthos reader.Type implementation that reads messages from an Amazon SQS queue.
func NewAmazonSQS ¶ added in v0.7.5
NewAmazonSQS creates a new Amazon SQS queue reader.Type.
func (*AmazonSQS) Acknowledge ¶ added in v0.7.5
Acknowledge confirms whether our unacknowledged messages have been successfully propagated.
func (*AmazonSQS) CloseAsync ¶ added in v0.7.5
func (a *AmazonSQS) CloseAsync()
CloseAsync begins cleaning up resources used by this reader asynchronously.
func (*AmazonSQS) Connect ¶ added in v0.7.5
Connect attempts to establish a connection to the target SQS queue.
type AmazonSQSConfig ¶ added in v0.7.5
type AmazonSQSConfig struct {
    Region      string                     `json:"region" yaml:"region"`
    URL         string                     `json:"url" yaml:"url"`
    Credentials AmazonAWSCredentialsConfig `json:"credentials" yaml:"credentials"`
    TimeoutS    int64                      `json:"timeout_s" yaml:"timeout_s"`
}
AmazonSQSConfig holds the configuration values for the AmazonSQS input type.
func NewAmazonSQSConfig ¶ added in v0.7.5
func NewAmazonSQSConfig() AmazonSQSConfig
NewAmazonSQSConfig creates a new Config with default values.
type Kafka ¶
type Kafka struct {
// contains filtered or unexported fields
}
Kafka is an input type that reads from a Kafka instance.
func (*Kafka) Acknowledge ¶
Acknowledge instructs whether the current offset should be committed.
func (*Kafka) CloseAsync ¶
func (k *Kafka) CloseAsync()
CloseAsync shuts down the Kafka input and stops processing requests.
type KafkaBalanced ¶ added in v0.7.7
type KafkaBalanced struct {
// contains filtered or unexported fields
}
KafkaBalanced is an input type that reads from a KafkaBalanced instance.
func NewKafkaBalanced ¶ added in v0.7.7
func NewKafkaBalanced(
    conf KafkaBalancedConfig,
    log log.Modular,
    stats metrics.Type,
) (*KafkaBalanced, error)
NewKafkaBalanced creates a new KafkaBalanced input type.
func (*KafkaBalanced) Acknowledge ¶ added in v0.7.7
func (k *KafkaBalanced) Acknowledge(err error) error
Acknowledge instructs whether the current offset should be committed.
func (*KafkaBalanced) CloseAsync ¶ added in v0.7.7
func (k *KafkaBalanced) CloseAsync()
CloseAsync shuts down the KafkaBalanced input and stops processing requests.
func (*KafkaBalanced) Connect ¶ added in v0.7.7
func (k *KafkaBalanced) Connect() error
Connect establishes a KafkaBalanced connection.
func (*KafkaBalanced) Read ¶ added in v0.7.7
func (k *KafkaBalanced) Read() (types.Message, error)
Read attempts to read a message from a KafkaBalanced topic.
func (*KafkaBalanced) WaitForClose ¶ added in v0.7.7
func (k *KafkaBalanced) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the KafkaBalanced input has closed down.
type KafkaBalancedConfig ¶ added in v0.7.7
type KafkaBalancedConfig struct {
    Addresses       []string `json:"addresses" yaml:"addresses"`
    ClientID        string   `json:"client_id" yaml:"client_id"`
    ConsumerGroup   string   `json:"consumer_group" yaml:"consumer_group"`
    Topics          []string `json:"topics" yaml:"topics"`
    StartFromOldest bool     `json:"start_from_oldest" yaml:"start_from_oldest"`
}
KafkaBalancedConfig is configuration for the KafkaBalanced input type.
func NewKafkaBalancedConfig ¶ added in v0.7.7
func NewKafkaBalancedConfig() KafkaBalancedConfig
NewKafkaBalancedConfig creates a new KafkaBalancedConfig with default values.
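The config types in this package carry json and yaml struct tags, and each has a constructor that returns default values. A typical way to use such a pair is to start from the defaults and overlay a parsed document, as sketched below. The struct is a local mirror of the documented fields and json tags; the default values and the `parseConfig` helper are assumptions for illustration, since the real defaults are not stated here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// kafkaBalancedConfig mirrors the documented KafkaBalancedConfig fields and
// json tags.
type kafkaBalancedConfig struct {
	Addresses       []string `json:"addresses"`
	ClientID        string   `json:"client_id"`
	ConsumerGroup   string   `json:"consumer_group"`
	Topics          []string `json:"topics"`
	StartFromOldest bool     `json:"start_from_oldest"`
}

// newKafkaBalancedConfig returns placeholder defaults (assumed values, not
// the library's real defaults).
func newKafkaBalancedConfig() kafkaBalancedConfig {
	return kafkaBalancedConfig{
		Addresses:     []string{"localhost:9092"},
		ClientID:      "example_client",
		ConsumerGroup: "example_group",
		Topics:        []string{"example_topic"},
	}
}

// parseConfig starts from the defaults and overlays whatever the JSON sets.
// Fields absent from the JSON keep their default values.
func parseConfig(raw []byte) (kafkaBalancedConfig, error) {
	conf := newKafkaBalancedConfig()
	err := json.Unmarshal(raw, &conf)
	return conf, err
}

func main() {
	conf, err := parseConfig([]byte(`{"topics":["foo","bar"],"start_from_oldest":true}`))
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Printf("%+v\n", conf)
}
```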
type KafkaConfig ¶
type KafkaConfig struct {
    Addresses       []string `json:"addresses" yaml:"addresses"`
    ClientID        string   `json:"client_id" yaml:"client_id"`
    ConsumerGroup   string   `json:"consumer_group" yaml:"consumer_group"`
    Topic           string   `json:"topic" yaml:"topic"`
    Partition       int32    `json:"partition" yam:"partition"`
    StartFromOldest bool     `json:"start_from_oldest" yaml:"start_from_oldest"`
}
KafkaConfig is configuration for the Kafka input type.
func NewKafkaConfig ¶
func NewKafkaConfig() KafkaConfig
NewKafkaConfig creates a new KafkaConfig with default values.
type Lines ¶ added in v0.7.7
type Lines struct {
// contains filtered or unexported fields
}
Lines is a reader implementation that continuously reads line delimited messages from an io.Reader type.
func NewLines ¶ added in v0.7.7
func NewLines(
    handleCtor func() (io.Reader, error),
    onClose func(),
    options ...func(r *Lines),
) (*Lines, error)
NewLines creates a new reader input type.
Callers must provide a constructor function for the target io.Reader, which is called on startup and again each time a reader is exhausted. If the constructor is called but there is no more content to create a reader for, it should return the error `io.EOF`, at which point the Lines reader will close.
Callers must also provide an onClose function, which will be called if the Lines reader has been instructed to shut down. This function should unblock any blocked Read calls.
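The constructor contract described above can be sketched without the library itself. In the example below, `newSourceCtor` builds a function with the signature NewLines expects, and `readAllLines` is a hypothetical stand-in for the consuming side that requests a fresh reader whenever the current one is exhausted and stops on `io.EOF`. Both helper names are illustrative, and the sketch splits on newlines only.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// newSourceCtor returns a constructor of the shape func() (io.Reader, error).
// It hands out one reader per source and returns io.EOF once every source
// has been consumed.
func newSourceCtor(sources []string) func() (io.Reader, error) {
	next := 0
	return func() (io.Reader, error) {
		if next >= len(sources) {
			return nil, io.EOF
		}
		r := strings.NewReader(sources[next])
		next++
		return r, nil
	}
}

// readAllLines is a hypothetical consumer: it calls the constructor again
// each time a reader runs dry and finishes when the constructor reports
// io.EOF.
func readAllLines(ctor func() (io.Reader, error)) ([]string, error) {
	var lines []string
	for {
		r, err := ctor()
		if err == io.EOF {
			return lines, nil
		}
		if err != nil {
			return lines, err
		}
		scanner := bufio.NewScanner(r)
		for scanner.Scan() {
			lines = append(lines, scanner.Text())
		}
		if err := scanner.Err(); err != nil {
			return lines, err
		}
	}
}

func main() {
	lines, err := readAllLines(newSourceCtor([]string{"a\nb\n", "c\n"}))
	fmt.Println(lines, err)
}
```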
func (*Lines) Acknowledge ¶ added in v0.7.7
Acknowledge confirms whether our unacknowledged messages have been successfully propagated.
func (*Lines) CloseAsync ¶ added in v0.7.7
func (r *Lines) CloseAsync()
CloseAsync shuts down the reader input and stops processing requests.
func (*Lines) Connect ¶ added in v0.7.7
Connect attempts to establish a new scanner for an io.Reader.
type NATS ¶ added in v0.7.7
type NATS struct {
// contains filtered or unexported fields
}
NATS is an input type that receives NATS messages.
func (*NATS) Acknowledge ¶ added in v0.7.7
Acknowledge instructs whether read messages have been successfully propagated.
func (*NATS) CloseAsync ¶ added in v0.7.7
func (n *NATS) CloseAsync()
CloseAsync shuts down the NATS input and stops processing requests.
type NATSConfig ¶ added in v0.7.7
type NATSConfig struct {
    URLs    []string `json:"urls" yaml:"urls"`
    Subject string   `json:"subject" yaml:"subject"`
}
NATSConfig is configuration for the NATS input type.
func NewNATSConfig ¶ added in v0.7.7
func NewNATSConfig() NATSConfig
NewNATSConfig creates a new NATSConfig with default values.
type NATSStream ¶ added in v0.7.7
type NATSStream struct {
// contains filtered or unexported fields
}
NATSStream is an input type that receives NATS Streaming messages.
func (*NATSStream) Acknowledge ¶ added in v0.7.7
func (n *NATSStream) Acknowledge(err error) error
Acknowledge instructs whether unacknowledged messages have been successfully propagated.
func (*NATSStream) CloseAsync ¶ added in v0.7.7
func (n *NATSStream) CloseAsync()
CloseAsync shuts down the NATSStream input and stops processing requests.
func (*NATSStream) Connect ¶ added in v0.7.7
func (n *NATSStream) Connect() error
Connect attempts to establish a connection to a NATS streaming server.
func (*NATSStream) Read ¶ added in v0.7.7
func (n *NATSStream) Read() (types.Message, error)
Read attempts to read a new message from the NATS streaming server.
func (*NATSStream) WaitForClose ¶ added in v0.7.7
func (n *NATSStream) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the NATSStream input has closed down.
type NATSStreamConfig ¶ added in v0.7.7
type NATSStreamConfig struct {
    URLs        []string `json:"urls" yaml:"urls"`
    ClusterID   string   `json:"cluster_id" yaml:"cluster_id"`
    ClientID    string   `json:"client_id" yaml:"client_id"`
    QueueID     string   `json:"queue" yaml:"queue"`
    DurableName string   `json:"durable_name" yaml:"durable_name"`
    Subject     string   `json:"subject" yaml:"subject"`
}
NATSStreamConfig is configuration for the NATSStream input type.
func NewNATSStreamConfig ¶ added in v0.7.7
func NewNATSStreamConfig() NATSStreamConfig
NewNATSStreamConfig creates a new NATSStreamConfig with default values.
type NSQ ¶ added in v0.7.7
type NSQ struct {
// contains filtered or unexported fields
}
NSQ is an input type that receives NSQ messages.
func (*NSQ) Acknowledge ¶ added in v0.7.7
Acknowledge instructs whether unacknowledged messages have been successfully propagated.
func (*NSQ) CloseAsync ¶ added in v0.7.7
func (n *NSQ) CloseAsync()
CloseAsync shuts down the NSQ input and stops processing requests.
func (*NSQ) HandleMessage ¶ added in v0.7.7
HandleMessage handles an NSQ message.
type NSQConfig ¶ added in v0.7.7
type NSQConfig struct {
    Addresses       []string `json:"nsqd_tcp_addresses" yaml:"nsqd_tcp_addresses"`
    LookupAddresses []string `json:"lookupd_http_addresses" yaml:"lookupd_http_addresses"`
    Topic           string   `json:"topic" yaml:"topic"`
    Channel         string   `json:"channel" yaml:"channel"`
    UserAgent       string   `json:"user_agent" yaml:"user_agent"`
    MaxInFlight     int      `json:"max_in_flight" yaml:"max_in_flight"`
}
NSQConfig is configuration for the NSQ input type.
func NewNSQConfig ¶ added in v0.7.7
func NewNSQConfig() NSQConfig
NewNSQConfig creates a new NSQConfig with default values.
type Preserver ¶
type Preserver struct {
// contains filtered or unexported fields
}
Preserver is a wrapper for reader.Type implementations that keeps a buffer of sent messages until they are acknowledged. If an error occurs during message propagation the contents of the buffer will be resent instead of reading new messages until it is depleted. Preserver implements reader.Type.
func NewPreserver ¶
NewPreserver returns a new Preserver wrapper around a reader.Type.
func (*Preserver) Acknowledge ¶
Acknowledge instructs whether messages read since the last Acknowledge call were successfully propagated. If the error is nil this will be forwarded to the underlying wrapped reader. If the error is non-nil the buffer of messages will be resent.
func (*Preserver) CloseAsync ¶
func (p *Preserver) CloseAsync()
CloseAsync triggers the asynchronous closing of the reader.
func (*Preserver) Connect ¶
Connect attempts to establish a connection to the source and returns an error if unsuccessful. If the attempt is successful (or not necessary) it returns nil.
type RedisList ¶
type RedisList struct {
// contains filtered or unexported fields
}
RedisList is an input type that reads messages from a Redis list.
func NewRedisList ¶
NewRedisList creates a new RedisList input type.
func (*RedisList) Acknowledge ¶
Acknowledge instructs whether messages have been successfully propagated.
func (*RedisList) CloseAsync ¶
func (r *RedisList) CloseAsync()
CloseAsync shuts down the RedisList input and stops processing requests.
type RedisListConfig ¶
type RedisListConfig struct {
    URL       string `json:"url" yaml:"url"`
    Key       string `json:"key" yaml:"key"`
    TimeoutMS int    `json:"timeout_ms" yaml:"timeout_ms"`
}
RedisListConfig is configuration for the RedisList input type.
func NewRedisListConfig ¶
func NewRedisListConfig() RedisListConfig
NewRedisListConfig creates a new RedisListConfig with default values.
type RedisPubSub ¶ added in v0.7.7
type RedisPubSub struct {
// contains filtered or unexported fields
}
RedisPubSub is an input type that reads Redis Pub/Sub messages.
func NewRedisPubSub ¶ added in v0.7.7
func NewRedisPubSub(
    conf RedisPubSubConfig,
    log log.Modular,
    stats metrics.Type,
) (*RedisPubSub, error)
NewRedisPubSub creates a new RedisPubSub input type.
func (*RedisPubSub) Acknowledge ¶ added in v0.7.7
func (r *RedisPubSub) Acknowledge(err error) error
Acknowledge instructs whether messages have been successfully propagated.
func (*RedisPubSub) CloseAsync ¶ added in v0.7.7
func (r *RedisPubSub) CloseAsync()
CloseAsync shuts down the RedisPubSub input and stops processing requests.
func (*RedisPubSub) Connect ¶ added in v0.7.7
func (r *RedisPubSub) Connect() error
Connect establishes a connection to a Redis server.
func (*RedisPubSub) Read ¶ added in v0.7.7
func (r *RedisPubSub) Read() (types.Message, error)
Read attempts to read a message from the subscribed Redis Pub/Sub channels.
func (*RedisPubSub) WaitForClose ¶ added in v0.7.7
func (r *RedisPubSub) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the RedisPubSub input has closed down.
type RedisPubSubConfig ¶ added in v0.7.7
type RedisPubSubConfig struct {
    URL      string   `json:"url" yaml:"url"`
    Channels []string `json:"channels" yaml:"channels"`
}
RedisPubSubConfig is configuration for the RedisPubSub input type.
func NewRedisPubSubConfig ¶ added in v0.7.7
func NewRedisPubSubConfig() RedisPubSubConfig
NewRedisPubSubConfig creates a new RedisPubSubConfig with default values.
type ScaleProto ¶ added in v0.7.7
type ScaleProto struct {
// contains filtered or unexported fields
}
ScaleProto is an input type that serves Scalability Protocols messages.
func (*ScaleProto) Acknowledge ¶ added in v0.7.7
func (s *ScaleProto) Acknowledge(err error) error
Acknowledge instructs whether the pending messages were propagated successfully.
func (*ScaleProto) CloseAsync ¶ added in v0.7.7
func (s *ScaleProto) CloseAsync()
CloseAsync shuts down the ScaleProto input and stops processing requests.
func (*ScaleProto) Connect ¶ added in v0.7.7
func (s *ScaleProto) Connect() error
Connect establishes a nanomsg socket.
func (*ScaleProto) Read ¶ added in v0.7.7
func (s *ScaleProto) Read() (types.Message, error)
Read attempts to read a new message from the nanomsg socket.
func (*ScaleProto) WaitForClose ¶ added in v0.7.7
func (s *ScaleProto) WaitForClose(timeout time.Duration) error
WaitForClose blocks until the ScaleProto input has closed down.
type ScaleProtoConfig ¶ added in v0.7.7
type ScaleProtoConfig struct {
    URLs          []string `json:"urls" yaml:"urls"`
    Bind          bool     `json:"bind" yaml:"bind"`
    SocketType    string   `json:"socket_type" yaml:"socket_type"`
    SubFilters    []string `json:"sub_filters" yaml:"sub_filters"`
    PollTimeoutMS int      `json:"poll_timeout_ms" yaml:"poll_timeout_ms"`
    RepTimeoutMS  int      `json:"reply_timeout_ms" yaml:"reply_timeout_ms"`
}
ScaleProtoConfig is configuration for the ScaleProto input type.
func NewScaleProtoConfig ¶ added in v0.7.7
func NewScaleProtoConfig() ScaleProtoConfig
NewScaleProtoConfig creates a new ScaleProtoConfig with default values.
type Type ¶
type Type interface {
    // Connect attempts to establish a connection to the source, if unsuccessful
    // returns an error. If the attempt is successful (or not necessary) returns
    // nil.
    Connect() error

    // Acknowledge, if applicable to the source, should send acknowledgments for
    // (or commit) all unacknowledged (or uncommitted) messages that have thus
    // far been consumed. If the error is non-nil this means the message was
    // unsuccessfully propagated down the pipeline, in which case it is up to
    // the implementation to decide whether to simply retry uncommitted messages
    // that are buffered locally, or to send the error upstream.
    Acknowledge(err error) error

    // Read attempts to read a new message from the source.
    Read() (types.Message, error)

    types.Closable
}
Type is a type that reads Benthos messages from a third-party source.
func NewNATSStream ¶ added in v0.7.7
NewNATSStream creates a new NATSStream input type.
func NewScaleProto ¶ added in v0.7.7
NewScaleProto creates a new ScaleProto input type.