Documentation ¶
Index ¶
- Variables
- func PollEvents(ctx context.Context, sqsClient sqsiface.SQSAPI, ...) (sqsMessageCount int, err error)
- func Process(ctx context.Context, dataStreams <-chan *common.DataStream, ...) error
- func RunScalingDecisions(ctx context.Context, sqsClient sqsiface.SQSAPI, ...)
- type Factory
- type ProcessFunc
- type Processor
Constants ¶
This section is empty.
Variables ¶
var (
	// ParsedEventBufferSize is the size of the buffer of the Go channel containing the parsed events.
	// Since different goroutines write to and read from that channel, each with different I/O characteristics,
	// we specify this buffer to avoid blocking the goroutines that write to the channel if the reader goroutine is
	// temporarily busy. The writer goroutines will block only when the buffer is full, which is needed
	// to avoid using up a lot of memory.
	// see also: https://golang.org/doc/effective_go.html#channels
	ParsedEventBufferSize = 1000
)
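The buffering behaviour described in the comment can be illustrated with a minimal sketch. It assumes a tiny buffer (`bufferSize`, a hypothetical stand-in for `ParsedEventBufferSize`) and uses non-blocking sends to show how many writes succeed before a writer would have to wait; `tryWrite` and `fillBuffer` are illustrative helpers, not part of this package.

```go
package main

import "fmt"

// bufferSize is a hypothetical, deliberately small stand-in for ParsedEventBufferSize.
const bufferSize = 3

// tryWrite attempts a non-blocking send and reports whether the channel accepted the event.
func tryWrite(ch chan<- string, event string) bool {
	select {
	case ch <- event:
		return true
	default:
		// The buffer is full: a plain send would block here.
		return false
	}
}

// fillBuffer sends up to max events and returns how many were accepted
// before a send would have blocked.
func fillBuffer(ch chan<- string, max int) int {
	accepted := 0
	for i := 0; i < max; i++ {
		if !tryWrite(ch, fmt.Sprintf("event-%d", i)) {
			break
		}
		accepted++
	}
	return accepted
}

func main() {
	events := make(chan string, bufferSize)

	// With no reader, writers proceed only until the buffer is full.
	fmt.Println("accepted without a reader:", fillBuffer(events, 10)) // prints 3

	// Once the reader takes one event, exactly one more write fits.
	<-events
	fmt.Println("accepted after one read:", fillBuffer(events, 10)) // prints 1
}
```

A larger buffer lets writers run further ahead of a slow reader, at the cost of holding more parsed events in memory.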
Functions ¶
func PollEvents ¶ added in v1.13.0
func PollEvents(
	ctx context.Context,
	sqsClient sqsiface.SQSAPI,
	resolver pantherlog.ParserResolver,
) (sqsMessageCount int, err error)
PollEvents acts as an interface to aggregate SQS messages, which avoids creating many small S3 files under load. The function attempts to read more messages from the queue as long as the queue has messages. Under load, the Lambda continues to read events and aggregates as much data as possible to produce fewer, bigger files. Fewer, bigger files make Athena queries much faster.
func Process ¶
func Process(
	ctx context.Context,
	dataStreams <-chan *common.DataStream,
	destination destinations.Destination,
	newProcessor func(stream *common.DataStream) (*Processor, error),
) error
Process orchestrates the tasks of parsing logs, classification, normalization, and forwarding the logs to the appropriate destination. Any error causes the Lambda invocation to fail.
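The overall shape of such an orchestration loop might look like the following sketch. Everything here is hypothetical: `stream` and `processor` stand in for `common.DataStream` and `Processor`, the `send` callback stands in for a destination, and the parsing rule is invented. The point is only to show streams arriving on a channel, a processor being built per stream, and the first error being returned to the caller.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// stream and processor are hypothetical stand-ins for common.DataStream and Processor.
type stream struct {
	source string
	lines  []string
}

type processor struct{ source string }

// parse normalizes one log line; an empty line is treated as an error.
func (p *processor) parse(line string) (string, error) {
	if strings.TrimSpace(line) == "" {
		return "", errors.New("empty log line from " + p.source)
	}
	return p.source + ": " + strings.ToLower(strings.TrimSpace(line)), nil
}

// process reads every stream, builds a processor for it, and forwards each
// parsed event to send. It returns the first error but keeps draining the
// channel so that the writer is never left blocked.
func process(streams <-chan *stream, newProcessor func(*stream) (*processor, error), send func(string)) error {
	var firstErr error
	for s := range streams {
		if firstErr != nil {
			continue // drain remaining streams without processing them
		}
		p, err := newProcessor(s)
		if err != nil {
			firstErr = err
			continue
		}
		for _, line := range s.lines {
			event, err := p.parse(line)
			if err != nil {
				firstErr = err
				break
			}
			send(event)
		}
	}
	return firstErr
}

// feed returns a closed, pre-filled channel of streams.
func feed(streams ...*stream) <-chan *stream {
	ch := make(chan *stream, len(streams))
	for _, s := range streams {
		ch <- s
	}
	close(ch)
	return ch
}

// newProcessor is a trivial factory that never fails.
func newProcessor(s *stream) (*processor, error) {
	return &processor{source: s.source}, nil
}

func main() {
	var sent []string
	err := process(
		feed(&stream{"s3://a", []string{"LOGIN ok", "LOGOUT ok"}}),
		newProcessor,
		func(e string) { sent = append(sent, e) },
	)
	fmt.Println(len(sent), err) // prints: 2 <nil>
}
```

Passing the factory as a function keeps the loop independent of how processors are constructed, which also makes it easy to substitute a stub in tests.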
func RunScalingDecisions ¶ added in v1.13.0
func RunScalingDecisions(ctx context.Context, sqsClient sqsiface.SQSAPI, lambdaClient lambdaiface.LambdaAPI, interval time.Duration)
RunScalingDecisions makes periodic, adaptive decisions to scale up based on the SQS queue statistics.
Types ¶
type Factory ¶ added in v1.11.0
type Factory func(r *common.DataStream) (*Processor, error)
func NewFactory ¶ added in v1.11.0
func NewFactory(resolver pantherlog.ParserResolver) Factory
type ProcessFunc ¶ added in v1.11.0
type ProcessFunc func(streamCh <-chan *common.DataStream, dest destinations.Destination) error