input

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 18, 2016 License: MIT Imports: 27 Imported by: 0

Documentation

Overview

Package input - Defines consumers for aggregating data from a variety of sources. All consumer types must implement interface input.Type.

If the source of a consumer is given assurances of message receipt then the consumer must ensure that all messages read from a source are propagated to a reader before shutting down gracefully. For example, a ZMQ subscriber model based consumer would not need to provide such ensurance, a HTTP POST based consumer, however, would be expected to propagate any requests where a 200 OK message has been returned.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrFanInNoInputs - Returned when creating a FanIn type with zero inputs.
	ErrFanInNoInputs = errors.New("attempting to create fan_in input type with no inputs")
)
View Source
var (
	ErrHWMInvalid = errors.New("high water mark is invalid (must be integer greater than 1)")
)

Errors for the HTTPServer type.

Functions

func Descriptions

func Descriptions() string

Descriptions - Returns a formatted string of collated descriptions of each type.

Types

type AMQP added in v0.0.2

type AMQP struct {
	// contains filtered or unexported fields
}

AMQP - An input type that serves Scalability Protocols messages.

func (*AMQP) CloseAsync added in v0.0.2

func (a *AMQP) CloseAsync()

CloseAsync - Shuts down the AMQP input and stops processing requests.

func (*AMQP) MessageChan added in v0.0.2

func (a *AMQP) MessageChan() <-chan types.Message

MessageChan - Returns the messages channel.

func (*AMQP) StartListening added in v0.0.2

func (a *AMQP) StartListening(responses <-chan types.Response) error

StartListening - Sets the channel used by the input to validate message receipt.

func (*AMQP) WaitForClose added in v0.0.2

func (a *AMQP) WaitForClose(timeout time.Duration) error

WaitForClose - Blocks until the AMQP input has closed down.

type AMQPConfig added in v0.0.2

type AMQPConfig struct {
	URI          string `json:"uri" yaml:"uri"`
	Exchange     string `json:"exchange" yaml:"exchange"`
	ExchangeType string `json:"exchange_type" yaml:"exchange_type"`
	Queue        string `json:"queue" yaml:"queue"`
	BindingKey   string `json:"key" yaml:"key"`
	ConsumerTag  string `json:"consumer_tag" yaml:"consumer_tag"`
}

AMQPConfig - Configuration for the AMQP input type.

func NewAMQPConfig added in v0.0.2

func NewAMQPConfig() AMQPConfig

NewAMQPConfig - Creates a new AMQPConfig with default values.

type Config

type Config struct {
	Type       string           `json:"type" yaml:"type"`
	HTTPServer HTTPServerConfig `json:"http_server" yaml:"http_server"`
	ScaleProto ScaleProtoConfig `json:"scalability_protocols" yaml:"scalability_protocols"`
	ZMQ4       *ZMQ4Config      `json:"zmq4,omitempty" yaml:"zmq4,omitempty"`
	Kafka      KafkaConfig      `json:"kafka" yaml:"kafka"`
	AMQP       AMQPConfig       `json:"amqp" yaml:"amqp"`
	File       FileConfig       `json:"file" yaml:"file"`
	STDIN      STDINConfig      `json:"stdin" yaml:"stdin"`
	FanIn      FanInConfig      `json:"fan_in" yaml:"fan_in"`
}

Config - The all encompassing configuration struct for all input types. Note that some configs are empty structs, as the type has no optional values but we want to list it as an option.

func NewConfig

func NewConfig() Config

NewConfig - Returns a configuration struct fully populated with default values.

type FanInConfig

type FanInConfig struct {
	Inputs []interface{} `json:"inputs" yaml:"inputs"`
}

FanInConfig - Configuration for the FanIn input type.

func NewFanInConfig

func NewFanInConfig() FanInConfig

NewFanInConfig - Creates a new FanInConfig with default values.

type File

type File struct {
	// contains filtered or unexported fields
}

File - An input type that reads lines from a file, creating a message per line.

func (*File) CloseAsync

func (f *File) CloseAsync()

CloseAsync - Shuts down the File input and stops processing requests.

func (*File) MessageChan

func (f *File) MessageChan() <-chan types.Message

MessageChan - Returns the messages channel.

func (*File) StartListening

func (f *File) StartListening(responses <-chan types.Response) error

StartListening - Sets the channel used by the input to validate message receipt.

func (*File) WaitForClose

func (f *File) WaitForClose(timeout time.Duration) error

WaitForClose - Blocks until the File input has closed down.

type FileConfig

type FileConfig struct {
	Path      string `json:"path" yaml:"path"`
	Multipart bool   `json:"multipart" yaml:"multipart"`
}

FileConfig - Configuration values for the File input type.

func NewFileConfig

func NewFileConfig() FileConfig

NewFileConfig - Creates a new FileConfig with default values.

type HTTPServer

type HTTPServer struct {
	// contains filtered or unexported fields
}

HTTPServer - An input type that serves HTTPServer POST requests.

func (*HTTPServer) CloseAsync

func (h *HTTPServer) CloseAsync()

CloseAsync - Shuts down the HTTPServer input and stops processing requests.

func (*HTTPServer) MessageChan

func (h *HTTPServer) MessageChan() <-chan types.Message

MessageChan - Returns the messages channel.

func (*HTTPServer) StartListening

func (h *HTTPServer) StartListening(responses <-chan types.Response) error

StartListening - Sets the channel used by the input to validate message receipt.

func (*HTTPServer) WaitForClose

func (h *HTTPServer) WaitForClose(timeout time.Duration) error

WaitForClose - Blocks until the HTTPServer input has closed down.

type HTTPServerConfig

type HTTPServerConfig struct {
	Address       string `json:"address" yaml:"address"`
	Path          string `json:"path" yaml:"path"`
	TimeoutMS     int64  `json:"timeout_ms" yaml:"timeout_ms"`
	HighWaterMark int    `json:"high_water_mark" yaml:"high_water_mark"`
}

HTTPServerConfig - Configuration for the HTTPServer input type.

func NewHTTPServerConfig

func NewHTTPServerConfig() HTTPServerConfig

NewHTTPServerConfig - Creates a new HTTPServerConfig with default values.

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

Kafka - An input type that reads from a Kafka instance.

func (*Kafka) CloseAsync

func (k *Kafka) CloseAsync()

CloseAsync - Shuts down the Kafka input and stops processing requests.

func (*Kafka) MessageChan

func (k *Kafka) MessageChan() <-chan types.Message

MessageChan - Returns the messages channel.

func (*Kafka) StartListening

func (k *Kafka) StartListening(responses <-chan types.Response) error

StartListening - Sets the channel used by the input to validate message receipt.

func (*Kafka) WaitForClose

func (k *Kafka) WaitForClose(timeout time.Duration) error

WaitForClose - Blocks until the Kafka input has closed down.

type KafkaConfig

type KafkaConfig struct {
	Addresses       []string `json:"addresses" yaml:"addresses"`
	ClientID        string   `json:"client_id" yaml:"client_id"`
	ConsumerGroup   string   `json:"consumer_group" yaml:"consumer_group"`
	Topic           string   `json:"topic" yaml:"topic"`
	Partition       int32    `json:"partition" yam:"partition"`
	StartFromOldest bool     `json:"start_from_oldest" yaml:"start_from_oldest"`
}

KafkaConfig - Configuration for the Kafka input type.

func NewKafkaConfig

func NewKafkaConfig() KafkaConfig

NewKafkaConfig - Creates a new KafkaConfig with default values.

type MockType

type MockType struct {
	MsgChan chan types.Message
	ResChan <-chan types.Response
}

MockType - Implements the input.Type interface.

func (MockType) CloseAsync

func (m MockType) CloseAsync()

CloseAsync - Does nothing.

func (*MockType) MessageChan

func (m *MockType) MessageChan() <-chan types.Message

MessageChan - Returns the messages channel.

func (*MockType) StartListening

func (m *MockType) StartListening(resChan <-chan types.Response) error

StartListening - Sets the channel used for reading responses.

func (MockType) WaitForClose

func (m MockType) WaitForClose(time.Duration) error

WaitForClose - Does nothing.

type STDIN

type STDIN struct {
	// contains filtered or unexported fields
}

STDIN - An input type that reads lines from STDIN.

func (*STDIN) CloseAsync

func (s *STDIN) CloseAsync()

CloseAsync - Shuts down the STDIN input and stops processing requests.

func (*STDIN) MessageChan

func (s *STDIN) MessageChan() <-chan types.Message

MessageChan - Returns the messages channel.

func (*STDIN) StartListening

func (s *STDIN) StartListening(responses <-chan types.Response) error

StartListening - Sets the channel used by the input to validate message receipt.

func (*STDIN) WaitForClose

func (s *STDIN) WaitForClose(timeout time.Duration) error

WaitForClose - Blocks until the STDIN input has closed down.

type STDINConfig added in v0.0.2

type STDINConfig struct {
	Multipart bool `json:"multipart" yaml:"multipart"`
}

STDINConfig - contains config fields for the STDIN input type.

func NewSTDINConfig added in v0.0.2

func NewSTDINConfig() STDINConfig

NewSTDINConfig - creates a STDINConfig populated with default values.

type ScaleProto

type ScaleProto struct {
	// contains filtered or unexported fields
}

ScaleProto - An input type that serves Scalability Protocols messages.

func (*ScaleProto) CloseAsync

func (s *ScaleProto) CloseAsync()

CloseAsync - Shuts down the ScaleProto input and stops processing requests.

func (*ScaleProto) MessageChan

func (s *ScaleProto) MessageChan() <-chan types.Message

MessageChan - Returns the messages channel.

func (*ScaleProto) StartListening

func (s *ScaleProto) StartListening(responses <-chan types.Response) error

StartListening - Sets the channel used by the input to validate message receipt.

func (*ScaleProto) WaitForClose

func (s *ScaleProto) WaitForClose(timeout time.Duration) error

WaitForClose - Blocks until the ScaleProto input has closed down.

type ScaleProtoConfig

type ScaleProtoConfig struct {
	Address       string   `json:"address" yaml:"address"`
	Bind          bool     `json:"bind_address" yaml:"bind_address"`
	SocketType    string   `json:"socket_type" yaml:"socket_type"`
	SubFilters    []string `json:"sub_filters" yaml:"sub_filters"`
	PollTimeoutMS int      `json:"poll_timeout_ms" yaml:"poll_timeout_ms"`
}

ScaleProtoConfig - Configuration for the ScaleProto input type.

func NewScaleProtoConfig

func NewScaleProtoConfig() ScaleProtoConfig

NewScaleProtoConfig - Creates a new ScaleProtoConfig with default values.

type Type

type Type interface {
	types.Closable
	types.Producer
}

Type - The standard interface of an input type.

func New added in v0.0.2

func New(conf Config, log log.Modular, stats metrics.Type) (Type, error)

New - Create an input type based on an input configuration.

func NewAMQP added in v0.0.2

func NewAMQP(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewAMQP - Create a new AMQP input type.

func NewFanIn

func NewFanIn(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewFanIn - Create a new FanIn input type.

func NewFile

func NewFile(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewFile - Create a new File input type.

func NewHTTPServer

func NewHTTPServer(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewHTTPServer - Create a new HTTPServer input type.

func NewKafka

func NewKafka(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewKafka - Create a new Kafka input type.

func NewSTDIN

func NewSTDIN(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewSTDIN - Create a new STDIN input type.

func NewScaleProto

func NewScaleProto(conf Config, log log.Modular, stats metrics.Type) (Type, error)

NewScaleProto - Create a new ScaleProto input type.

type ZMQ4Config

type ZMQ4Config struct{}

ZMQ4Config - Empty stub for when ZMQ4 is not compiled.

func NewZMQ4Config

func NewZMQ4Config() *ZMQ4Config

NewZMQ4Config - Returns nil.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL