Documentation ¶
Index ¶
- Constants
- func CfgAuthOpts(cfg *viper.Viper) *shttp.AuthenticationOpts
- func Main(defaultCfgFile string)
- func NewClassifySubnet(cfg *viper.Viper) (interface{}, error)
- func NewCompressGzip(cfg *viper.Viper) (interface{}, error)
- func NewCompressNone(cfg *viper.Viper) (interface{}, error)
- func NewEncodeCSV(cfg *viper.Viper) (interface{}, error)
- func NewEncodeJSON(cfg *viper.Viper) (interface{}, error)
- func NewFilterSubnet(cfg *viper.Viper) (interface{}, error)
- func NewStoreBuffered(cfg *viper.Viper) (interface{}, error)
- func NewStoreDirect(cfg *viper.Viper) (interface{}, error)
- func NewSubscriber(pipeline *Pipeline, cfg *viper.Viper) (*websocket.StructSpeaker, error)
- func NewTransformNone(cfg *viper.Viper) (interface{}, error)
- func NewWriteS3(cfg *viper.Viper) (interface{}, error)
- func NewWriteStdout(cfg *viper.Viper) (interface{}, error)
- func SubscriberRun(s *websocket.StructSpeaker)
- type Classifier
- type Compressor
- type EncodeCSV
- type EncodeJSON
- type Encoder
- type Filterer
- type Handler
- type HandlersMap
- type Pipeline
- type Storer
- type Tag
- type Transformer
- type Writer
Constants ¶
const CfgRoot = "pipeline."
CfgRoot configuration root path
Variables ¶
This section is empty.
Functions ¶
func CfgAuthOpts ¶
func CfgAuthOpts(cfg *viper.Viper) *shttp.AuthenticationOpts
CfgAuthOpts creates the auth options form configuration
func NewClassifySubnet ¶
NewClassifySubnet returns a new classify, based on the given cluster net masks
func NewCompressGzip ¶
NewCompressGzip create an encode object
func NewCompressNone ¶
NewCompressNone create an encode object
func NewEncodeCSV ¶
NewEncodeCSV creates an encode object
func NewEncodeJSON ¶
NewEncodeJSON creates an encode object
func NewFilterSubnet ¶
NewFilterSubnet returns a new filter based on config
func NewStoreBuffered ¶
NewStoreBuffered returns a new storage interface for storing flows to object store
func NewStoreDirect ¶
NewStoreDirect returns a new storage interface for storing flows to object store
func NewSubscriber ¶
NewSubscriber returns a new flow subscriber writing to object store
func NewTransformNone ¶
NewTransformNone create a new transform
func NewWriteS3 ¶
NewWriteS3 creates a new S3-compatible object storage client
func NewWriteStdout ¶
NewWriteStdout returns a new storage interface for storing flows to object store
func SubscriberRun ¶
func SubscriberRun(s *websocket.StructSpeaker)
SubscriberRun runs the subscriber under main
Types ¶
type Classifier ¶
Classifier exposes the interface for tag based classification
type Compressor ¶
Compressor exposes the interface for compressesing encoded flows
type EncodeJSON ¶
type EncodeJSON struct {
// contains filtered or unexported fields
}
EncodeJSON encoder encodes flows as a JSON array
func (*EncodeJSON) Encode ¶
func (e *EncodeJSON) Encode(in interface{}) ([]byte, error)
Encode implements Encoder interface
type HandlersMap ¶
HandlersMap a map of handlers
var ( TransformerHandlers HandlersMap ClassifierHandlers HandlersMap FiltererHandlers HandlersMap EncoderHandlers HandlersMap CompressorHandlers HandlersMap StorerHandlers HandlersMap WriterHandlers HandlersMap )
Global set of handlers
type Pipeline ¶
type Pipeline struct { sync.Mutex Transformer Transformer Classifier Classifier Filterer Filterer Encoder Encoder Compressor Compressor Storer Storer Writer Writer }
Pipeline manager
func NewPipeline ¶
NewPipeline defines the pipeline elements
func (*Pipeline) OnStructMessage ¶
func (p *Pipeline) OnStructMessage(c ws.Speaker, msg *ws.StructMessage)
OnStructMessage is triggered when WS server sends us a message.
type Transformer ¶
type Transformer interface { // Transform transforms a flow before being stored Transform(f *flow.Flow) interface{} }
Transformer allows generic transformations of a flow