reader

package
v0.8.5
Published: Feb 22, 2018 License: MIT Imports: 32 Imported by: 0

Documentation

Overview

Package reader defines implementations of an interface for generic message reading from various third party sources.

Constants

This section is empty.

Variables

This section is empty.

Functions

func OptLinesSetDelimiter

func OptLinesSetDelimiter(delimiter string) func(r *Lines)

OptLinesSetDelimiter is an option func that sets the delimiter (default '\n') used to divide lines (message parts) in the stream of data.

func OptLinesSetMaxBuffer

func OptLinesSetMaxBuffer(maxBuffer int) func(r *Lines)

OptLinesSetMaxBuffer is an option func that sets the maximum size of the line parsing buffers.

func OptLinesSetMultipart

func OptLinesSetMultipart(multipart bool) func(r *Lines)

OptLinesSetMultipart is an option func that sets the boolean flag indicating whether lines should be parsed as multipart or not.
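The three option funcs above share the shape `func(r *Lines)`, which is the functional options pattern. The following is a minimal sketch of that pattern only, not the package's implementation: `lines`, its field names, `newLines`, and the `maxBuffer` default are all hypothetical stand-ins, since the real `Lines` fields are unexported.

```go
package main

import "fmt"

// lines is a hypothetical stand-in for reader.Lines. The real struct's
// fields are unexported, so these names are assumptions.
type lines struct {
	delimiter string
	maxBuffer int
	multipart bool
}

// Each option func returns a closure that mutates the target when applied,
// mirroring the documented signatures.
func optSetDelimiter(delimiter string) func(r *lines) {
	return func(r *lines) { r.delimiter = delimiter }
}

func optSetMaxBuffer(maxBuffer int) func(r *lines) {
	return func(r *lines) { r.maxBuffer = maxBuffer }
}

func optSetMultipart(multipart bool) func(r *lines) {
	return func(r *lines) { r.multipart = multipart }
}

// newLines sets defaults first, then applies each option in order, so later
// options override earlier ones. The maxBuffer default is an assumption.
func newLines(options ...func(r *lines)) *lines {
	r := &lines{delimiter: "\n", maxBuffer: 65536}
	for _, opt := range options {
		opt(r)
	}
	return r
}

func main() {
	r := newLines(optSetDelimiter("\r\n"), optSetMultipart(true))
	fmt.Printf("%q %d %v\n", r.delimiter, r.maxBuffer, r.multipart)
}
```

Options that are not passed leave the corresponding default untouched, which is why the constructor can grow new settings without changing its signature.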

Types

type AMQP

type AMQP struct {
	// contains filtered or unexported fields
}

AMQP is an input type that reads messages via the AMQP 0-9-1 protocol.

func (*AMQP) Acknowledge

func (a *AMQP) Acknowledge(err error) error

Acknowledge instructs whether unacknowledged messages have been successfully propagated.

func (*AMQP) CloseAsync

func (a *AMQP) CloseAsync()

CloseAsync shuts down the AMQP input and stops processing requests.

func (*AMQP) Connect

func (a *AMQP) Connect() (err error)

Connect establishes a connection to an AMQP server.

func (*AMQP) Read

func (a *AMQP) Read() (types.Message, error)

Read a new AMQP message.

func (*AMQP) WaitForClose

func (a *AMQP) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the AMQP input has closed down.

type AMQPConfig

type AMQPConfig struct {
	URL           string `json:"url" yaml:"url"`
	Exchange      string `json:"exchange" yaml:"exchange"`
	ExchangeType  string `json:"exchange_type" yaml:"exchange_type"`
	Queue         string `json:"queue" yaml:"queue"`
	BindingKey    string `json:"key" yaml:"key"`
	ConsumerTag   string `json:"consumer_tag" yaml:"consumer_tag"`
	PrefetchCount int    `json:"prefetch_count" yaml:"prefetch_count"`
	PrefetchSize  int    `json:"prefetch_size" yaml:"prefetch_size"`
}

AMQPConfig is configuration for the AMQP input type.

func NewAMQPConfig

func NewAMQPConfig() AMQPConfig

NewAMQPConfig creates a new AMQPConfig with default values.

type AmazonAWSCredentialsConfig

type AmazonAWSCredentialsConfig struct {
	ID     string `json:"id"`
	Secret string `json:"secret"`
	Token  string `json:"token"`
}

AmazonAWSCredentialsConfig contains configuration params for AWS credentials.

type AmazonS3

type AmazonS3 struct {
	// contains filtered or unexported fields
}

AmazonS3 is a benthos reader.Type implementation that reads messages from an Amazon S3 bucket.

func NewAmazonS3

func NewAmazonS3(
	conf AmazonS3Config,
	log log.Modular,
	stats metrics.Type,
) *AmazonS3

NewAmazonS3 creates a new Amazon S3 bucket reader.Type.

func (*AmazonS3) Acknowledge

func (a *AmazonS3) Acknowledge(err error) error

Acknowledge confirms whether our unacknowledged messages have been successfully propagated.

func (*AmazonS3) CloseAsync

func (a *AmazonS3) CloseAsync()

CloseAsync begins cleaning up resources used by this reader asynchronously.

func (*AmazonS3) Connect

func (a *AmazonS3) Connect() error

Connect attempts to establish a connection to the target S3 bucket and any relevant queues used to traverse the objects (SQS, etc).

func (*AmazonS3) Read

func (a *AmazonS3) Read() (types.Message, error)

Read attempts to read a new message from the target S3 bucket.

func (*AmazonS3) WaitForClose

func (a *AmazonS3) WaitForClose(time.Duration) error

WaitForClose will block until either the reader is closed or a specified timeout occurs.

type AmazonS3Config

type AmazonS3Config struct {
	Region        string                     `json:"region" yaml:"region"`
	Bucket        string                     `json:"bucket" yaml:"bucket"`
	Prefix        string                     `json:"prefix" yaml:"prefix"`
	DeleteObjects bool                       `json:"delete_objects" yaml:"delete_objects"`
	SQSURL        string                     `json:"sqs_url" yaml:"sqs_url"`
	SQSBodyPath   string                     `json:"sqs_body_path" yaml:"sqs_body_path"`
	Credentials   AmazonAWSCredentialsConfig `json:"credentials" yaml:"credentials"`
	TimeoutS      int64                      `json:"timeout_s" yaml:"timeout_s"`
}

AmazonS3Config holds configuration values for the Amazon S3 input type.

func NewAmazonS3Config

func NewAmazonS3Config() AmazonS3Config

NewAmazonS3Config creates a new AmazonS3Config with default values.

type AmazonSQS

type AmazonSQS struct {
	// contains filtered or unexported fields
}

AmazonSQS is a Benthos reader.Type implementation that reads messages from an Amazon SQS queue.

func NewAmazonSQS

func NewAmazonSQS(
	conf AmazonSQSConfig,
	log log.Modular,
	stats metrics.Type,
) *AmazonSQS

NewAmazonSQS creates a new Amazon SQS reader.Type.

func (*AmazonSQS) Acknowledge

func (a *AmazonSQS) Acknowledge(err error) error

Acknowledge confirms whether our unacknowledged messages have been successfully propagated.

func (*AmazonSQS) CloseAsync

func (a *AmazonSQS) CloseAsync()

CloseAsync begins cleaning up resources used by this reader asynchronously.

func (*AmazonSQS) Connect

func (a *AmazonSQS) Connect() error

Connect attempts to establish a connection to the target SQS queue.

func (*AmazonSQS) Read

func (a *AmazonSQS) Read() (types.Message, error)

Read attempts to read a new message from the target SQS queue.

func (*AmazonSQS) WaitForClose

func (a *AmazonSQS) WaitForClose(time.Duration) error

WaitForClose will block until either the reader is closed or a specified timeout occurs.

type AmazonSQSConfig

type AmazonSQSConfig struct {
	Region      string                     `json:"region" yaml:"region"`
	URL         string                     `json:"url" yaml:"url"`
	Credentials AmazonAWSCredentialsConfig `json:"credentials" yaml:"credentials"`
	TimeoutS    int64                      `json:"timeout_s" yaml:"timeout_s"`
}

AmazonSQSConfig holds configuration values for the Amazon SQS input type.

func NewAmazonSQSConfig

func NewAmazonSQSConfig() AmazonSQSConfig

NewAmazonSQSConfig creates a new AmazonSQSConfig with default values.

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

Kafka is an input type that reads from a Kafka instance.

func NewKafka

func NewKafka(
	conf KafkaConfig, log log.Modular, stats metrics.Type,
) (*Kafka, error)

NewKafka creates a new Kafka input type.

func (*Kafka) Acknowledge

func (k *Kafka) Acknowledge(err error) error

Acknowledge instructs whether the current offset should be committed.

func (*Kafka) CloseAsync

func (k *Kafka) CloseAsync()

CloseAsync shuts down the Kafka input and stops processing requests.

func (*Kafka) Connect

func (k *Kafka) Connect() error

Connect establishes a Kafka connection.

func (*Kafka) Read

func (k *Kafka) Read() (types.Message, error)

Read attempts to read a message from a Kafka topic.

func (*Kafka) WaitForClose

func (k *Kafka) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the Kafka input has closed down.

type KafkaBalanced

type KafkaBalanced struct {
	// contains filtered or unexported fields
}

KafkaBalanced is an input type that reads from a Kafka instance, with partitions balanced across the members of a consumer group.

func NewKafkaBalanced

func NewKafkaBalanced(
	conf KafkaBalancedConfig, log log.Modular, stats metrics.Type,
) (*KafkaBalanced, error)

NewKafkaBalanced creates a new KafkaBalanced input type.

func (*KafkaBalanced) Acknowledge

func (k *KafkaBalanced) Acknowledge(err error) error

Acknowledge instructs whether the current offset should be committed.

func (*KafkaBalanced) CloseAsync

func (k *KafkaBalanced) CloseAsync()

CloseAsync shuts down the KafkaBalanced input and stops processing requests.

func (*KafkaBalanced) Connect

func (k *KafkaBalanced) Connect() error

Connect establishes a Kafka connection as a member of the consumer group.

func (*KafkaBalanced) Read

func (k *KafkaBalanced) Read() (types.Message, error)

Read attempts to read a message from one of the consumed Kafka topics.

func (*KafkaBalanced) WaitForClose

func (k *KafkaBalanced) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the KafkaBalanced input has closed down.

type KafkaBalancedConfig

type KafkaBalancedConfig struct {
	Addresses       []string `json:"addresses" yaml:"addresses"`
	ClientID        string   `json:"client_id" yaml:"client_id"`
	ConsumerGroup   string   `json:"consumer_group" yaml:"consumer_group"`
	Topics          []string `json:"topics" yaml:"topics"`
	StartFromOldest bool     `json:"start_from_oldest" yaml:"start_from_oldest"`
}

KafkaBalancedConfig is configuration for the KafkaBalanced input type.

func NewKafkaBalancedConfig

func NewKafkaBalancedConfig() KafkaBalancedConfig

NewKafkaBalancedConfig creates a new KafkaBalancedConfig with default values.

type KafkaConfig

type KafkaConfig struct {
	Addresses       []string `json:"addresses" yaml:"addresses"`
	ClientID        string   `json:"client_id" yaml:"client_id"`
	ConsumerGroup   string   `json:"consumer_group" yaml:"consumer_group"`
	Topic           string   `json:"topic" yaml:"topic"`
	Partition       int32    `json:"partition" yaml:"partition"`
	StartFromOldest bool     `json:"start_from_oldest" yaml:"start_from_oldest"`
}

KafkaConfig is configuration for the Kafka input type.

func NewKafkaConfig

func NewKafkaConfig() KafkaConfig

NewKafkaConfig creates a new KafkaConfig with default values.

type Lines

type Lines struct {
	// contains filtered or unexported fields
}

Lines is a reader implementation that continuously reads line delimited messages from an io.Reader type.

func NewLines

func NewLines(
	handleCtor func() (io.Reader, error),
	onClose func(),
	options ...func(r *Lines),
) (*Lines, error)

NewLines creates a new reader input type.

Callers must provide a constructor function for the target io.Reader, which is called on start up and again each time a reader is exhausted. If the constructor is called but there is no more content to create a reader for, it should return the error `io.EOF`, and the Lines reader will close.

Callers must also provide an onClose function, which will be called if the Lines has been instructed to shut down. This function should unblock any blocked Read calls.
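The constructor contract described above can be sketched without the package itself. In the example below, `sourcesCtor` builds a constructor in the shape NewLines expects, and `drain` is a hypothetical consumer that imitates the documented behaviour (call on start up, call again when a reader is exhausted, stop on `io.EOF`). Both names are assumptions made for illustration, and `drain` is not how Lines is implemented.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// sourcesCtor returns a constructor with the signature NewLines expects. It
// hands out one reader per call and reports io.EOF once every source has
// been used.
func sourcesCtor(sources []string) func() (io.Reader, error) {
	i := 0
	return func() (io.Reader, error) {
		if i >= len(sources) {
			return nil, io.EOF
		}
		r := strings.NewReader(sources[i])
		i++
		return r, nil
	}
}

// drain is a hypothetical consumer following the documented contract: it
// asks for a reader, scans it line by line until exhausted, then asks for
// the next one, and finishes when the constructor returns io.EOF.
func drain(handleCtor func() (io.Reader, error)) ([]string, error) {
	var out []string
	for {
		r, err := handleCtor()
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		scanner := bufio.NewScanner(r)
		for scanner.Scan() {
			out = append(out, scanner.Text())
		}
		if err := scanner.Err(); err != nil {
			return out, err
		}
	}
}

func main() {
	lines, err := drain(sourcesCtor([]string{"a\nb\n", "c\n"}))
	fmt.Println(lines, err) // prints: [a b c] <nil>
}
```

A constructor written this way could serve a list of files or a reconnecting socket equally well, since the consumer only sees `io.Reader` values and the final `io.EOF`.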

func (*Lines) Acknowledge

func (r *Lines) Acknowledge(err error) error

Acknowledge confirms whether our unacknowledged messages have been successfully propagated.

func (*Lines) CloseAsync

func (r *Lines) CloseAsync()

CloseAsync shuts down the reader input and stops processing requests.

func (*Lines) Connect

func (r *Lines) Connect() error

Connect attempts to establish a new scanner for an io.Reader.

func (*Lines) Read

func (r *Lines) Read() (types.Message, error)

Read attempts to read a new line from the io.Reader.

func (*Lines) WaitForClose

func (r *Lines) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the reader input has closed down.

type NATS

type NATS struct {
	// contains filtered or unexported fields
}

NATS is an input type that receives NATS messages.

func (*NATS) Acknowledge

func (n *NATS) Acknowledge(err error) error

Acknowledge instructs whether read messages have been successfully propagated.

func (*NATS) CloseAsync

func (n *NATS) CloseAsync()

CloseAsync shuts down the NATS input and stops processing requests.

func (*NATS) Connect

func (n *NATS) Connect() error

Connect establishes a connection to a NATS server.

func (*NATS) Read

func (n *NATS) Read() (types.Message, error)

Read attempts to read a new message from the NATS subject.

func (*NATS) WaitForClose

func (n *NATS) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the NATS input has closed down.

type NATSConfig

type NATSConfig struct {
	URLs    []string `json:"urls" yaml:"urls"`
	Subject string   `json:"subject" yaml:"subject"`
}

NATSConfig is configuration for the NATS input type.

func NewNATSConfig

func NewNATSConfig() NATSConfig

NewNATSConfig creates a new NATSConfig with default values.
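The config types in this package pair a struct carrying `json`/`yaml` tags with a constructor that returns defaults. One plausible way to combine the two is to start from the defaults and decode user input over them, as sketched below. The struct is copied from the listing above so the example compiles on its own; `newNATSConfig`, `parseNATSConfig`, and the default values shown are assumptions, not the package's real defaults.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// NATSConfig is copied from the documented struct so this sketch has no
// external dependencies.
type NATSConfig struct {
	URLs    []string `json:"urls" yaml:"urls"`
	Subject string   `json:"subject" yaml:"subject"`
}

// newNATSConfig stands in for NewNATSConfig. These default values are
// assumptions chosen for illustration.
func newNATSConfig() NATSConfig {
	return NATSConfig{
		URLs:    []string{"nats://localhost:4222"},
		Subject: "example_subject",
	}
}

// parseNATSConfig starts from the defaults and overlays whichever fields
// the JSON document sets. Fields absent from the input keep their defaults.
func parseNATSConfig(raw []byte) (NATSConfig, error) {
	conf := newNATSConfig()
	err := json.Unmarshal(raw, &conf)
	return conf, err
}

func main() {
	conf, err := parseNATSConfig([]byte(`{"subject":"orders"}`))
	fmt.Println(conf.URLs, conf.Subject, err)
}
```

The same approach should apply to the other config structs listed here, since they follow the same tag and constructor conventions.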

type NATSStream

type NATSStream struct {
	// contains filtered or unexported fields
}

NATSStream is an input type that receives NATS Streaming messages.

func (*NATSStream) Acknowledge

func (n *NATSStream) Acknowledge(err error) error

Acknowledge instructs whether unacknowledged messages have been successfully propagated.

func (*NATSStream) CloseAsync

func (n *NATSStream) CloseAsync()

CloseAsync shuts down the NATSStream input and stops processing requests.

func (*NATSStream) Connect

func (n *NATSStream) Connect() error

Connect attempts to establish a connection to a NATS streaming server.

func (*NATSStream) Read

func (n *NATSStream) Read() (types.Message, error)

Read attempts to read a new message from the NATS streaming server.

func (*NATSStream) WaitForClose

func (n *NATSStream) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the NATSStream input has closed down.

type NATSStreamConfig

type NATSStreamConfig struct {
	URLs        []string `json:"urls" yaml:"urls"`
	ClusterID   string   `json:"cluster_id" yaml:"cluster_id"`
	ClientID    string   `json:"client_id" yaml:"client_id"`
	QueueID     string   `json:"queue" yaml:"queue"`
	DurableName string   `json:"durable_name" yaml:"durable_name"`
	Subject     string   `json:"subject" yaml:"subject"`
}

NATSStreamConfig is configuration for the NATSStream input type.

func NewNATSStreamConfig

func NewNATSStreamConfig() NATSStreamConfig

NewNATSStreamConfig creates a new NATSStreamConfig with default values.

type NSQ

type NSQ struct {
	// contains filtered or unexported fields
}

NSQ is an input type that receives NSQ messages.

func (*NSQ) Acknowledge

func (n *NSQ) Acknowledge(err error) error

Acknowledge instructs whether unacknowledged messages have been successfully propagated.

func (*NSQ) CloseAsync

func (n *NSQ) CloseAsync()

CloseAsync shuts down the NSQ input and stops processing requests.

func (*NSQ) Connect

func (n *NSQ) Connect() (err error)

Connect establishes a connection to an NSQ server.

func (*NSQ) HandleMessage

func (n *NSQ) HandleMessage(message *nsq.Message) error

HandleMessage handles an NSQ message.

func (*NSQ) Read

func (n *NSQ) Read() (types.Message, error)

Read attempts to read a new message from NSQ.

func (*NSQ) WaitForClose

func (n *NSQ) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the NSQ input has closed down.

type NSQConfig

type NSQConfig struct {
	Addresses       []string `json:"nsqd_tcp_addresses" yaml:"nsqd_tcp_addresses"`
	LookupAddresses []string `json:"lookupd_http_addresses" yaml:"lookupd_http_addresses"`
	Topic           string   `json:"topic" yaml:"topic"`
	Channel         string   `json:"channel" yaml:"channel"`
	UserAgent       string   `json:"user_agent" yaml:"user_agent"`
	MaxInFlight     int      `json:"max_in_flight" yaml:"max_in_flight"`
}

NSQConfig is configuration for the NSQ input type.

func NewNSQConfig

func NewNSQConfig() NSQConfig

NewNSQConfig creates a new NSQConfig with default values.

type Preserver

type Preserver struct {
	// contains filtered or unexported fields
}

Preserver is a wrapper for reader.Type implementations that keeps a buffer of sent messages until they are acknowledged. If an error occurs during message propagation the contents of the buffer will be resent instead of reading new messages until it is depleted. Preserver implements reader.Type.

func NewPreserver

func NewPreserver(r Type) *Preserver

NewPreserver returns a new Preserver wrapper around a reader.Type.

func (*Preserver) Acknowledge

func (p *Preserver) Acknowledge(err error) error

Acknowledge instructs whether messages read since the last Acknowledge call were successfully propagated. If the error is nil this will be forwarded to the underlying wrapped reader. If the error is non-nil the buffer of messages will be resent.
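The buffering behaviour described for Preserver can be illustrated with a small stand-alone sketch. Everything below is hypothetical: `source`, `sliceSource`, `preserver`, and `errFailed` are names invented for the example, messages are plain strings rather than `types.Message`, and the real Preserver may differ in details such as when it forwards acknowledgements.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// errFailed simulates a downstream propagation failure.
var errFailed = errors.New("propagation failed")

// source is a reduced, hypothetical version of the reader interface.
type source interface {
	Read() (string, error)
	Acknowledge(err error) error
}

// sliceSource serves messages from memory and counts successful acks.
type sliceSource struct {
	msgs []string
	acks int
}

func (s *sliceSource) Read() (string, error) {
	if len(s.msgs) == 0 {
		return "", io.EOF
	}
	msg := s.msgs[0]
	s.msgs = s.msgs[1:]
	return msg, nil
}

func (s *sliceSource) Acknowledge(err error) error {
	if err == nil {
		s.acks++
	}
	return nil
}

// preserver remembers messages handed out since the last Acknowledge and
// replays them after a failure before reading anything new.
type preserver struct {
	src     source
	unacked []string // handed out, not yet acknowledged
	resend  []string // queued for redelivery
}

func (p *preserver) Read() (string, error) {
	var msg string
	if len(p.resend) > 0 {
		msg, p.resend = p.resend[0], p.resend[1:]
	} else {
		var err error
		if msg, err = p.src.Read(); err != nil {
			return "", err
		}
	}
	p.unacked = append(p.unacked, msg)
	return msg, nil
}

func (p *preserver) Acknowledge(err error) error {
	if err != nil {
		// Failure: queue everything unacknowledged for redelivery.
		p.resend = append(p.unacked, p.resend...)
		p.unacked = nil
		return nil
	}
	// Success: drop the buffer and forward the ack to the wrapped source.
	p.unacked = nil
	return p.src.Acknowledge(nil)
}

func main() {
	p := &preserver{src: &sliceSource{msgs: []string{"a", "b", "c"}}}
	first, _ := p.Read()
	second, _ := p.Read()
	_ = p.Acknowledge(errFailed) // simulate a failed propagation
	again, _ := p.Read()
	fmt.Println(first, second, again) // prints: a b a
}
```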

func (*Preserver) CloseAsync

func (p *Preserver) CloseAsync()

CloseAsync triggers the asynchronous closing of the reader.

func (*Preserver) Connect

func (p *Preserver) Connect() error

Connect attempts to establish a connection to the source, if unsuccessful returns an error. If the attempt is successful (or not necessary) returns nil.

func (*Preserver) Read

func (p *Preserver) Read() (types.Message, error)

Read attempts to read a new message from the source.

func (*Preserver) WaitForClose

func (p *Preserver) WaitForClose(tout time.Duration) error

WaitForClose blocks until either the reader is finished closing or a timeout occurs.

type RedisList

type RedisList struct {
	// contains filtered or unexported fields
}

RedisList is an input type that pops messages from a Redis list.

func NewRedisList

func NewRedisList(
	conf RedisListConfig, log log.Modular, stats metrics.Type,
) (*RedisList, error)

NewRedisList creates a new RedisList input type.

func (*RedisList) Acknowledge

func (r *RedisList) Acknowledge(err error) error

Acknowledge instructs whether messages have been successfully propagated.

func (*RedisList) CloseAsync

func (r *RedisList) CloseAsync()

CloseAsync shuts down the RedisList input and stops processing requests.

func (*RedisList) Connect

func (r *RedisList) Connect() error

Connect establishes a connection to a Redis server.

func (*RedisList) Read

func (r *RedisList) Read() (types.Message, error)

Read attempts to pop a message from a redis list.

func (*RedisList) WaitForClose

func (r *RedisList) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the RedisList input has closed down.

type RedisListConfig

type RedisListConfig struct {
	URL       string `json:"url" yaml:"url"`
	Key       string `json:"key" yaml:"key"`
	TimeoutMS int    `json:"timeout_ms" yaml:"timeout_ms"`
}

RedisListConfig is configuration for the RedisList input type.

func NewRedisListConfig

func NewRedisListConfig() RedisListConfig

NewRedisListConfig creates a new RedisListConfig with default values.

type RedisPubSub

type RedisPubSub struct {
	// contains filtered or unexported fields
}

RedisPubSub is an input type that reads Redis Pub/Sub messages.

func NewRedisPubSub

func NewRedisPubSub(
	conf RedisPubSubConfig, log log.Modular, stats metrics.Type,
) (*RedisPubSub, error)

NewRedisPubSub creates a new RedisPubSub input type.

func (*RedisPubSub) Acknowledge

func (r *RedisPubSub) Acknowledge(err error) error

Acknowledge instructs whether messages have been successfully propagated.

func (*RedisPubSub) CloseAsync

func (r *RedisPubSub) CloseAsync()

CloseAsync shuts down the RedisPubSub input and stops processing requests.

func (*RedisPubSub) Connect

func (r *RedisPubSub) Connect() error

Connect establishes a connection to a Redis server.

func (*RedisPubSub) Read

func (r *RedisPubSub) Read() (types.Message, error)

Read attempts to read a message from the subscribed Redis Pub/Sub channels.

func (*RedisPubSub) WaitForClose

func (r *RedisPubSub) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the RedisPubSub input has closed down.

type RedisPubSubConfig

type RedisPubSubConfig struct {
	URL      string   `json:"url" yaml:"url"`
	Channels []string `json:"channels" yaml:"channels"`
}

RedisPubSubConfig is configuration for the RedisPubSub input type.

func NewRedisPubSubConfig

func NewRedisPubSubConfig() RedisPubSubConfig

NewRedisPubSubConfig creates a new RedisPubSubConfig with default values.

type ScaleProto

type ScaleProto struct {
	// contains filtered or unexported fields
}

ScaleProto is an input type that serves Scalability Protocols messages.

func (*ScaleProto) Acknowledge

func (s *ScaleProto) Acknowledge(err error) error

Acknowledge instructs whether the pending messages were propagated successfully.

func (*ScaleProto) CloseAsync

func (s *ScaleProto) CloseAsync()

CloseAsync shuts down the ScaleProto input and stops processing requests.

func (*ScaleProto) Connect

func (s *ScaleProto) Connect() error

Connect establishes a nanomsg socket.

func (*ScaleProto) Read

func (s *ScaleProto) Read() (types.Message, error)

Read attempts to read a new message from the nanomsg socket.

func (*ScaleProto) WaitForClose

func (s *ScaleProto) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the ScaleProto input has closed down.

type ScaleProtoConfig

type ScaleProtoConfig struct {
	URLs          []string `json:"urls" yaml:"urls"`
	Bind          bool     `json:"bind" yaml:"bind"`
	SocketType    string   `json:"socket_type" yaml:"socket_type"`
	SubFilters    []string `json:"sub_filters" yaml:"sub_filters"`
	PollTimeoutMS int      `json:"poll_timeout_ms" yaml:"poll_timeout_ms"`
	RepTimeoutMS  int      `json:"reply_timeout_ms" yaml:"reply_timeout_ms"`
}

ScaleProtoConfig is configuration for the ScaleProto input type.

func NewScaleProtoConfig

func NewScaleProtoConfig() ScaleProtoConfig

NewScaleProtoConfig creates a new ScaleProtoConfig with default values.

type Type

type Type interface {
	// Connect attempts to establish a connection to the source, if unsuccessful
	// returns an error. If the attempt is successful (or not necessary) returns
	// nil.
	Connect() error

	// Acknowledge, if applicable to the source, should send acknowledgments for
	// (or commit) all unacknowledged (or uncommitted) messages that have thus
	// far been consumed. If the error is non-nil this means the message was
	// unsuccessfully propagated down the pipeline, in which case it is up to
	// the implementation to decide whether to simply retry uncommitted messages
	// that are buffered locally, or to send the error upstream.
	Acknowledge(err error) error

	// Read attempts to read a new message from the source.
	Read() (types.Message, error)

	types.Closable
}

Type is a type that reads Benthos messages from a third party source.
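A typical lifecycle for an implementation of this interface is connect, read, acknowledge, then close. The sketch below walks through that sequence under several assumptions: `messageReader` mirrors the method set with `string` standing in for `types.Message`, the two close methods are written out because `types.Closable` is not shown on this page, `memReader` and `consume` are hypothetical, and using `io.EOF` as the end-of-input signal is a choice made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"time"
)

// messageReader mirrors the method set of Type, with string in place of
// types.Message so the sketch has no external dependencies.
type messageReader interface {
	Connect() error
	Acknowledge(err error) error
	Read() (string, error)
	CloseAsync()
	WaitForClose(timeout time.Duration) error
}

// memReader is a hypothetical in-memory implementation for illustration.
type memReader struct {
	msgs   []string
	acked  int
	closed chan struct{}
}

func newMemReader(msgs ...string) *memReader {
	return &memReader{msgs: msgs, closed: make(chan struct{})}
}

func (m *memReader) Connect() error { return nil }

func (m *memReader) Acknowledge(err error) error {
	if err == nil {
		m.acked++
	}
	return nil
}

// Read returns io.EOF when drained. That signal is an assumption of this
// sketch.
func (m *memReader) Read() (string, error) {
	if len(m.msgs) == 0 {
		return "", io.EOF
	}
	msg := m.msgs[0]
	m.msgs = m.msgs[1:]
	return msg, nil
}

// CloseAsync must only be called once here, since it closes a channel.
func (m *memReader) CloseAsync() { close(m.closed) }

func (m *memReader) WaitForClose(timeout time.Duration) error {
	select {
	case <-m.closed:
		return nil
	case <-time.After(timeout):
		return errors.New("timed out waiting for close")
	}
}

// consume connects, reads until the source is drained, passes each handler
// result to Acknowledge, then shuts the reader down.
func consume(r messageReader, handle func(string) error) error {
	if err := r.Connect(); err != nil {
		return err
	}
	for {
		msg, err := r.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		if err := r.Acknowledge(handle(msg)); err != nil {
			return err
		}
	}
	r.CloseAsync()
	return r.WaitForClose(time.Second)
}

func main() {
	r := newMemReader("a", "b")
	err := consume(r, func(msg string) error {
		fmt.Println("got", msg)
		return nil
	})
	fmt.Println(r.acked, err)
}
```

Passing the handler's error straight into Acknowledge keeps the decision about retrying or surfacing the failure inside the implementation, which matches the wording of the interface comment.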

func NewAMQP

func NewAMQP(conf AMQPConfig, log log.Modular, stats metrics.Type) (Type, error)

NewAMQP creates a new AMQP input type.

func NewNATS

func NewNATS(conf NATSConfig, log log.Modular, stats metrics.Type) (Type, error)

NewNATS creates a new NATS input type.

func NewNATSStream

func NewNATSStream(conf NATSStreamConfig, log log.Modular, stats metrics.Type) (Type, error)

NewNATSStream creates a new NATSStream input type.

func NewNSQ

func NewNSQ(conf NSQConfig, log log.Modular, stats metrics.Type) (Type, error)

NewNSQ creates a new NSQ input type.

func NewScaleProto

func NewScaleProto(conf ScaleProtoConfig, log log.Modular, stats metrics.Type) (Type, error)

NewScaleProto creates a new ScaleProto input type.
