Documentation ¶
Index ¶
- type AMQPFeeder
- func (f *AMQPFeeder) NewConsumer(amqpURI string, exchanges []string, exchangeType, queueName, key, ctag string, ...) (*Consumer, error)
- func (f *AMQPFeeder) NewConsumerWithReconnector(amqpURI string, exchanges []string, exchangeType, queueName, key, ctag string, ...) (*Consumer, error)
- func (f *AMQPFeeder) Run(out chan observation.InputObservation) error
- func (f *AMQPFeeder) SetInputDecoder(fn format.MakeObservationFunc)
- func (f *AMQPFeeder) Stop(stopChan chan bool)
- type Consumer
- type Feeder
- type HTTPFeeder
- type Setup
- type SocketFeeder
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AMQPFeeder ¶
type AMQPFeeder struct {
	StopChan            chan bool
	StoppedChan         chan bool
	IsRunning           bool
	Consumer            *Consumer
	URL                 string
	Exchanges           []string
	Queue               string
	MakeObservationFunc format.MakeObservationFunc
}
AMQPFeeder is a Feeder that accepts input via AMQP queues.
func MakeAMQPFeeder ¶
func MakeAMQPFeeder(url string, exchanges []string, queue string) *AMQPFeeder
MakeAMQPFeeder returns a new AMQPFeeder that connects to the AMQP server at the given URL and creates a new queue with the given name, bound to the provided exchanges.
func (*AMQPFeeder) NewConsumer ¶
func (f *AMQPFeeder) NewConsumer(amqpURI string, exchanges []string, exchangeType, queueName, key, ctag string, out chan observation.InputObservation) (*Consumer, error)
NewConsumer returns a new Consumer.
func (*AMQPFeeder) NewConsumerWithReconnector ¶
func (f *AMQPFeeder) NewConsumerWithReconnector(amqpURI string, exchanges []string, exchangeType, queueName, key, ctag string, out chan observation.InputObservation, reconnector func(string) (wabbit.Conn, string, error)) (*Consumer, error)
NewConsumerWithReconnector creates a new Consumer with the given properties and the given reconnector function. The consumer's Callback function is called for each delivery accepted from a consumer channel.
func (*AMQPFeeder) Run ¶
func (f *AMQPFeeder) Run(out chan observation.InputObservation) error
Run starts the feeder.
func (*AMQPFeeder) SetInputDecoder ¶
func (f *AMQPFeeder) SetInputDecoder(fn format.MakeObservationFunc)
SetInputDecoder sets the MakeObservationFunc used to parse and decode data delivered to this feeder.
func (*AMQPFeeder) Stop ¶
func (f *AMQPFeeder) Stop(stopChan chan bool)
Stop causes the feeder to stop accepting deliveries and close all associated channels, including the passed notification channel.
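The notification channel passed to Stop gives the caller a way to block until shutdown has finished. The following is a minimal sketch of that handshake, assuming the notification channel is closed only after the background worker has exited. The type toyFeeder and the helper stopAndWait are hypothetical stand-ins written for this illustration and are not part of the package.

```go
package main

import "fmt"

// toyFeeder is a hypothetical stand-in for a feeder; it is not part of the
// documented package.
type toyFeeder struct {
	stopChan    chan bool // closed to ask the worker to exit
	stoppedChan chan bool // closed by the worker once it has exited
	running     bool
}

// run starts a background worker that idles until stopChan is closed.
func (f *toyFeeder) run() {
	f.stopChan = make(chan bool)
	f.stoppedChan = make(chan bool)
	f.running = true
	go func() {
		defer close(f.stoppedChan)
		<-f.stopChan
	}()
}

// stop asks the worker to exit, waits for it, and then closes the caller's
// notification channel so that the caller can unblock.
func (f *toyFeeder) stop(notify chan bool) {
	if f.running {
		close(f.stopChan)
		<-f.stoppedChan
		f.running = false
	}
	close(notify)
}

// stopAndWait shows the caller's side of the handshake. It reports whether
// the notification channel was observed as closed.
func stopAndWait(f *toyFeeder) bool {
	notify := make(chan bool)
	go f.stop(notify)
	_, open := <-notify // returns once notify has been closed
	return !open
}

func main() {
	f := &toyFeeder{}
	f.run()
	fmt.Println("stopped:", stopAndWait(f))
}
```

Closing the channel, rather than sending a value on it, means any number of waiting goroutines are released at once and the caller never needs a buffered channel.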
type Consumer ¶
type Consumer struct {
	URL                 string
	Callback            func(wabbit.Delivery)
	StopReconnection    chan bool
	ChanMutex           sync.Mutex
	ConnMutex           sync.Mutex
	OutChan             chan observation.InputObservation
	MakeObservationFunc format.MakeObservationFunc
	ErrorChan           chan wabbit.Error
	Reconnector         func(string) (wabbit.Conn, string, error)
	Connector           func(*Consumer) error
	// contains filtered or unexported fields
}
Consumer reads and processes messages from a RabbitMQ server accessed through the wabbit abstraction, which may be a real or a fake server.
type Feeder ¶
type Feeder interface {
	Run(chan observation.InputObservation) error
	SetInputDecoder(format.MakeObservationFunc)
	Stop(chan bool)
}
Feeder is an interface of a component that accepts observations in a specific format and feeds them into a channel of InputObservations. An input decoder in the form of a MakeObservationFunc describes the operations necessary to transform the input format into an InputObservation.
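To make the contract concrete, here is a minimal sketch of a type satisfying an interface of the same shape. It does not import the package: inputObservation, makeObservationFunc and feeder are simplified, hypothetical stand-ins for observation.InputObservation, format.MakeObservationFunc and Feeder, whose real definitions (including the decoder's actual signature) are not shown in this document. The sliceFeeder type and the "name=value" input format are likewise invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// inputObservation and makeObservationFunc are simplified, hypothetical
// stand-ins for observation.InputObservation and format.MakeObservationFunc.
type inputObservation struct {
	Name  string
	Value string
}

type makeObservationFunc func(raw string, out chan inputObservation) error

// feeder mirrors the shape of the Feeder interface using the stand-in types.
type feeder interface {
	Run(chan inputObservation) error
	SetInputDecoder(makeObservationFunc)
	Stop(chan bool)
}

// sliceFeeder is a toy feeder that replays a fixed list of raw inputs.
type sliceFeeder struct {
	inputs  []string
	decode  makeObservationFunc
	stopped chan bool
}

func (f *sliceFeeder) SetInputDecoder(fn makeObservationFunc) { f.decode = fn }

// Run decodes every input in the background and returns immediately.
func (f *sliceFeeder) Run(out chan inputObservation) error {
	if f.decode == nil {
		return errors.New("no input decoder set")
	}
	f.stopped = make(chan bool)
	go func() {
		defer close(f.stopped)
		for _, raw := range f.inputs {
			_ = f.decode(raw, out) // a real feeder would log decode errors
		}
	}()
	return nil
}

// Stop waits for the replay to finish, then closes the notification channel.
func (f *sliceFeeder) Stop(notify chan bool) {
	if f.stopped != nil {
		<-f.stopped
	}
	close(notify)
}

// decodePair parses "name=value" inputs into observations.
func decodePair(raw string, out chan inputObservation) error {
	name, value, ok := strings.Cut(raw, "=")
	if !ok {
		return fmt.Errorf("malformed input %q", raw)
	}
	out <- inputObservation{Name: name, Value: value}
	return nil
}

// collect runs a feeder to completion and gathers what it emitted.
// The feeder must emit at most n observations, the capacity of the channel.
func collect(f feeder, n int) []inputObservation {
	out := make(chan inputObservation, n)
	f.SetInputDecoder(decodePair)
	if err := f.Run(out); err != nil {
		return nil
	}
	notify := make(chan bool)
	go f.Stop(notify)
	<-notify
	close(out)
	var got []inputObservation
	for o := range out {
		got = append(got, o)
	}
	return got
}

func main() {
	f := &sliceFeeder{inputs: []string{"example.com=192.0.2.1", "broken", "example.org=192.0.2.2"}}
	for _, o := range collect(f, 8) {
		fmt.Println(o.Name, o.Value)
	}
}
```

Keeping the decoder separate from the transport is what lets one decoder serve every feeder type: the feeder only knows how to obtain raw input, and the decoder only knows how to turn it into observations.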
type HTTPFeeder ¶
type HTTPFeeder struct {
	StopChan            chan bool
	StoppedChan         chan bool
	IsRunning           bool
	Port                int
	Host                string
	MakeObservationFunc format.MakeObservationFunc
	Server              *http.Server
	OutChan             chan observation.InputObservation
}
HTTPFeeder is a Feeder implementation that accepts HTTP requests to obtain observations.
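The document does not describe the request format HTTPFeeder expects, so the following is only a sketch of the general idea: an http.Handler that turns a request body into items on a channel. It assumes newline-delimited input sent via POST, which may differ from what HTTPFeeder actually accepts. The lineHandler type and the post helper are hypothetical, and the example uses only the standard library.

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// lineHandler is a hypothetical handler that forwards each non-empty line of
// a POST body to a channel. It is not the package's HTTPFeeder.
type lineHandler struct {
	out chan string
}

func (h *lineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "POST required", http.StatusMethodNotAllowed)
		return
	}
	sc := bufio.NewScanner(r.Body)
	for sc.Scan() {
		if line := strings.TrimSpace(sc.Text()); line != "" {
			h.out <- line
		}
	}
	if err := sc.Err(); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	w.WriteHeader(http.StatusOK)
}

// post sends body to the handler in-process and returns the status code.
func post(h http.Handler, method, body string) int {
	req := httptest.NewRequest(method, "/", strings.NewReader(body))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	h := &lineHandler{out: make(chan string, 4)}
	fmt.Println(post(h, http.MethodPost, "a\n\nb\n"), len(h.out))
	fmt.Println(post(h, http.MethodGet, ""), len(h.out))
}
```

Because the handler writes to the channel synchronously, a slow consumer applies back-pressure to HTTP clients; a production handler would likely want a timeout or a bounded queue instead.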
func MakeHTTPFeeder ¶
func MakeHTTPFeeder(host string, port int) *HTTPFeeder
MakeHTTPFeeder creates a new HTTPFeeder listening on the given host and port.
func (*HTTPFeeder) Run ¶
func (f *HTTPFeeder) Run(out chan observation.InputObservation) error
Run starts the feeder.
func (*HTTPFeeder) ServeHTTP ¶
func (f *HTTPFeeder) ServeHTTP(w http.ResponseWriter, r *http.Request)
func (*HTTPFeeder) SetInputDecoder ¶
func (f *HTTPFeeder) SetInputDecoder(fn format.MakeObservationFunc)
SetInputDecoder sets the MakeObservationFunc used to parse and decode data delivered to this feeder.
func (*HTTPFeeder) Stop ¶
func (f *HTTPFeeder) Stop(stopChan chan bool)
Stop causes the feeder to stop accepting requests and close all associated channels, including the passed notification channel.
type Setup ¶
type Setup struct {
	Feeder []struct {
		Name        string `yaml:"name"`
		Type        string `yaml:"type"`
		InputFormat string `yaml:"input_format"`
		// for AMQP
		URL      string   `yaml:"url"`
		Exchange []string `yaml:"exchange"`
		// for HTTP etc.
		ListenHost string `yaml:"listen_host"`
		ListenPort int    `yaml:"listen_port"`
		// for socket input
		Path string `yaml:"path"`
	} `yaml:"feeder"`
	Feeders map[string]Feeder
}
Setup describes a collection of feeders that should be active, including their configuration settings.
func (*Setup) Run ¶
func (fs *Setup) Run(in chan observation.InputObservation) error
Run starts all feeders according to the description in the setup, in the background. Use Stop() to stop the feeders.
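One way to picture how a setup description could be turned into a name-keyed collection of feeders is a dispatch on the Type field. The sketch below is an assumption about the general approach, not the package's implementation: the type strings "amqp", "http" and "socket" are guesses, feederSpec copies only a few of the configuration fields, and describe returns plain descriptions instead of constructing real feeders.

```go
package main

import (
	"fmt"
	"sort"
)

// feederSpec copies a few fields of Setup's per-feeder configuration.
type feederSpec struct {
	Name       string
	Type       string
	URL        string
	ListenHost string
	ListenPort int
	Path       string
}

// describe maps each spec to a short description of the feeder it would
// create, keyed by feeder name. The accepted Type values are assumptions.
func describe(specs []feederSpec) (map[string]string, error) {
	feeders := make(map[string]string)
	for _, s := range specs {
		if _, dup := feeders[s.Name]; dup {
			return nil, fmt.Errorf("duplicate feeder name %q", s.Name)
		}
		switch s.Type {
		case "amqp":
			feeders[s.Name] = "AMQP feeder for " + s.URL
		case "http":
			feeders[s.Name] = fmt.Sprintf("HTTP feeder on %s:%d", s.ListenHost, s.ListenPort)
		case "socket":
			feeders[s.Name] = "socket feeder on " + s.Path
		default:
			return nil, fmt.Errorf("feeder %q: unknown type %q", s.Name, s.Type)
		}
	}
	return feeders, nil
}

func main() {
	feeders, err := describe([]feederSpec{
		{Name: "queue", Type: "amqp", URL: "amqp://localhost:5672"},
		{Name: "web", Type: "http", ListenHost: "127.0.0.1", ListenPort: 8081},
		{Name: "local", Type: "socket", Path: "/tmp/in.sock"},
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	names := make([]string, 0, len(feeders))
	for name := range feeders {
		names = append(names, name)
	}
	sort.Strings(names) // map iteration order is random, so sort for stable output
	for _, name := range names {
		fmt.Println(name+":", feeders[name])
	}
}
```

Rejecting unknown types and duplicate names up front keeps a misconfigured setup from starting only some of its feeders.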
type SocketFeeder ¶
type SocketFeeder struct {
	ObsChan             chan observation.InputObservation
	Verbose             bool
	Running             bool
	InputListener       net.Listener
	MakeObservationFunc format.MakeObservationFunc
	StopChan            chan bool
	StoppedChan         chan bool
}
SocketFeeder is a Feeder implementation that reads data from a UNIX socket.
func MakeSocketFeeder ¶
func MakeSocketFeeder(inputSocket string) (*SocketFeeder, error)
MakeSocketFeeder returns a new SocketFeeder reading from the Unix socket inputSocket. Parsed events are written to the channel later passed to Run. If the socket cannot be created for listening, a non-nil error is returned.
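Listening on a Unix socket can fail at creation time, for example when the path's directory does not exist, which is why the constructor returns an error. The sketch below shows this with the standard library only. It assumes newline-delimited input, which the document does not confirm, and the helpers listenLines and roundTrip are hypothetical names used for illustration. It requires a platform with Unix domain socket support.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"os"
	"path/filepath"
)

// listenLines listens on the Unix socket at path and forwards every line
// received on any connection to out. It is a hypothetical helper.
func listenLines(path string, out chan<- string) (net.Listener, error) {
	ln, err := net.Listen("unix", path)
	if err != nil {
		return nil, err // e.g. the directory is missing or the path is in use
	}
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				return // the listener was closed
			}
			go func(c net.Conn) {
				defer c.Close()
				sc := bufio.NewScanner(c)
				for sc.Scan() {
					out <- sc.Text()
				}
			}(conn)
		}
	}()
	return ln, nil
}

// roundTrip writes one line to a fresh socket in a temporary directory and
// returns what the listener delivered.
func roundTrip(line string) (string, error) {
	dir, err := os.MkdirTemp("", "feeder")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	out := make(chan string, 1)
	ln, err := listenLines(filepath.Join(dir, "in.sock"), out)
	if err != nil {
		return "", err
	}
	defer ln.Close()

	conn, err := net.Dial("unix", ln.Addr().String())
	if err != nil {
		return "", err
	}
	fmt.Fprintln(conn, line)
	conn.Close()
	return <-out, nil
}

func main() {
	got, err := roundTrip("hello")
	fmt.Println(got, err)
}
```

Closing the listener makes the pending Accept call return an error, which the accept loop treats as its signal to exit.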
func (*SocketFeeder) Run ¶
func (sf *SocketFeeder) Run(out chan observation.InputObservation) error
Run starts the feeder.
func (*SocketFeeder) SetInputDecoder ¶
func (sf *SocketFeeder) SetInputDecoder(fn format.MakeObservationFunc)
SetInputDecoder sets the MakeObservationFunc used to parse and decode data delivered to this feeder.
func (*SocketFeeder) Stop ¶
func (sf *SocketFeeder) Stop(stoppedChan chan bool)
Stop causes the SocketFeeder to stop reading from the socket and close all associated channels, including the passed notification channel.