Documentation ¶
Overview ¶
Package input - Defines consumers for aggregating data from a variety of sources. All consumer types must implement interface input.Type.
If the source of a consumer is given assurances of message receipt then the consumer must ensure that all messages read from a source are propagated to a reader before shutting down gracefully. For example, a ZMQ subscriber model based consumer would not need to provide such ensurance, a HTTP POST based consumer, however, would be expected to propagate any requests where a 200 OK message has been returned.
Index ¶
- Variables
- func Descriptions() string
- type Config
- type FanInConfig
- type File
- type FileConfig
- type HTTPServer
- type HTTPServerConfig
- type Kafka
- type KafkaConfig
- type MockType
- type STDIN
- type ScaleProto
- type ScaleProtoConfig
- type Type
- func Construct(conf Config, log log.Modular, stats metrics.Aggregator) (Type, error)
- func NewFanIn(conf Config, log log.Modular, stats metrics.Aggregator) (Type, error)
- func NewFile(conf Config, log log.Modular, stats metrics.Aggregator) (Type, error)
- func NewHTTPServer(conf Config, log log.Modular, stats metrics.Aggregator) (Type, error)
- func NewKafka(conf Config, log log.Modular, stats metrics.Aggregator) (Type, error)
- func NewSTDIN(conf Config, log log.Modular, stats metrics.Aggregator) (Type, error)
- func NewScaleProto(conf Config, log log.Modular, stats metrics.Aggregator) (Type, error)
- type ZMQ4Config
Constants ¶
This section is empty.
Variables ¶
var ( // ErrFanInNoInputs - Returned when creating a FanIn type with zero inputs. ErrFanInNoInputs = errors.New("attempting to create fan_in input type with no inputs") )
Functions ¶
func Descriptions ¶
func Descriptions() string
Descriptions - Returns a formatted string of collated descriptions of each type.
Types ¶
type Config ¶
type Config struct { Type string `json:"type" yaml:"type"` HTTPServer HTTPServerConfig `json:"http_server" yaml:"http_server"` ScaleProto ScaleProtoConfig `json:"scalability_protocols" yaml:"scalability_protocols"` ZMQ4 *ZMQ4Config `json:"zmq4,omitempty" yaml:"zmq4,omitempty"` Kafka KafkaConfig `json:"kafka" yaml:"kafka"` File FileConfig `json:"file" yaml:"file"` STDIN struct{} `json:"stdin" yaml:"stdin"` FanIn FanInConfig `json:"fan_in" yaml:"fan_in"` }
Config - The all encompassing configuration struct for all input types. Note that some configs are empty structs, as the type has no optional values but we want to list it as an option.
type FanInConfig ¶
type FanInConfig struct {
Inputs []interface{} `json:"inputs" yaml:"inputs"`
}
FanInConfig - Configuration for the FanIn input type.
func NewFanInConfig ¶
func NewFanInConfig() FanInConfig
NewFanInConfig - Creates a new FanInConfig with default values.
type File ¶
type File struct {
// contains filtered or unexported fields
}
File - An input type that reads lines from a file, creating a message per line.
func (*File) CloseAsync ¶
func (f *File) CloseAsync()
CloseAsync - Shuts down the File input and stops processing requests.
func (*File) MessageChan ¶
MessageChan - Returns the messages channel.
func (*File) StartListening ¶
StartListening - Sets the channel used by the input to validate message receipt.
type FileConfig ¶
type FileConfig struct {
Path string `json:"path" yaml:"path"`
}
FileConfig - Configuration values for the File input type.
func NewFileConfig ¶
func NewFileConfig() FileConfig
NewFileConfig - Creates a new FileConfig with default values.
type HTTPServer ¶
type HTTPServer struct {
// contains filtered or unexported fields
}
HTTPServer - An input type that serves HTTPServer POST requests.
func (*HTTPServer) CloseAsync ¶
func (h *HTTPServer) CloseAsync()
CloseAsync - Shuts down the HTTPServer input and stops processing requests.
func (*HTTPServer) MessageChan ¶
func (h *HTTPServer) MessageChan() <-chan types.Message
MessageChan - Returns the messages channel.
func (*HTTPServer) StartListening ¶
func (h *HTTPServer) StartListening(responses <-chan types.Response) error
StartListening - Sets the channel used by the input to validate message receipt.
func (*HTTPServer) WaitForClose ¶
func (h *HTTPServer) WaitForClose(timeout time.Duration) error
WaitForClose - Blocks until the HTTPServer input has closed down.
type HTTPServerConfig ¶
type HTTPServerConfig struct { Address string `json:"address" yaml:"address"` Path string `json:"path" yaml:"path"` TimeoutMS int64 `json:"timeout_ms" yaml:"timeout_ms"` }
HTTPServerConfig - Configuration for the HTTPServer input type.
func NewHTTPServerConfig ¶
func NewHTTPServerConfig() HTTPServerConfig
NewHTTPServerConfig - Creates a new HTTPServerConfig with default values.
type Kafka ¶
type Kafka struct {
// contains filtered or unexported fields
}
Kafka - An input type that reads from a Kafka instance.
func (*Kafka) CloseAsync ¶
func (k *Kafka) CloseAsync()
CloseAsync - Shuts down the Kafka input and stops processing requests.
func (*Kafka) MessageChan ¶
MessageChan - Returns the messages channel.
func (*Kafka) StartListening ¶
StartListening - Sets the channel used by the input to validate message receipt.
type KafkaConfig ¶
type KafkaConfig struct { Addresses []string `json:"addresses" yaml:"addresses"` ClientID string `json:"client_id" yaml:"client_id"` ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"` Topic string `json:"topic" yaml:"topic"` Partition int32 `json:"partition" yam:"partition"` StartFromOldest bool `json:"start_from_oldest" yaml:"start_from_oldest"` }
KafkaConfig - Configuration for the Kafka input type.
func NewKafkaConfig ¶
func NewKafkaConfig() KafkaConfig
NewKafkaConfig - Creates a new KafkaConfig with default values.
type MockType ¶
MockType - Implements the input.Type interface.
func (*MockType) MessageChan ¶
MessageChan - Returns the messages channel.
func (*MockType) StartListening ¶
StartListening - Sets the channel used for reading responses.
type STDIN ¶
type STDIN struct {
// contains filtered or unexported fields
}
STDIN - An input type that reads lines from STDIN.
func (*STDIN) CloseAsync ¶
func (s *STDIN) CloseAsync()
CloseAsync - Shuts down the STDIN input and stops processing requests.
func (*STDIN) MessageChan ¶
MessageChan - Returns the messages channel.
func (*STDIN) StartListening ¶
StartListening - Sets the channel used by the input to validate message receipt.
type ScaleProto ¶
type ScaleProto struct {
// contains filtered or unexported fields
}
ScaleProto - An input type that serves Scalability Protocols messages.
func (*ScaleProto) CloseAsync ¶
func (s *ScaleProto) CloseAsync()
CloseAsync - Shuts down the ScaleProto input and stops processing requests.
func (*ScaleProto) MessageChan ¶
func (s *ScaleProto) MessageChan() <-chan types.Message
MessageChan - Returns the messages channel.
func (*ScaleProto) StartListening ¶
func (s *ScaleProto) StartListening(responses <-chan types.Response) error
StartListening - Sets the channel used by the input to validate message receipt.
func (*ScaleProto) WaitForClose ¶
func (s *ScaleProto) WaitForClose(timeout time.Duration) error
WaitForClose - Blocks until the ScaleProto input has closed down.
type ScaleProtoConfig ¶
type ScaleProtoConfig struct { Address string `json:"address" yaml:"address"` Bind bool `json:"bind_address" yaml:"bind_address"` SocketType string `json:"socket_type" yaml:"socket_type"` SubFilters []string `json:"sub_filters" yaml:"sub_filters"` PollTimeoutMS int `json:"poll_timeout_ms" yaml:"poll_timeout_ms"` }
ScaleProtoConfig - Configuration for the ScaleProto input type.
func NewScaleProtoConfig ¶
func NewScaleProtoConfig() ScaleProtoConfig
NewScaleProtoConfig - Creates a new ScaleProtoConfig with default values.
type Type ¶
Type - The standard interface of an input type.
func NewHTTPServer ¶
NewHTTPServer - Create a new HTTPServer input type.
func NewScaleProto ¶
NewScaleProto - Create a new ScaleProto input type.