input

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 14, 2016 License: MIT Imports: 25 Imported by: 0

Documentation

Overview

Package input - Defines consumers for aggregating data from a variety of sources. All consumer types must implement the input.Type interface.

If the source of a consumer is given assurances of message receipt, then the consumer must ensure that all messages read from that source are propagated to a reader before it shuts down gracefully. For example, a consumer based on the ZMQ subscriber model would not need to provide such assurance; an HTTP POST based consumer, however, would be expected to propagate any request for which a 200 OK response has been returned.

Index

Constants

This section is empty.

Variables

View Source
// ErrFanInNoInputs - Sentinel error returned when a FanIn type is created
// with zero inputs.
var ErrFanInNoInputs = errors.New("attempting to create fan_in input type with no inputs")

Functions

func Descriptions

func Descriptions() string

Descriptions - Returns a formatted string of collated descriptions of each type.

Types

type Config

// Config - The all-encompassing configuration struct for all input types. It
// holds a type name plus one section per input type. Some sections are empty
// structs: the type has no optional values but is still listed as an option.
type Config struct {
	// Type - Name of the input type (presumably the one Construct builds;
	// confirm against Construct).
	Type       string           `json:"type" yaml:"type"`
	// HTTPServer - Settings for the HTTPServer input type.
	HTTPServer HTTPServerConfig `json:"http_server" yaml:"http_server"`
	// ScaleProto - Settings for the ScaleProto input type. Note the serialized
	// key is "scalability_protocols", not the field name.
	ScaleProto ScaleProtoConfig `json:"scalability_protocols" yaml:"scalability_protocols"`
	// ZMQ4 - Optional settings for ZMQ4. A pointer with omitempty, so a nil
	// value is left out of serialized output.
	ZMQ4       *ZMQ4Config      `json:"zmq4,omitempty" yaml:"zmq4,omitempty"`
	// Kafka - Settings for the Kafka input type.
	Kafka      KafkaConfig      `json:"kafka" yaml:"kafka"`
	// File - Settings for the File input type.
	File       FileConfig       `json:"file" yaml:"file"`
	// STDIN - Empty section; the STDIN input type has no options.
	STDIN      struct{}         `json:"stdin" yaml:"stdin"`
	// FanIn - Settings for the FanIn input type.
	FanIn      FanInConfig      `json:"fan_in" yaml:"fan_in"`
}

Config - The all-encompassing configuration struct for all input types. Note that some configs are empty structs, as the type has no optional values but we want to list it as an option.

func NewConfig

func NewConfig() Config

NewConfig - Returns a configuration struct fully populated with default values.

type FanInConfig

// FanInConfig - Configuration for the FanIn input type.
type FanInConfig struct {
	// Inputs - Untyped list of inputs to fan in. Presumably each element is a
	// child input configuration; the expected element shape is not visible
	// here, so verify against NewFanIn.
	Inputs []interface{} `json:"inputs" yaml:"inputs"`
}

FanInConfig - Configuration for the FanIn input type.

func NewFanInConfig

func NewFanInConfig() FanInConfig

NewFanInConfig - Creates a new FanInConfig with default values.

type File

type File struct {
	// contains filtered or unexported fields
}

File - An input type that reads lines from a file, creating a message per line.

func (*File) CloseAsync

func (f *File) CloseAsync()

CloseAsync - Shuts down the File input and stops processing requests.

func (*File) MessageChan

func (f *File) MessageChan() <-chan types.Message

MessageChan - Returns the messages channel.

func (*File) StartListening

func (f *File) StartListening(responses <-chan types.Response) error

StartListening - Sets the channel used by the input to validate message receipt.

func (*File) WaitForClose

func (f *File) WaitForClose(timeout time.Duration) error

WaitForClose - Blocks until the File input has closed down.

type FileConfig

// FileConfig - Configuration values for the File input type.
type FileConfig struct {
	// Path - Presumably the path of the file the File input reads lines from;
	// confirm against NewFile.
	Path string `json:"path" yaml:"path"`
}

FileConfig - Configuration values for the File input type.

func NewFileConfig

func NewFileConfig() FileConfig

NewFileConfig - Creates a new FileConfig with default values.

type HTTPServer

type HTTPServer struct {
	// contains filtered or unexported fields
}

HTTPServer - An input type that serves HTTP POST requests.

func (*HTTPServer) CloseAsync

func (h *HTTPServer) CloseAsync()

CloseAsync - Shuts down the HTTPServer input and stops processing requests.

func (*HTTPServer) MessageChan

func (h *HTTPServer) MessageChan() <-chan types.Message

MessageChan - Returns the messages channel.

func (*HTTPServer) StartListening

func (h *HTTPServer) StartListening(responses <-chan types.Response) error

StartListening - Sets the channel used by the input to validate message receipt.

func (*HTTPServer) WaitForClose

func (h *HTTPServer) WaitForClose(timeout time.Duration) error

WaitForClose - Blocks until the HTTPServer input has closed down.

type HTTPServerConfig

// HTTPServerConfig - Configuration for the HTTPServer input type.
type HTTPServerConfig struct {
	// Address - Presumably the address the server listens on; confirm
	// against NewHTTPServer.
	Address   string `json:"address" yaml:"address"`
	// Path - Presumably the URL path on which requests are accepted; confirm
	// against NewHTTPServer.
	Path      string `json:"path" yaml:"path"`
	// TimeoutMS - A timeout, in milliseconds going by the name. Which
	// operation it bounds is not visible here.
	TimeoutMS int64  `json:"timeout_ms" yaml:"timeout_ms"`
}

HTTPServerConfig - Configuration for the HTTPServer input type.

func NewHTTPServerConfig

func NewHTTPServerConfig() HTTPServerConfig

NewHTTPServerConfig - Creates a new HTTPServerConfig with default values.

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

Kafka - An input type that reads from a Kafka instance.

func (*Kafka) CloseAsync

func (k *Kafka) CloseAsync()

CloseAsync - Shuts down the Kafka input and stops processing requests.

func (*Kafka) MessageChan

func (k *Kafka) MessageChan() <-chan types.Message

MessageChan - Returns the messages channel.

func (*Kafka) StartListening

func (k *Kafka) StartListening(responses <-chan types.Response) error

StartListening - Sets the channel used by the input to validate message receipt.

func (*Kafka) WaitForClose

func (k *Kafka) WaitForClose(timeout time.Duration) error

WaitForClose - Blocks until the Kafka input has closed down.

type KafkaConfig

// KafkaConfig - Configuration for the Kafka input type. Every field carries
// matching json and yaml keys so both formats decode the same document shape.
type KafkaConfig struct {
	// Addresses - List of Kafka addresses (presumably brokers to connect to;
	// confirm against NewKafka).
	Addresses       []string `json:"addresses" yaml:"addresses"`
	// ClientID - Client identifier string.
	ClientID        string   `json:"client_id" yaml:"client_id"`
	// ConsumerGroup - Consumer group name.
	ConsumerGroup   string   `json:"consumer_group" yaml:"consumer_group"`
	// Topic - Topic name.
	Topic           string   `json:"topic" yaml:"topic"`
	// Partition - Partition number. The yaml key must be spelled "yaml" for
	// YAML decoders to see the "partition" name.
	Partition       int32    `json:"partition" yaml:"partition"`
	// StartFromOldest - Presumably selects consuming from the oldest
	// available offset; confirm against NewKafka.
	StartFromOldest bool     `json:"start_from_oldest" yaml:"start_from_oldest"`
}

KafkaConfig - Configuration for the Kafka input type.

func NewKafkaConfig

func NewKafkaConfig() KafkaConfig

NewKafkaConfig - Creates a new KafkaConfig with default values.

type MockType

// MockType - Implements the input.Type interface. Both channels are exported
// fields, so code holding a MockType can access them directly.
type MockType struct {
	// MsgChan - Bidirectional message channel (presumably the one returned by
	// MessageChan; confirm in the method body).
	MsgChan chan types.Message
	// ResChan - Receive-only response channel (presumably the one set by
	// StartListening; confirm in the method body).
	ResChan <-chan types.Response
}

MockType - Implements the input.Type interface.

func (MockType) CloseAsync

func (m MockType) CloseAsync()

CloseAsync - Does nothing.

func (*MockType) MessageChan

func (m *MockType) MessageChan() <-chan types.Message

MessageChan - Returns the messages channel.

func (*MockType) StartListening

func (m *MockType) StartListening(resChan <-chan types.Response) error

StartListening - Sets the channel used for reading responses.

func (MockType) WaitForClose

func (m MockType) WaitForClose(time.Duration) error

WaitForClose - Does nothing.

type STDIN

type STDIN struct {
	// contains filtered or unexported fields
}

STDIN - An input type that reads lines from STDIN.

func (*STDIN) CloseAsync

func (s *STDIN) CloseAsync()

CloseAsync - Shuts down the STDIN input and stops processing requests.

func (*STDIN) MessageChan

func (s *STDIN) MessageChan() <-chan types.Message

MessageChan - Returns the messages channel.

func (*STDIN) StartListening

func (s *STDIN) StartListening(responses <-chan types.Response) error

StartListening - Sets the channel used by the input to validate message receipt.

func (*STDIN) WaitForClose

func (s *STDIN) WaitForClose(timeout time.Duration) error

WaitForClose - Blocks until the STDIN input has closed down.

type ScaleProto

type ScaleProto struct {
	// contains filtered or unexported fields
}

ScaleProto - An input type that serves Scalability Protocols messages.

func (*ScaleProto) CloseAsync

func (s *ScaleProto) CloseAsync()

CloseAsync - Shuts down the ScaleProto input and stops processing requests.

func (*ScaleProto) MessageChan

func (s *ScaleProto) MessageChan() <-chan types.Message

MessageChan - Returns the messages channel.

func (*ScaleProto) StartListening

func (s *ScaleProto) StartListening(responses <-chan types.Response) error

StartListening - Sets the channel used by the input to validate message receipt.

func (*ScaleProto) WaitForClose

func (s *ScaleProto) WaitForClose(timeout time.Duration) error

WaitForClose - Blocks until the ScaleProto input has closed down.

type ScaleProtoConfig

// ScaleProtoConfig - Configuration for the ScaleProto input type.
type ScaleProtoConfig struct {
	// Address - Socket address string.
	Address       string   `json:"address" yaml:"address"`
	// Bind - Serialized as "bind_address". Presumably chooses between binding
	// to Address and connecting to it; confirm against NewScaleProto.
	Bind          bool     `json:"bind_address" yaml:"bind_address"`
	// SocketType - Socket type name. The accepted values are not visible here.
	SocketType    string   `json:"socket_type" yaml:"socket_type"`
	// SubFilters - List of filter strings (presumably subscription filters
	// for subscriber sockets; confirm against NewScaleProto).
	SubFilters    []string `json:"sub_filters" yaml:"sub_filters"`
	// PollTimeoutMS - Poll timeout, in milliseconds going by the name.
	PollTimeoutMS int      `json:"poll_timeout_ms" yaml:"poll_timeout_ms"`
}

ScaleProtoConfig - Configuration for the ScaleProto input type.

func NewScaleProtoConfig

func NewScaleProtoConfig() ScaleProtoConfig

NewScaleProtoConfig - Creates a new ScaleProtoConfig with default values.

type Type

// Type - The standard interface of an input type. It has no methods of its
// own and is the union of the two embedded interfaces from package types.
type Type interface {
	// Presumably supplies CloseAsync and WaitForClose; confirm in package types.
	types.Closable
	// Presumably supplies MessageChan and StartListening; confirm in package types.
	types.Producer
}

Type - The standard interface of an input type.

func Construct

func Construct(conf Config, log log.Modular, stats metrics.Aggregator) (Type, error)

Construct - Create an input type based on an input configuration.

func NewFanIn

func NewFanIn(conf Config, log log.Modular, stats metrics.Aggregator) (Type, error)

NewFanIn - Create a new FanIn input type.

func NewFile

func NewFile(conf Config, log log.Modular, stats metrics.Aggregator) (Type, error)

NewFile - Create a new File input type.

func NewHTTPServer

func NewHTTPServer(conf Config, log log.Modular, stats metrics.Aggregator) (Type, error)

NewHTTPServer - Create a new HTTPServer input type.

func NewKafka

func NewKafka(conf Config, log log.Modular, stats metrics.Aggregator) (Type, error)

NewKafka - Create a new Kafka input type.

func NewSTDIN

func NewSTDIN(conf Config, log log.Modular, stats metrics.Aggregator) (Type, error)

NewSTDIN - Create a new STDIN input type.

func NewScaleProto

func NewScaleProto(conf Config, log log.Modular, stats metrics.Aggregator) (Type, error)

NewScaleProto - Create a new ScaleProto input type.

type ZMQ4Config

// ZMQ4Config - Empty stub for when ZMQ4 is not compiled; it carries no fields.
type ZMQ4Config struct{}

ZMQ4Config - Empty stub for when ZMQ4 is not compiled.

func NewZMQ4Config

func NewZMQ4Config() *ZMQ4Config

NewZMQ4Config - Returns nil.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL