stage

package
v0.0.0-...-6eee6bf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2022 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var FieldDeleterTransformProvider = TransformProvider{
	StageType: "deleter",
	Instantiator: func(cfg interface{}) *node.Middle {
		c := cfg.(Deleter)
		toDelete := map[string]struct{}{}
		for _, f := range c.Fields {
			toDelete[fmt.Sprint(f)] = struct{}{}
		}
		return node.AsMiddle(func(in <-chan map[string]interface{}, out chan<- map[string]interface{}) {
			for m := range in {
				for td := range toDelete {
					delete(m, td)
				}
				out <- m
			}
		})
	},
}

FieldDeleterTransformProvider receives a map and removes the configured fields from it

View Source
var HttpIngestProvider = IngestProvider{
	StageType: "http",
	Instantiator: func(cfg interface{}) *node.Init {
		c := cfg.(Http)
		port := c.Port
		if port == 0 {
			port = defaultPort
		}
		log := logrus.WithField("component", "HttpIngest")
		return node.AsInit(func(out chan<- []byte) {
			err := http.ListenAndServe(fmt.Sprintf(":%d", port),
				http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
					if request.Method != http.MethodPost {
						writer.WriteHeader(http.StatusBadRequest)
						return
					}
					body, err := ioutil.ReadAll(request.Body)
					if err != nil {
						log.WithError(err).Warn("failed request")
						writer.WriteHeader(http.StatusBadRequest)
						writer.Write([]byte(err.Error()))
						return
					}
					out <- body
				}))
			log.WithError(err).Warn("HTTP server ended")
		})
	},
}

HttpIngestProvider listens for HTTP connections and forwards them. The instantiator needs to receive a stage.Http instance.

View Source
var StdOutExportProvider = ExportProvider{
	StageType: "stdout",
	Instantiator: func(cfg interface{}) *node.Terminal {
		c := cfg.(Stdout)
		return node.AsTerminal(func(in <-chan string) {
			for s := range in {
				fmt.Println(c.Prepend + s)
			}
		})
	},
}

StdOutExportProvider receives any message and prints it, prepending a given message

Functions

func BytesToStringCodec

func BytesToStringCodec(in <-chan []byte, out chan<- string)

func JSONBytesToMapCodec

func JSONBytesToMapCodec(in <-chan []byte, out chan<- map[string]interface{})

func MapToStringCodec

func MapToStringCodec(in <-chan map[string]interface{}, out chan<- string)

Types

type Deleter

type Deleter struct {
	Name   string   `hcl:",label"`
	Fields []string `hcl:"fields"`
}

type ExportProvider

type ExportProvider struct {
	StageType    Type
	Instantiator func(interface{}) *node.Terminal
}

type Http

type Http struct {
	Name string `hcl:",label"`
	Port int    `hcl:"port,optional"`
}

type IngestProvider

type IngestProvider struct {
	StageType    Type
	Instantiator func(interface{}) *node.Init
}

type Name

type Name string

type Stdout

type Stdout struct {
	Name    string `hcl:",label"`
	Prepend string `hcl:"prepend,optional"`
}

type TransformProvider

type TransformProvider struct {
	StageType    Type
	Instantiator func(interface{}) *node.Middle
}

type Type

type Type string

Directories

Path Synopsis
Package system ingests monitoring information from system
Package system ingests monitoring information from system

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL