processor

package
v1.3.0
Warning

This package is not in the latest version of its module.

Published: May 22, 2020 License: AGPL-3.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

var (
	// ParsedEventBufferSize is the size of the buffer of the Go channel containing the parsed events.
	// Different goroutines write to and read from that channel, each with different I/O characteristics,
	// so the buffer keeps the writer goroutines from blocking while the reader goroutine is temporarily busy.
	// The writer goroutines block only once the buffer is full, which keeps memory usage bounded.
	// See also: https://golang.org/doc/effective_go.html#channels
	ParsedEventBufferSize = 1000
)
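
The buffering behaviour described in the comment above can be illustrated with a minimal sketch. This is not the package's implementation: `produceAndCount` and `parsedEventBufferSize` are hypothetical names introduced here, and the events are plain strings rather than parsed log events.

```go
package main

import (
	"fmt"
	"sync"
)

// parsedEventBufferSize mirrors the documented default of 1000.
// The lowercase name is local to this sketch (an assumption, not the package variable).
const parsedEventBufferSize = 1000

// produceAndCount starts `writers` goroutines that each send `perWriter` events
// into a buffered channel and returns how many events a single reader received.
func produceAndCount(writers, perWriter, bufferSize int) int {
	events := make(chan string, bufferSize)

	var wg sync.WaitGroup
	for w := 0; w < writers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < perWriter; i++ {
				// This send blocks only when the buffer already holds bufferSize events.
				events <- fmt.Sprintf("writer-%d-event-%d", id, i)
			}
		}(w)
	}

	// Close the channel once every writer has finished so the reader loop ends.
	go func() {
		wg.Wait()
		close(events)
	}()

	count := 0
	for range events {
		count++
	}
	return count
}

func main() {
	fmt.Println(produceAndCount(4, 500, parsedEventBufferSize))
}
```

A larger buffer lets writers run further ahead of a slow reader at the cost of holding more events in memory; a buffer of 1 still delivers every event, but writers wait on the reader far more often.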

Functions

func Process

func Process(dataStreams chan *common.DataStream, destination destinations.Destination) error

Process orchestrates parsing, classifying, and normalizing logs, and forwarding them to the appropriate destination. Any error causes the Lambda invocation to fail.
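
As a rough illustration of the "read from a channel, transform, forward, fail on the first error" shape described above, here is a minimal sketch. It does not use the package's types: `destination`, `memoryDestination`, and `processLines` are hypothetical stand-ins, and the lowercase/trim step is only a placeholder for real classification and normalization.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// destination is a hypothetical stand-in for destinations.Destination.
type destination interface {
	Send(event string) error
}

// memoryDestination collects forwarded events in memory (illustration only).
type memoryDestination struct{ events []string }

func (d *memoryDestination) Send(event string) error {
	d.events = append(d.events, event)
	return nil
}

// processLines reads raw lines from the channel, applies a placeholder
// normalization step, and forwards each result to dest.
// It returns the first error it encounters, so the caller can fail the invocation.
func processLines(lines <-chan string, dest destination) error {
	for line := range lines {
		normalized := strings.ToLower(strings.TrimSpace(line))
		if normalized == "" {
			return errors.New("empty log line")
		}
		if err := dest.Send(normalized); err != nil {
			return fmt.Errorf("forwarding failed: %w", err)
		}
	}
	return nil
}

func main() {
	lines := make(chan string, 2)
	lines <- "  LOGIN user=alice "
	lines <- "LOGOUT user=alice"
	close(lines)

	dest := &memoryDestination{}
	if err := processLines(lines, dest); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(dest.events))
}
```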

func StreamEvents added in v1.2.0

func StreamEvents(sqsClient sqsiface.SQSAPI, deadlineTime time.Time, event events.SQSEvent) (sqsMessageCount int, err error)

StreamEvents aggregates SQS messages to avoid creating many small S3 files under load. While the queue still has messages, the function attempts to read more of them. Under load, the Lambda continues to read events and aggregates as much data as possible, producing fewer, bigger files. Fewer, bigger files make Athena queries much faster.

Types

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

func NewProcessor

func NewProcessor(input *common.DataStream) *Processor
