reader

package
v0.7.4
Published: Feb 7, 2018 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package reader defines implementations of an interface for generic message reading from various third-party sources.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AmazonS3

type AmazonS3 struct {
	// contains filtered or unexported fields
}

AmazonS3 is a Benthos reader.Type implementation that reads messages from an Amazon S3 bucket.

func NewAmazonS3

func NewAmazonS3(
	conf AmazonS3Config,
	log log.Modular,
	stats metrics.Type,
) *AmazonS3

NewAmazonS3 creates a new Amazon S3 bucket reader.Type.

func (*AmazonS3) Acknowledge

func (a *AmazonS3) Acknowledge(err error) error

Acknowledge confirms whether our unacknowledged messages have been successfully propagated.

func (*AmazonS3) CloseAsync

func (a *AmazonS3) CloseAsync()

CloseAsync begins cleaning up resources used by this reader asynchronously.

func (*AmazonS3) Connect

func (a *AmazonS3) Connect() error

Connect attempts to establish a connection to the target S3 bucket and any relevant queues used to traverse the objects (SQS, etc).

func (*AmazonS3) Read

func (a *AmazonS3) Read() (types.Message, error)

Read attempts to read a new message from the target S3 bucket.

func (*AmazonS3) WaitForClose

func (a *AmazonS3) WaitForClose(time.Duration) error

WaitForClose will block until either the reader is closed or a specified timeout occurs.

type AmazonS3Config

type AmazonS3Config struct {
	Region        string                    `json:"region" yaml:"region"`
	Bucket        string                    `json:"bucket" yaml:"bucket"`
	DeleteObjects bool                      `json:"delete_objects" yaml:"delete_objects"`
	SQSURL        string                    `json:"sqs_url" yaml:"sqs_url"`
	SQSBodyPath   string                    `json:"sqs_body_path" yaml:"sqs_body_path"`
	Credentials   AmazonS3CredentialsConfig `json:"credentials" yaml:"credentials"`
	TimeoutS      int64                     `json:"timeout_s" yaml:"timeout_s"`
}

AmazonS3Config contains configuration values for the AmazonS3 input type.

func NewAmazonS3Config

func NewAmazonS3Config() AmazonS3Config

NewAmazonS3Config creates a new AmazonS3Config with default values.

type AmazonS3CredentialsConfig

type AmazonS3CredentialsConfig struct {
	ID     string `json:"id"`
	Secret string `json:"secret"`
	Token  string `json:"token"`
}

AmazonS3CredentialsConfig contains configuration params for AWS credentials.

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

Kafka is an input type that reads from a Kafka instance.

func NewKafka

func NewKafka(
	conf KafkaConfig, log log.Modular, stats metrics.Type,
) (*Kafka, error)

NewKafka creates a new Kafka input type.

func (*Kafka) Acknowledge

func (k *Kafka) Acknowledge(err error) error

Acknowledge instructs whether the current offset should be committed.

func (*Kafka) CloseAsync

func (k *Kafka) CloseAsync()

CloseAsync shuts down the Kafka input and stops processing requests.

func (*Kafka) Connect

func (k *Kafka) Connect() error

Connect establishes a Kafka connection.

func (*Kafka) Read

func (k *Kafka) Read() (types.Message, error)

Read attempts to read a message from a Kafka topic.

func (*Kafka) WaitForClose

func (k *Kafka) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the Kafka input has closed down.

type KafkaConfig

type KafkaConfig struct {
	Addresses       []string `json:"addresses" yaml:"addresses"`
	ClientID        string   `json:"client_id" yaml:"client_id"`
	ConsumerGroup   string   `json:"consumer_group" yaml:"consumer_group"`
	Topic           string   `json:"topic" yaml:"topic"`
	Partition       int32    `json:"partition" yaml:"partition"`
	StartFromOldest bool     `json:"start_from_oldest" yaml:"start_from_oldest"`
}

KafkaConfig is configuration for the Kafka input type.

func NewKafkaConfig

func NewKafkaConfig() KafkaConfig

NewKafkaConfig creates a new KafkaConfig with default values.

type Preserver

type Preserver struct {
	// contains filtered or unexported fields
}

Preserver is a wrapper for reader.Type implementations that keeps a buffer of sent messages until they are acknowledged. If an error occurs during message propagation the contents of the buffer will be resent instead of reading new messages until it is depleted. Preserver implements reader.Type.

func NewPreserver

func NewPreserver(r Type) *Preserver

NewPreserver returns a new Preserver wrapper around a reader.Type.

func (*Preserver) Acknowledge

func (p *Preserver) Acknowledge(err error) error

Acknowledge instructs whether messages read since the last Acknowledge call were successfully propagated. If the error is nil the acknowledgement is forwarded to the underlying wrapped reader. If the error is non-nil the buffer of messages will be resent.

func (*Preserver) CloseAsync

func (p *Preserver) CloseAsync()

CloseAsync triggers the asynchronous closing of the reader.

func (*Preserver) Connect

func (p *Preserver) Connect() error

Connect attempts to establish a connection to the source. If the attempt is unsuccessful it returns an error. If the attempt is successful (or not necessary) it returns nil.

func (*Preserver) Read

func (p *Preserver) Read() (types.Message, error)

Read attempts to read a new message from the source.

func (*Preserver) WaitForClose

func (p *Preserver) WaitForClose(tout time.Duration) error

WaitForClose blocks until either the reader is finished closing or a timeout occurs.

type RedisList

type RedisList struct {
	// contains filtered or unexported fields
}

RedisList is an input type that reads messages from a Redis list.

func NewRedisList

func NewRedisList(
	conf RedisListConfig, log log.Modular, stats metrics.Type,
) (*RedisList, error)

NewRedisList creates a new RedisList input type.

func (*RedisList) Acknowledge

func (r *RedisList) Acknowledge(err error) error

Acknowledge instructs whether messages have been successfully propagated.

func (*RedisList) CloseAsync

func (r *RedisList) CloseAsync()

CloseAsync shuts down the RedisList input and stops processing requests.

func (*RedisList) Connect

func (r *RedisList) Connect() error

Connect establishes a connection to a Redis server.

func (*RedisList) Read

func (r *RedisList) Read() (types.Message, error)

Read attempts to pop a message from a redis list.

func (*RedisList) WaitForClose

func (r *RedisList) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the RedisList input has closed down.

type RedisListConfig

type RedisListConfig struct {
	URL       string `json:"url" yaml:"url"`
	Key       string `json:"key" yaml:"key"`
	TimeoutMS int    `json:"timeout_ms" yaml:"timeout_ms"`
}

RedisListConfig is configuration for the RedisList input type.

func NewRedisListConfig

func NewRedisListConfig() RedisListConfig

NewRedisListConfig creates a new RedisListConfig with default values.

type Type

type Type interface {
	// Connect attempts to establish a connection to the source, if unsuccessful
	// returns an error. If the attempt is successful (or not necessary) returns
	// nil.
	Connect() error

	// Acknowledge, if applicable to the source, should send acknowledgments for
	// (or commit) all unacknowledged (or uncommitted) messages that have thus
	// far been consumed. If the error is non-nil this means the message was
	// unsuccessfully propagated down the pipeline, in which case it is up to
	// the implementation to decide whether to simply retry uncommitted messages
	// that are buffered locally, or to send the error upstream.
	Acknowledge(err error) error

	// Read attempts to read a new message from the source.
	Read() (types.Message, error)

	types.Closable
}

Type is a type that reads Benthos messages from a third-party source.
