Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// ParsedEventBufferSize is the size of the buffer of the Go channel containing the parsed events.
	// Different goroutines write to and read from that channel, each with different I/O characteristics,
	// so this buffer keeps the writer goroutines from blocking while the reader goroutine is
	// temporarily busy. The writer goroutines block only when the buffer is full, which is needed
	// to avoid using a lot of memory.
	// See also: https://golang.org/doc/effective_go.html#channels
	ParsedEventBufferSize = 1000
)
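The buffering behavior described in the comment can be illustrated with a small, self-contained sketch. It is not taken from the package: the string element type and the helper name `sendsBeforeBlocking` are assumptions made for illustration. It counts how many sends succeed on a buffered channel before a writer would have to block because no reader is draining it.

```go
package main

import "fmt"

// parsedEventBufferSize mirrors the documented default of 1000.
// The lowercase name and the string element type are stand-ins for this sketch.
const parsedEventBufferSize = 1000

// sendsBeforeBlocking reports how many sends succeed on a channel with the
// given buffer size when no reader is draining it.
func sendsBeforeBlocking(bufferSize int) int {
	ch := make(chan string, bufferSize)
	sent := 0
	for {
		select {
		case ch <- "event":
			// The buffer still had room, so the writer did not block.
			sent++
		default:
			// The buffer is full: a plain send would block here until
			// a reader receives from the channel.
			return sent
		}
	}
}

func main() {
	fmt.Println(sendsBeforeBlocking(parsedEventBufferSize)) // prints 1000
}
```

A larger buffer lets writers run further ahead of a slow reader, at the cost of holding more parsed events in memory at once.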
Functions ¶
func Process ¶
func Process(
	dataStreams <-chan *common.DataStream,
	destination destinations.Destination,
	newProcessor func(stream *common.DataStream) (*Processor, error),
) error
Process orchestrates parsing, classifying, and normalizing logs and forwarding them to the appropriate destination. Any error causes the Lambda invocation to fail.
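The general shape of such an orchestration function can be sketched as follows. This is a hedged illustration, not the package's implementation: `dataStream`, `processor`, `process`, and `runStreams` are hypothetical stand-ins, and the destination argument is omitted. The sketch only shows a per-stream factory being called for each stream read from a channel, with the first error propagated to the caller.

```go
package main

import (
	"errors"
	"fmt"
)

// dataStream and processor are hypothetical stand-ins for
// common.DataStream and Processor.
type dataStream struct{ name string }
type processor struct{ stream *dataStream }

// process reads streams until the channel is closed, builds one processor per
// stream with newProcessor, and returns the first error it encounters.
func process(
	streams <-chan *dataStream,
	newProcessor func(stream *dataStream) (*processor, error),
) (int, error) {
	processed := 0
	for s := range streams {
		if _, err := newProcessor(s); err != nil {
			// Returning the error lets the caller fail the whole invocation.
			return processed, fmt.Errorf("stream %q: %w", s.name, err)
		}
		processed++
	}
	return processed, nil
}

// runStreams feeds the named streams through process, using a factory that
// rejects streams with an empty name.
func runStreams(names ...string) (int, error) {
	streams := make(chan *dataStream, len(names))
	for _, n := range names {
		streams <- &dataStream{name: n}
	}
	close(streams)
	factory := func(s *dataStream) (*processor, error) {
		if s.name == "" {
			return nil, errors.New("empty stream name")
		}
		return &processor{stream: s}, nil
	}
	return process(streams, factory)
}

func main() {
	fmt.Println(runStreams("a", "b"))
	fmt.Println(runStreams("a", "", "c"))
}
```

Passing the factory as a parameter keeps the orchestration loop independent of how a processor is built, which makes it easy to substitute a test double.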
func StreamEvents ¶ added in v1.2.0
func StreamEvents(
	sqsClient sqsiface.SQSAPI,
	resolver logtypes.Resolver,
	deadlineTime time.Time,
	event events.SQSEvent,
) (sqsMessageCount int, err error)
StreamEvents aggregates SQS messages to avoid creating many small S3 files under load. While the queue has messages, the function attempts to read more of them. Under load, the Lambda keeps reading events and aggregates as much data as possible to produce fewer, larger files, which make Athena queries much faster.
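The aggregation idea can be sketched without any AWS dependencies. The example below is an assumption-laden illustration, not the package's code: `receiveFunc`, `aggregate`, and `demo` are hypothetical names, the queue is an in-memory slice, and it assumes that `deadlineTime` bounds how long the function keeps reading.

```go
package main

import (
	"fmt"
	"time"
)

// receiveFunc is a hypothetical stand-in for polling the queue. It returns the
// next batch of message bodies, or an empty slice when the queue is drained.
type receiveFunc func() []string

// aggregate starts from the messages that triggered the invocation and keeps
// reading batches until the queue is empty or the deadline has passed.
func aggregate(initial []string, receive receiveFunc, deadline time.Time) []string {
	messages := append([]string{}, initial...)
	for time.Now().Before(deadline) {
		batch := receive()
		if len(batch) == 0 {
			break // nothing left to aggregate
		}
		messages = append(messages, batch...)
	}
	return messages
}

// demo aggregates one initial message plus an in-memory queue holding three
// more, within the given time budget, and returns the total message count.
func demo(budget time.Duration) int {
	queue := [][]string{{"b", "c"}, {"d"}}
	receive := func() []string {
		if len(queue) == 0 {
			return nil
		}
		batch := queue[0]
		queue = queue[1:]
		return batch
	}
	return len(aggregate([]string{"a"}, receive, time.Now().Add(budget)))
}

func main() {
	fmt.Println(demo(time.Second)) // prints 4
}
```

With a zero budget the loop never runs and only the initial message is returned, which corresponds to an invocation that has no time left to read more.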
Types ¶
type Factory ¶ added in v1.11.0
type Factory func(r *common.DataStream) (*Processor, error)
func NewFactory ¶ added in v1.11.0
type ProcessFunc ¶ added in v1.11.0
type ProcessFunc func(streamCh <-chan *common.DataStream, dest destinations.Destination) error