Documentation ¶
Index ¶
- Constants
- func SetGCPercentIfNotSet(percent int)
- type CompressedInput
- func (s *CompressedInput) FreeMem(data *baker.Data)
- func (s *CompressedInput) NoMoreFiles()
- func (s *CompressedInput) ParseFile(fn string)
- func (s *CompressedInput) ProcessFile(fn string) error
- func (s *CompressedInput) SetOutputChannel(data chan<- *baker.Data)
- func (s *CompressedInput) Stats() baker.InputStats
- func (s *CompressedInput) Stop()
- type S3Input
Constants ¶
const (
	MetadataLastModified = "last_modified"
	MetadataURL          = "url"
)
These keys identify values in the record Metadata cache.
Variables ¶
This section is empty.
Functions ¶
func SetGCPercentIfNotSet ¶
func SetGCPercentIfNotSet(percent int)
SetGCPercentIfNotSet sets the GC target percentage, unless the GOGC environment variable is set, in which case SetGCPercentIfNotSet does not override it and leaves it as is.
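The behavior described above can be approximated with the standard library alone. The following is a minimal sketch, not the package's actual implementation: `setGCPercentIfNotSet` and `gogcIsSet` are hypothetical names, and the sketch assumes that "GOGC is set" means the variable is present in the environment (even if empty).

```go
package main

import (
	"fmt"
	"os"
	"runtime/debug"
)

// gogcIsSet reports whether the GOGC environment variable is present.
// Assumption: presence alone counts as "set", even with an empty value.
func gogcIsSet() bool {
	_, ok := os.LookupEnv("GOGC")
	return ok
}

// setGCPercentIfNotSet is a hypothetical stand-in for SetGCPercentIfNotSet.
// It applies percent through runtime/debug only when GOGC is absent, and
// reports whether it changed the setting.
func setGCPercentIfNotSet(percent int) bool {
	if gogcIsSet() {
		// The user chose a value through the environment: leave it alone.
		return false
	}
	debug.SetGCPercent(percent)
	return true
}

func main() {
	changed := setGCPercentIfNotSet(400)
	fmt.Println("GC percent changed:", changed)
}
```

The returned boolean is an addition made for this sketch so that callers (and tests) can observe which branch ran; the documented function returns nothing.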
Types ¶
type CompressedInput ¶
type CompressedInput struct {
	Opener func(fn string) (io.ReadCloser, int64, time.Time, *url.URL, error)
	Sizer  func(fn string) (int64, error)
	Done   chan bool
	// contains filtered or unexported fields
}
CompressedInput is a base for creating input components that process multiple gzip- or zstd-compressed logs coming from arbitrary sources.
This type implements an internal queue of files (identified by filename) and instantiates a number of workers to process them. Types that embed it (such as S3Input) can enqueue a file for processing through CompressedInput.ProcessFile().
It must be configured with an Opener function that is able to open a file given its filename and returns an io.ReadCloser instance for that file.
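As an illustration of the Opener field, here is a sketch of a function with the same signature that opens files from the local filesystem. The meaning of the extra return values is an assumption based on their types and on the MetadataLastModified and MetadataURL constants: the int64 is treated as the file size in bytes, the time.Time as the last modification time, and the *url.URL as the location of the file. `openLocal` and `writeTemp` are hypothetical helpers, not part of the package.

```go
package main

import (
	"fmt"
	"io"
	"net/url"
	"os"
	"path/filepath"
	"time"
)

// openLocal is a hypothetical Opener for local files.
// Assumed meaning of the results: reader, size in bytes, last modification
// time, a file:// URL for the file, and an error.
func openLocal(fn string) (io.ReadCloser, int64, time.Time, *url.URL, error) {
	f, err := os.Open(fn)
	if err != nil {
		return nil, 0, time.Time{}, nil, err
	}
	fi, err := f.Stat()
	if err != nil {
		f.Close()
		return nil, 0, time.Time{}, nil, err
	}
	abs, err := filepath.Abs(fn)
	if err != nil {
		f.Close()
		return nil, 0, time.Time{}, nil, err
	}
	u := &url.URL{Scheme: "file", Path: filepath.ToSlash(abs)}
	return f, fi.Size(), fi.ModTime(), u, nil
}

// writeTemp creates a temporary file holding content and returns its path.
func writeTemp(content string) (string, error) {
	f, err := os.CreateTemp("", "opener-*.log")
	if err != nil {
		return "", err
	}
	defer f.Close()
	if _, err := f.WriteString(content); err != nil {
		return "", err
	}
	return f.Name(), nil
}

func main() {
	fn, err := writeTemp("hello\n")
	if err != nil {
		panic(err)
	}
	defer os.Remove(fn)

	rc, size, mod, u, err := openLocal(fn)
	if err != nil {
		panic(err)
	}
	defer rc.Close()
	fmt.Println("size:", size, "modified:", mod, "url:", u)
}
```

A function like this could be assigned to the Opener field because its signature matches the field type shown above; whether the reader should yield compressed or decompressed bytes is not stated here and is left open.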
func NewCompressedInput ¶
func (*CompressedInput) FreeMem ¶
func (s *CompressedInput) FreeMem(data *baker.Data)
func (*CompressedInput) NoMoreFiles ¶
func (s *CompressedInput) NoMoreFiles()
NoMoreFiles signals CompressedInput that no more files will be enqueued, so it can exit once it has finished processing what was already enqueued. This is useful for an input that has a fixed set of files to process.
func (*CompressedInput) ParseFile ¶
func (s *CompressedInput) ParseFile(fn string)
func (*CompressedInput) ProcessFile ¶
func (s *CompressedInput) ProcessFile(fn string) error
ProcessFile enqueues a file for processing by CompressedInput. Embedding types must call it to schedule processing of a gzip- or zstd-compressed log file. The function only enqueues the file and returns, so it is normally fast, but it may block if the backlog exceeds the internal channel size (default: 1024 files).
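The enqueue-and-drain behavior described for ProcessFile and NoMoreFiles resembles a common Go pattern: a buffered channel of filenames consumed by a fixed pool of workers. The sketch below illustrates that pattern only; `fileQueue` and its methods are hypothetical and are not the package's internals. It assumes that closing the channel is how "no more files" is signaled.

```go
package main

import (
	"fmt"
	"sync"
)

// fileQueue is a hypothetical stand-in for the internal queue: a buffered
// channel of filenames drained by a fixed number of worker goroutines.
type fileQueue struct {
	files     chan string
	wg        sync.WaitGroup
	mu        sync.Mutex
	processed []string
}

// newFileQueue starts the workers. backlog is the channel capacity
// (the documentation mentions a default of 1024 for the real type).
func newFileQueue(backlog, workers int) *fileQueue {
	q := &fileQueue{files: make(chan string, backlog)}
	for i := 0; i < workers; i++ {
		q.wg.Add(1)
		go func() {
			defer q.wg.Done()
			// The loop ends once the channel is closed and fully drained.
			for fn := range q.files {
				q.mu.Lock()
				q.processed = append(q.processed, fn)
				q.mu.Unlock()
			}
		}()
	}
	return q
}

// enqueue returns immediately while the buffer has room and blocks
// when the backlog is full.
func (q *fileQueue) enqueue(fn string) { q.files <- fn }

// noMoreFiles closes the channel so that workers exit after draining it.
func (q *fileQueue) noMoreFiles() { close(q.files) }

// wait blocks until every worker has exited and returns how many files
// were processed.
func (q *fileQueue) wait() int {
	q.wg.Wait()
	return len(q.processed)
}

func main() {
	q := newFileQueue(2, 3)
	for i := 0; i < 5; i++ {
		q.enqueue(fmt.Sprintf("log-%d.gz", i))
	}
	q.noMoreFiles()
	fmt.Println("processed files:", q.wait())
}
```

With a small backlog, as in `main`, the producer is throttled by the workers, which is the blocking behavior the ProcessFile description warns about.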
func (*CompressedInput) SetOutputChannel ¶
func (s *CompressedInput) SetOutputChannel(data chan<- *baker.Data)
func (*CompressedInput) Stats ¶
func (s *CompressedInput) Stats() baker.InputStats
func (*CompressedInput) Stop ¶
func (s *CompressedInput) Stop()
type S3Input ¶
type S3Input struct {
	*CompressedInput
	Bucket string
	// contains filtered or unexported fields
}
func NewS3Input ¶
func (*S3Input) ProcessDirectory ¶
ProcessDirectory enqueues all files matching a specific prefix for processing by S3Input. If prefix is actually an S3 URL, the bucket named in the URL is used instead of the one provided at creation time.
This function makes (multiple) remote calls to acquire the listing of all files matching the specified prefix in the bucket, and enqueues them for processing.
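The bucket-override rule can be sketched without any AWS dependency. The helper below, `resolveBucketAndPrefix`, is hypothetical: it assumes S3 URLs take the form `s3://bucket/key-prefix` and that any other string is a plain key prefix inside the default bucket. It does not perform the remote listing calls.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// resolveBucketAndPrefix is a hypothetical helper illustrating the rule:
// a plain prefix uses defaultBucket, while an s3:// URL supplies its own
// bucket (the URL host) and key prefix (the URL path without leading slash).
func resolveBucketAndPrefix(defaultBucket, prefix string) (bucket, keyPrefix string, err error) {
	if !strings.HasPrefix(prefix, "s3://") {
		return defaultBucket, prefix, nil
	}
	u, err := url.Parse(prefix)
	if err != nil {
		return "", "", err
	}
	if u.Host == "" {
		return "", "", fmt.Errorf("s3 url %q has no bucket", prefix)
	}
	return u.Host, strings.TrimPrefix(u.Path, "/"), nil
}

func main() {
	for _, p := range []string{"logs/2020/", "s3://other-bucket/logs/2020/"} {
		bucket, key, err := resolveBucketAndPrefix("default-bucket", p)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("bucket=%s prefix=%s\n", bucket, key)
	}
}
```

Note that `url.Parse` percent-decodes the path, so keys containing escaped characters would need extra care in a real implementation.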