Documentation ¶
Overview ¶
Package reader defines implementations of an interface for generic message reading from various third party sources.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AmazonS3 ¶
type AmazonS3 struct {
// contains filtered or unexported fields
}
AmazonS3 is a benthos reader.Type implementation that reads messages from an Amazon S3 bucket.
func NewAmazonS3 ¶
NewAmazonS3 creates a new Amazon S3 bucket reader.Type.
func (*AmazonS3) Acknowledge ¶
Acknowledge confirms whether or not our unacknowledged messages have been successfully propagated or not.
func (*AmazonS3) CloseAsync ¶
func (a *AmazonS3) CloseAsync()
CloseAsync begins cleaning up resources used by this reader asynchronously.
func (*AmazonS3) Connect ¶
Connect attempts to establish a connection to the target S3 bucket and any relevant queues used to traverse the objects (SQS, etc).
type AmazonS3Config ¶
type AmazonS3Config struct { Region string `json:"region" yaml:"region"` Bucket string `json:"bucket" yaml:"bucket"` DeleteObjects bool `json:"delete_objects" yaml:"delete_objects"` SQSURL string `json:"sqs_url" yaml:"sqs_url"` SQSBodyPath string `json:"sqs_body_path" yaml:"sqs_body_path"` Credentials AmazonS3CredentialsConfig `json:"credentials" yaml:"credentials"` TimeoutS int64 `json:"timeout_s" yaml:"timeout_s"` }
AmazonS3Config is configuration values for the input type.
func NewAmazonS3Config ¶
func NewAmazonS3Config() AmazonS3Config
NewAmazonS3Config creates a new Config with default values.
type AmazonS3CredentialsConfig ¶
type AmazonS3CredentialsConfig struct { ID string `json:"id"` Secret string `json:"secret"` Token string `json:"token"` }
AmazonS3CredentialsConfig contains configuration params for AWS credentials.
type Kafka ¶
type Kafka struct {
// contains filtered or unexported fields
}
Kafka is an input type that reads from a Kafka instance.
func (*Kafka) Acknowledge ¶
Acknowledge instructs whether the current offset should be committed.
func (*Kafka) CloseAsync ¶
func (k *Kafka) CloseAsync()
CloseAsync shuts down the Kafka input and stops processing requests.
type KafkaConfig ¶
type KafkaConfig struct { Addresses []string `json:"addresses" yaml:"addresses"` ClientID string `json:"client_id" yaml:"client_id"` ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"` Topic string `json:"topic" yaml:"topic"` Partition int32 `json:"partition" yam:"partition"` StartFromOldest bool `json:"start_from_oldest" yaml:"start_from_oldest"` }
KafkaConfig is configuration for the Kafka input type.
func NewKafkaConfig ¶
func NewKafkaConfig() KafkaConfig
NewKafkaConfig creates a new KafkaConfig with default values.
type Preserver ¶
type Preserver struct {
// contains filtered or unexported fields
}
Preserver is a wrapper for reader.Type implementations that keeps a buffer of sent messages until they are acknowledged. If an error occurs during message propagation the contents of the buffer will be resent instead of reading new messages until it is depleted. Preserver implements reader.Type.
func NewPreserver ¶
NewPreserver returns a new Preserver wrapper around a reader.Type.
func (*Preserver) Acknowledge ¶
Acknowledge instructs whether messages read since the last Acknowledge call were successfully propagated. If the error is nil this will be forwarded to the underlying wrapped reader. If a non-nil error is returned the buffer of messages will be resent.
func (*Preserver) CloseAsync ¶
func (p *Preserver) CloseAsync()
CloseAsync triggers the asynchronous closing of the reader.
func (*Preserver) Connect ¶
Connect attempts to establish a connection to the source, if unsuccessful returns an error. If the attempt is successful (or not necessary) returns nil.
type RedisList ¶
type RedisList struct {
// contains filtered or unexported fields
}
RedisList is an input type that reads Redis Pub/Sub messages.
func NewRedisList ¶
NewRedisList creates a new RedisList input type.
func (*RedisList) Acknowledge ¶
Acknowledge instructs whether messages have been successfully propagated.
func (*RedisList) CloseAsync ¶
func (r *RedisList) CloseAsync()
CloseAsync shuts down the RedisList input and stops processing requests.
type RedisListConfig ¶
type RedisListConfig struct { URL string `json:"url" yaml:"url"` Key string `json:"key" yaml:"key"` TimeoutMS int `json:"timeout_ms" yaml:"timeout_ms"` }
RedisListConfig is configuration for the RedisList input type.
func NewRedisListConfig ¶
func NewRedisListConfig() RedisListConfig
NewRedisListConfig creates a new RedisListConfig with default values.
type Type ¶
type Type interface { // Connect attempts to establish a connection to the source, if unsuccessful // returns an error. If the attempt is successful (or not necessary) returns // nil. Connect() error // Acknowledge, if applicable to the source, should send acknowledgments for // (or commit) all unacknowledged (or uncommitted) messages that have thus // far been consumed. If the error is non-nil this means the message was // unsuccessfully propagated down the pipeline, in which case it is up to // the implementation to decide whether to simply retry uncommitted messages // that are buffered locally, or to send the error upstream. Acknowledge(err error) error // Read attempts to read a new message from the source. Read() (types.Message, error) types.Closable }
Type is a type that writes Benthos messages to a third party sink.