Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ( // ParsedEventBufferSize is the size of the buffer of the Go channel containing the parsed events. // Since there are different goroutines writing and reading from that channel each with different I/O characteristics, // we are specifying this buffer to avoid blocking the goroutines that write to the channel if the reader goroutine is // temporarily busy. The writer goroutines will block writing but only when the buffer has been full - something we need // to avoid using up lot of memory. // see also: https://golang.org/doc/effective_go.html#channels ParsedEventBufferSize = 1000 )
Functions ¶
func Process ¶
func Process(dataStreams chan *common.DataStream, destination destinations.Destination) error
Process orchestrates the tasks of parsing logs, classification, normalization and forwarding the logs to the appropriate destination. Any errors will cause Lambda invocation to fail
func StreamEvents ¶ added in v1.2.0
func StreamEvents(sqsClient sqsiface.SQSAPI, deadlineTime time.Time, event events.SQSEvent) (sqsMessageCount int, err error)
StreamEvents acts as an interface to aggregate sqs messages to avoid many small S3 files being created under load. The function will attempt to read more messages from the queue when the queue has messages. Under load the lambda will continue to read events and maximally aggregate data to produce fewer, bigger files. Fewer, bigger files makes Athena queries much faster.
Types ¶
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
func NewProcessor ¶
func NewProcessor(input *common.DataStream) *Processor
Click to show internal directories.
Click to hide internal directories.