Documentation ¶
Overview ¶
Package input - Defines consumers for aggregating data from a variety of sources. All consumer types must implement interface input.Type.
If the source of a consumer is given assurances of message receipt then the consumer must ensure that all messages read from a source are propagated to a reader before shutting down gracefully. For example, a ZMQ subscriber model based consumer would not need to provide such ensurance, a HTTP POST based consumer, however, would be expected to propagate any requests where a 200 OK message has been returned.
Index ¶
- Variables
- func Descriptions() string
- type AMQP
- type AMQPConfig
- type Config
- type FanInConfig
- type File
- type FileConfig
- type HTTPServer
- type HTTPServerConfig
- type Kafka
- type KafkaConfig
- type NATS
- type NATSConfig
- type NSQ
- type NSQConfig
- type PipelineConstructor
- type STDIN
- type STDINConfig
- type ScaleProto
- type ScaleProtoConfig
- type Type
- func New(conf Config, log log.Modular, stats metrics.Type, ...) (Type, error)
- func NewAMQP(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewFanIn(conf Config, log log.Modular, stats metrics.Type, ...) (Type, error)
- func NewFile(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewHTTPServer(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewKafka(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewNATS(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewNSQ(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewSTDIN(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func NewScaleProto(conf Config, log log.Modular, stats metrics.Type) (Type, error)
- func WrapWithPipelines(in Type, pipeConstructors ...PipelineConstructor) (Type, error)
- type WithPipeline
- type ZMQ4Config
Constants ¶
This section is empty.
Variables ¶
var ( // ErrFanInNoInputs - Returned when creating a FanIn type with zero inputs. ErrFanInNoInputs = errors.New("attempting to create fan_in input type with no inputs") )
var (
ErrHWMInvalid = errors.New("high water mark is invalid (must be integer greater than 1)")
)
Errors for the HTTPServer type.
Functions ¶
func Descriptions ¶
func Descriptions() string
Descriptions returns a formatted string of descriptions for each type.
Types ¶
type AMQP ¶ added in v0.0.2
type AMQP struct {
// contains filtered or unexported fields
}
AMQP - An input type that serves Scalability Protocols messages.
func (*AMQP) CloseAsync ¶ added in v0.0.2
func (a *AMQP) CloseAsync()
CloseAsync - Shuts down the AMQP input and stops processing requests.
func (*AMQP) MessageChan ¶ added in v0.0.2
MessageChan - Returns the messages channel.
func (*AMQP) StartListening ¶ added in v0.0.2
StartListening - Sets the channel used by the input to validate message receipt.
type AMQPConfig ¶ added in v0.0.2
type AMQPConfig struct { URI string `json:"uri" yaml:"uri"` Exchange string `json:"exchange" yaml:"exchange"` ExchangeType string `json:"exchange_type" yaml:"exchange_type"` Queue string `json:"queue" yaml:"queue"` BindingKey string `json:"key" yaml:"key"` ConsumerTag string `json:"consumer_tag" yaml:"consumer_tag"` PrefetchCount int `json:"prefetch_count" yaml:"prefetch_count"` PrefetchSize int `json:"prefetch_size" yaml:"prefetch_size"` }
AMQPConfig - Configuration for the AMQP input type.
func NewAMQPConfig ¶ added in v0.0.2
func NewAMQPConfig() AMQPConfig
NewAMQPConfig - Creates a new AMQPConfig with default values.
type Config ¶
type Config struct { Type string `json:"type" yaml:"type"` HTTPServer HTTPServerConfig `json:"http_server" yaml:"http_server"` ScaleProto ScaleProtoConfig `json:"scalability_protocols" yaml:"scalability_protocols"` ZMQ4 *ZMQ4Config `json:"zmq4,omitempty" yaml:"zmq4,omitempty"` Kafka KafkaConfig `json:"kafka" yaml:"kafka"` AMQP AMQPConfig `json:"amqp" yaml:"amqp"` NSQ NSQConfig `json:"nsq" yaml:"nsq"` NATS NATSConfig `json:"nats" yaml:"nats"` File FileConfig `json:"file" yaml:"file"` STDIN STDINConfig `json:"stdin" yaml:"stdin"` FanIn FanInConfig `json:"fan_in" yaml:"fan_in"` }
Config is the all encompassing configuration struct for all input types. Note that some configs are empty structs, as the type has no optional values but we want to list it as an option.
type FanInConfig ¶
type FanInConfig struct {
Inputs []interface{} `json:"inputs" yaml:"inputs"`
}
FanInConfig - Configuration for the FanIn input type.
func NewFanInConfig ¶
func NewFanInConfig() FanInConfig
NewFanInConfig - Creates a new FanInConfig with default values.
type File ¶
type File struct {
// contains filtered or unexported fields
}
File - An input type that reads lines from a file, creating a message per line.
func (*File) CloseAsync ¶
func (f *File) CloseAsync()
CloseAsync - Shuts down the File input and stops processing requests.
func (*File) MessageChan ¶
MessageChan - Returns the messages channel.
func (*File) StartListening ¶
StartListening - Sets the channel used by the input to validate message receipt.
type FileConfig ¶
type FileConfig struct { Path string `json:"path" yaml:"path"` Multipart bool `json:"multipart" yaml:"multipart"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` }
FileConfig - Configuration values for the File input type.
func NewFileConfig ¶
func NewFileConfig() FileConfig
NewFileConfig - Creates a new FileConfig with default values.
type HTTPServer ¶
type HTTPServer struct {
// contains filtered or unexported fields
}
HTTPServer - An input type that serves HTTPServer POST requests.
func (*HTTPServer) CloseAsync ¶
func (h *HTTPServer) CloseAsync()
CloseAsync - Shuts down the HTTPServer input and stops processing requests.
func (*HTTPServer) MessageChan ¶
func (h *HTTPServer) MessageChan() <-chan types.Message
MessageChan - Returns the messages channel.
func (*HTTPServer) StartListening ¶
func (h *HTTPServer) StartListening(responses <-chan types.Response) error
StartListening - Sets the channel used by the input to validate message receipt.
func (*HTTPServer) WaitForClose ¶
func (h *HTTPServer) WaitForClose(timeout time.Duration) error
WaitForClose - Blocks until the HTTPServer input has closed down.
type HTTPServerConfig ¶
type HTTPServerConfig struct { Address string `json:"address" yaml:"address"` Path string `json:"path" yaml:"path"` TimeoutMS int64 `json:"timeout_ms" yaml:"timeout_ms"` HighWaterMark int `json:"high_water_mark" yaml:"high_water_mark"` }
HTTPServerConfig - Configuration for the HTTPServer input type.
func NewHTTPServerConfig ¶
func NewHTTPServerConfig() HTTPServerConfig
NewHTTPServerConfig - Creates a new HTTPServerConfig with default values.
type Kafka ¶
type Kafka struct {
// contains filtered or unexported fields
}
Kafka - An input type that reads from a Kafka instance.
func (*Kafka) CloseAsync ¶
func (k *Kafka) CloseAsync()
CloseAsync - Shuts down the Kafka input and stops processing requests.
func (*Kafka) MessageChan ¶
MessageChan - Returns the messages channel.
func (*Kafka) StartListening ¶
StartListening - Sets the channel used by the input to validate message receipt.
type KafkaConfig ¶
type KafkaConfig struct { Addresses []string `json:"addresses" yaml:"addresses"` ClientID string `json:"client_id" yaml:"client_id"` ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"` Topic string `json:"topic" yaml:"topic"` Partition int32 `json:"partition" yam:"partition"` StartFromOldest bool `json:"start_from_oldest" yaml:"start_from_oldest"` }
KafkaConfig - Configuration for the Kafka input type.
func NewKafkaConfig ¶
func NewKafkaConfig() KafkaConfig
NewKafkaConfig - Creates a new KafkaConfig with default values.
type NATS ¶ added in v0.2.2
type NATS struct {
// contains filtered or unexported fields
}
NATS - An input type that receives NATS messages.
func (*NATS) CloseAsync ¶ added in v0.2.2
func (n *NATS) CloseAsync()
CloseAsync - Shuts down the NATS input and stops processing requests.
func (*NATS) MessageChan ¶ added in v0.2.2
MessageChan - Returns the messages channel.
func (*NATS) StartListening ¶ added in v0.2.2
StartListening - Sets the channel used by the input to validate message receipt.
type NATSConfig ¶ added in v0.2.2
type NATSConfig struct { URL string `json:"url" yaml:"url"` Subject string `json:"subject" yaml:"subject"` }
NATSConfig - Configuration for the NATS input type.
func NewNATSConfig ¶ added in v0.2.2
func NewNATSConfig() NATSConfig
NewNATSConfig - Creates a new NATSConfig with default values.
type NSQ ¶ added in v0.1.1
type NSQ struct {
// contains filtered or unexported fields
}
NSQ - An input type that receives NSQ messages.
func (*NSQ) CloseAsync ¶ added in v0.1.1
func (n *NSQ) CloseAsync()
CloseAsync - Shuts down the NSQ input and stops processing requests.
func (*NSQ) HandleMessage ¶ added in v0.1.1
HandleMessage - Handles an NSQ message.
func (*NSQ) MessageChan ¶ added in v0.1.1
MessageChan - Returns the messages channel.
func (*NSQ) StartListening ¶ added in v0.1.1
StartListening - Sets the channel used by the input to validate message receipt.
type NSQConfig ¶ added in v0.1.1
type NSQConfig struct { Addresses []string `json:"nsqd_tcp_addresses" yaml:"nsqd_tcp_addresses"` LookupAddresses []string `json:"lookupd_http_addresses" yaml:"lookupd_http_addresses"` Topic string `json:"topic" yaml:"topic"` Channel string `json:"channel" yaml:"channel"` UserAgent string `json:"user_agent" yaml:"user_agent"` MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` }
NSQConfig - Configuration for the NSQ input type.
func NewNSQConfig ¶ added in v0.1.1
func NewNSQConfig() NSQConfig
NewNSQConfig - Creates a new NSQConfig with default values.
type PipelineConstructor ¶ added in v0.2.9
PipelineConstructor is a func that constructs a unique pipeline.
type STDIN ¶
type STDIN struct {
// contains filtered or unexported fields
}
STDIN - An input type that reads lines from STDIN.
func (*STDIN) CloseAsync ¶
func (s *STDIN) CloseAsync()
CloseAsync - Shuts down the STDIN input and stops processing requests.
func (*STDIN) MessageChan ¶
MessageChan - Returns the messages channel.
func (*STDIN) StartListening ¶
StartListening - Sets the channel used by the input to validate message receipt.
type STDINConfig ¶ added in v0.0.2
type STDINConfig struct { Multipart bool `json:"multipart" yaml:"multipart"` MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` }
STDINConfig - contains config fields for the STDIN input type.
func NewSTDINConfig ¶ added in v0.0.2
func NewSTDINConfig() STDINConfig
NewSTDINConfig - creates a STDINConfig populated with default values.
type ScaleProto ¶
type ScaleProto struct {
// contains filtered or unexported fields
}
ScaleProto - An input type that serves Scalability Protocols messages.
func (*ScaleProto) CloseAsync ¶
func (s *ScaleProto) CloseAsync()
CloseAsync - Shuts down the ScaleProto input and stops processing requests.
func (*ScaleProto) MessageChan ¶
func (s *ScaleProto) MessageChan() <-chan types.Message
MessageChan - Returns the messages channel.
func (*ScaleProto) StartListening ¶
func (s *ScaleProto) StartListening(responses <-chan types.Response) error
StartListening - Sets the channel used by the input to validate message receipt.
func (*ScaleProto) WaitForClose ¶
func (s *ScaleProto) WaitForClose(timeout time.Duration) error
WaitForClose - Blocks until the ScaleProto input has closed down.
type ScaleProtoConfig ¶
type ScaleProtoConfig struct { Address string `json:"address" yaml:"address"` Bind bool `json:"bind_address" yaml:"bind_address"` SocketType string `json:"socket_type" yaml:"socket_type"` SubFilters []string `json:"sub_filters" yaml:"sub_filters"` UseBenthosMulti bool `json:"benthos_multi" yaml:"benthos_multi"` PollTimeoutMS int `json:"poll_timeout_ms" yaml:"poll_timeout_ms"` }
ScaleProtoConfig - Configuration for the ScaleProto input type.
func NewScaleProtoConfig ¶
func NewScaleProtoConfig() ScaleProtoConfig
NewScaleProtoConfig - Creates a new ScaleProtoConfig with default values.
type Type ¶
Type - The standard interface of an input type.
func New ¶ added in v0.0.2
func New( conf Config, log log.Modular, stats metrics.Type, pipelines ...PipelineConstructor, ) (Type, error)
New creates an input type based on an input configuration.
func NewFanIn ¶
func NewFanIn( conf Config, log log.Modular, stats metrics.Type, pipelines ...PipelineConstructor, ) (Type, error)
NewFanIn - Create a new FanIn input type.
func NewHTTPServer ¶
NewHTTPServer - Create a new HTTPServer input type.
func NewScaleProto ¶
NewScaleProto - Create a new ScaleProto input type.
func WrapWithPipelines ¶ added in v0.2.9
func WrapWithPipelines(in Type, pipeConstructors ...PipelineConstructor) (Type, error)
WrapWithPipelines wraps an input with a variadic number of pipelines.
type WithPipeline ¶ added in v0.2.9
type WithPipeline struct {
// contains filtered or unexported fields
}
WithPipeline is a type that wraps both an input type and a pipeline type by routing the input through the pipeline, and implements the input.Type interface in order to act like an ordinary input.
func WrapWithPipeline ¶ added in v0.2.9
func WrapWithPipeline(in Type, pipeConstructor PipelineConstructor) (*WithPipeline, error)
WrapWithPipeline routes an input directly into a processing pipeline and returns a type that manages both and acts like an ordinary input.
func (*WithPipeline) CloseAsync ¶ added in v0.2.9
func (i *WithPipeline) CloseAsync()
CloseAsync triggers a closure of this object but does not block.
func (*WithPipeline) MessageChan ¶ added in v0.2.9
func (i *WithPipeline) MessageChan() <-chan types.Message
MessageChan returns the channel used for consuming messages from this input.
func (*WithPipeline) StartListening ¶ added in v0.2.9
func (i *WithPipeline) StartListening(resChan <-chan types.Response) error
StartListening starts the type listening to a response channel from a consumer.
func (*WithPipeline) WaitForClose ¶ added in v0.2.9
func (i *WithPipeline) WaitForClose(timeout time.Duration) error
WaitForClose is a blocking call to wait until the object has finished closing down and cleaning up resources.