Documentation ¶
Overview ¶
Example (Applier) ¶
package main import ( "context" "fmt" "github.com/jshlbrd/substation/config" "github.com/jshlbrd/substation/process" ) func main() { // copies the value of key "foo" into key "baz" cfg := config.Config{ Type: "copy", Settings: map[string]interface{}{ "key": "foo", "set_key": "bar", }, } // applier is retrieved from the factory applier, err := process.NewApplier(cfg) if err != nil { // handle err panic(err) } // applier is applied to capsule capsule := config.NewCapsule() capsule.SetData([]byte(`{"foo":"fizz"}`)) capsule, err = applier.Apply(context.TODO(), capsule) if err != nil { // handle err panic(err) } fmt.Println(string(capsule.Data())) }
Output: {"foo":"fizz","bar":"fizz"}
Example (Batcher) ¶
package main import ( "context" "fmt" "github.com/jshlbrd/substation/config" "github.com/jshlbrd/substation/process" ) func main() { // copies the value of key "foo" into key "bar" cfg := config.Config{ Type: "copy", Settings: map[string]interface{}{ "key": "foo", "set_key": "bar", }, } // batcher is retrieved from the factory batcher, err := process.NewBatcher(cfg) if err != nil { // handle err panic(err) } // batcher is applied to capsules var capsules []config.Capsule capsule := config.NewCapsule() // {"foo":"fizz","bar":"buzz"} for i := 1; i < 3; i++ { _ = capsule.Set("foo", "fizz") _ = capsule.Set("bar", "buzz") capsules = append(capsules, capsule) } capsules, err = batcher.Batch(context.TODO(), capsules...) if err != nil { // handle err panic(err) } for _, c := range capsules { fmt.Println(string(c.Data())) } }
Output: {"foo":"fizz","bar":"fizz"} {"foo":"fizz","bar":"fizz"}
Example (DNS) ¶
package main import ( "context" "fmt" "github.com/jshlbrd/substation/config" "github.com/jshlbrd/substation/process" ) func main() { capsule := config.NewCapsule() capsule.SetData([]byte(`{"addr":"8.8.8.8"}`)) // apply a reverse_lookup DNS query to addr cfg := []config.Config{ { Type: "dns", Settings: map[string]interface{}{ "key": "addr", "set_key": "domains", "options": map[string]interface{}{ "function": "reverse_lookup", }, }, }, } appliers, err := process.NewAppliers(cfg...) if err != nil { // handle err panic(err) } //nolint: errcheck // errors are ignored in case processing fails in a single applier defer process.CloseAppliers(context.TODO(), appliers...) for _, app := range appliers { capsule, err = app.Apply(context.TODO(), capsule) if err != nil { // handle err panic(err) } } fmt.Println(string(capsule.Data())) }
Output:
Example (HTTP) ¶
package main import ( "context" "fmt" "github.com/jshlbrd/substation/config" "github.com/jshlbrd/substation/process" ) func main() { capsule := config.NewCapsule() capsule.SetData([]byte(`{"addr":"8.8.8.8"}`)) // enriches the IP address by querying the GreyNoise Community API. // authenticating to GreyNoise is accomplished by interpolating a // secret inside an HTTP header. refer to the GreyNoise documentation // for more information: // https://docs.greynoise.io/reference/get_v3-community-ip. cfg := []config.Config{ { Type: "http", Settings: map[string]interface{}{ "key": "addr", // the HTTP response body is written to this key "set_key": "greynoise", "options": map[string]interface{}{ "method": "get", // the value from "addr" is interpolated into ${data} "url": "https://api.greynoise.io/v3/community/${data}", "headers": []map[string]interface{}{ { "key": "key", // this secret must be stored in the environment // variable GREYNOISE_API "value": "${SECRETS_ENV:GREYNOISE_API}", }, }, }, }, }, } appliers, err := process.NewAppliers(cfg...) if err != nil { // handle err panic(err) } //nolint: errcheck // errors are ignored in case processing fails in a single applier defer process.CloseAppliers(context.TODO(), appliers...) for _, app := range appliers { capsule, err = app.Apply(context.TODO(), capsule) if err != nil { // handle err panic(err) } } // sample output (which may change day to day) // {"addr":"8.8.8.8","greynoise":{"ip":"8.8.8.8","noise":false,"riot":true,"classification":"benign","name":"Google Public DNS","link":"https://viz.greynoise.io/riot/8.8.8.8","last_seen":"2023-01-30","message":"Success"}} fmt.Println(string(capsule.Data())) }
Output:
Example (IPDatabase) ¶
package main import ( "context" "fmt" "github.com/jshlbrd/substation/config" "github.com/jshlbrd/substation/process" ) func main() { capsule := config.NewCapsule() capsule.SetData([]byte(`{"addr":"8.8.8.8"}`)) // lookup addr in MaxMind City database cfg := []config.Config{ { Type: "ip_database", Settings: map[string]interface{}{ "key": "addr", "set_key": "geo", "options": map[string]interface{}{ "type": "maxmind_city", "settings": map[string]interface{}{ // the location of the IP enrichment database can // be either a path on local disk, an HTTP(S) URL, // or an AWS S3 URL "database": "location://path/to/maxmind.mmdb", "language": "en", }, }, }, }, } appliers, err := process.NewAppliers(cfg...) if err != nil { // handle err panic(err) } //nolint: errcheck // errors are ignored in case processing fails in a single applier defer process.CloseAppliers(context.TODO(), appliers...) for _, app := range appliers { capsule, err = app.Apply(context.TODO(), capsule) if err != nil { // handle err panic(err) } } fmt.Println(string(capsule.Data())) }
Output:
Example (KVStore) ¶
package main import ( "context" "fmt" "github.com/jshlbrd/substation/config" "github.com/jshlbrd/substation/process" ) func main() { capsule := config.NewCapsule() capsule.SetData([]byte(`{"foo":"bar"}`)) // the value from key "foo" is first set into the KV store and // then retrieved from the KV store and set into key "baz". if // the KV options are identical across processors, then the same // KV store is used in each call. this also allows for the use of // multiple KV stores. cfg := []config.Config{ { Type: "kv_store", Settings: map[string]interface{}{ "key": "foo", "set_key": "setter", "options": map[string]interface{}{ "type": "set", "kv_options": map[string]interface{}{ "type": "memory", "settings": map[string]interface{}{ "capacitiy": 10, }, }, }, }, }, { Type: "kv_store", Settings: map[string]interface{}{ "key": "setter", "set_key": "baz", "options": map[string]interface{}{ "type": "get", "kv_options": map[string]interface{}{ "type": "memory", "settings": map[string]interface{}{ "capacitiy": 10, }, }, }, }, }, } appliers, err := process.NewAppliers(cfg...) if err != nil { // handle err panic(err) } //nolint: errcheck // errors are ignored in case processing fails in a single applier defer process.CloseAppliers(context.TODO(), appliers...) for _, app := range appliers { capsule, err = app.Apply(context.TODO(), capsule) if err != nil { // handle err panic(err) } } fmt.Println(string(capsule.Data())) }
Output:
Index ¶
- func Apply(ctx context.Context, capsule config.Capsule, appliers ...Applier) (config.Capsule, error)
- func ApplyBytes(ctx context.Context, data []byte, appliers ...Applier) ([]byte, error)
- func Batch(ctx context.Context, batch []config.Capsule, batchers ...Batcher) ([]config.Capsule, error)
- func BatchBytes(ctx context.Context, data [][]byte, batchers ...Batcher) ([][]byte, error)
- func CloseAppliers(ctx context.Context, appliers ...Applier) error
- func CloseBatchers(ctx context.Context, batchers ...Batcher) error
- type Applier
- type Batcher
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Apply ¶
func Apply(ctx context.Context, capsule config.Capsule, appliers ...Applier) (config.Capsule, error)
Apply applies processors in series to encapsulated data.
func ApplyBytes ¶
ApplyBytes is a convenience function for applying processors in series to bytes.
Example ¶
package main import ( "context" "fmt" "github.com/jshlbrd/substation/config" "github.com/jshlbrd/substation/process" ) func main() { // copies the value of key "foo" into key "bar" cfg := config.Config{ Type: "copy", Settings: map[string]interface{}{ "key": "foo", "set_key": "bar", }, } // applier is retrieved from the factory applier, err := process.NewApplier(cfg) if err != nil { // handle err panic(err) } // applier is applied to bytes b := []byte(`{"foo":"fizz"}`) b, err = process.ApplyBytes(context.TODO(), b, applier) if err != nil { // handle err panic(err) } fmt.Println(string(b)) }
Output: {"foo":"fizz","bar":"fizz"}
func Batch ¶
func Batch(ctx context.Context, batch []config.Capsule, batchers ...Batcher) ([]config.Capsule, error)
Batch accepts one or more batchers and applies processors in series to encapsulated data.
func BatchBytes ¶
BatchBytes is a convenience function for applying processors in series to bytes.
Example ¶
package main import ( "context" "fmt" "github.com/jshlbrd/substation/config" "github.com/jshlbrd/substation/process" ) func main() { // copies the value of key "foo" into key "bar" cfg := config.Config{ Type: "copy", Settings: map[string]interface{}{ "key": "foo", "set_key": "bar", }, } // batcher is retrieved from the factory batcher, err := process.NewBatcher(cfg) if err != nil { // handle err panic(err) } // applier is applied to slice of bytes b := [][]byte{[]byte(`{"foo":"fizz"}`)} b, err = process.BatchBytes(context.TODO(), b, batcher) if err != nil { // handle err panic(err) } for _, e := range b { fmt.Println(string(e)) } }
Output: {"foo":"fizz","bar":"fizz"}
func CloseAppliers ¶
CloseAppliers closes all appliers and returns an error if any close fails.
Types ¶
type Applier ¶
type Applier interface { Apply(context.Context, config.Capsule) (config.Capsule, error) Close(context.Context) error }
func NewApplier ¶
NewApplier returns a configured Applier from a processor configuration.
Example ¶
package main import ( "fmt" "github.com/jshlbrd/substation/config" "github.com/jshlbrd/substation/process" ) func main() { // copies the value of key "foo" into key "bar" cfg := config.Config{ Type: "copy", Settings: map[string]interface{}{ "key": "foo", "set_key": "bar", }, } // applier is retrieved from the factory applier, err := process.NewApplier(cfg) if err != nil { // handle err panic(err) } fmt.Println(applier) }
Output: {"condition":{"operator":"","inspectors":null},"key":"foo","set_key":"bar","ignore_close":false,"ignore_errors":false}
func NewAppliers ¶
NewAppliers accepts one or more processor configurations and returns configured appliers.
Example ¶
package main import ( "fmt" "github.com/jshlbrd/substation/config" "github.com/jshlbrd/substation/process" ) func main() { // copies the value of key "foo" into key "bar" cfg := config.Config{ Type: "copy", Settings: map[string]interface{}{ "key": "foo", "set_key": "bar", }, } // one or more appliers are created appliers, err := process.NewAppliers(cfg) if err != nil { // handle err panic(err) } for _, app := range appliers { fmt.Println(app) } }
Output: {"condition":{"operator":"","inspectors":null},"key":"foo","set_key":"bar","ignore_close":false,"ignore_errors":false}
type Batcher ¶
type Batcher interface { Batch(context.Context, ...config.Capsule) ([]config.Capsule, error) Close(context.Context) error }
func NewBatcher ¶
NewBatcher returns a configured Batcher from a processor configuration.
Example ¶
package main import ( "fmt" "github.com/jshlbrd/substation/config" "github.com/jshlbrd/substation/process" ) func main() { // copies the value of key "foo" into key "bar" cfg := config.Config{ Type: "copy", Settings: map[string]interface{}{ "key": "foo", "set_key": "bar", }, } // one or more appliers are created batcher, err := process.NewBatcher(cfg) if err != nil { // handle err panic(err) } fmt.Println(batcher) }
Output: {"condition":{"operator":"","inspectors":null},"key":"foo","set_key":"bar","ignore_close":false,"ignore_errors":false}
func NewBatchers ¶
NewBatchers accepts one or more processor configurations and returns configured batchers.
Example ¶
package main import ( "fmt" "github.com/jshlbrd/substation/config" "github.com/jshlbrd/substation/process" ) func main() { // copies the value of key "foo" into key "bar" cfg := config.Config{ Type: "copy", Settings: map[string]interface{}{ "key": "foo", "set_key": "bar", }, } // one or more batchers are created batchers, err := process.NewBatchers(cfg) if err != nil { // handle err panic(err) } for _, bat := range batchers { fmt.Println(bat) } }
Output: {"condition":{"operator":"","inspectors":null},"key":"foo","set_key":"bar","ignore_close":false,"ignore_errors":false}
Source Files ¶
- aggregate.go
- aws_dynamodb.go
- aws_lambda.go
- base64.go
- capture.go
- case.go
- convert.go
- copy.go
- count.go
- delete.go
- dns.go
- domain.go
- drop.go
- expand.go
- flatten.go
- for_each.go
- group.go
- gzip.go
- hash.go
- http.go
- insert.go
- ip_database.go
- join.go
- kv_store.go
- math.go
- pipeline.go
- pretty_print.go
- process.go
- replace.go
- split.go
- time.go