Documentation ¶
Index ¶
- func HandleMergeRequest(ctx context.Context, event events.SQSEvent) (string, error)
- type FileProcessor
- type RecordMerger
- type RecordTracker
- func (t *RecordTracker) GetRecord(file *data.FileToProcessInfo) (*TrackRecord, error)
- func (t *RecordTracker) MarkFileAsDone(file *data.FileToProcessInfo) (bool, error)
- func (t *RecordTracker) MarkFileAsFailed(file *data.FileToProcessInfo) (bool, error)
- func (t *RecordTracker) MarkFileAsProcessing(file *data.FileToProcessInfo) int
- type TrackRecord
- type WriterInfo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type FileProcessor ¶
type FileProcessor struct {
// contains filtered or unexported fields
}
FileProcessor is a worker go routine that will process files out of the "filesToProcess" channel.
func MakeFileProcessor ¶
func MakeFileProcessor(region string, filesToProcess <-chan *data.FileToProcessInfo, recordChannel chan<- *data.LogToProcess, returnToSQSChannel chan<- *data.FileToProcessInfo, fileProcessorsWG *sync.WaitGroup, recordTracker *RecordTracker) *FileProcessor
MakeFileProcessor creates a new FileProcessor and returns it.
func (*FileProcessor) Run ¶
func (fp *FileProcessor) Run()
Run starts the FileProcessor loop, reading files off the channel
type RecordMerger ¶
type RecordMerger struct {
// contains filtered or unexported fields
}
RecordMerger merges as many records as possible into a single file over the period of time that it's processing. Once it has enough data to form a complete file it'll write that file to S3 and begin building a new file.
func MakeRecordMerger ¶
func MakeRecordMerger(forHour int, deadline time.Time, writeToBucket string, writeToPath string, records <-chan *data.LogToProcess, recordTracker *RecordTracker) (*RecordMerger, *sync.WaitGroup)
MakeRecordMerger creates a RecordMerger worker and returns it.
type RecordTracker ¶
type RecordTracker struct {
// contains filtered or unexported fields
}
func (*RecordTracker) GetRecord ¶
func (t *RecordTracker) GetRecord(file *data.FileToProcessInfo) (*TrackRecord, error)
func (*RecordTracker) MarkFileAsDone ¶
func (t *RecordTracker) MarkFileAsDone(file *data.FileToProcessInfo) (bool, error)
func (*RecordTracker) MarkFileAsFailed ¶
func (t *RecordTracker) MarkFileAsFailed(file *data.FileToProcessInfo) (bool, error)
func (*RecordTracker) MarkFileAsProcessing ¶
func (t *RecordTracker) MarkFileAsProcessing(file *data.FileToProcessInfo) int
MarkFileAsProcessing will create the tracking record (if needed), and mark it as processing. Returns an indicator of the success or failure of doing this.
Return:
-1 => Failure creating/updating the tracking record, this file should be attempted again later 0 => File already processed, this file should be ignored 1 => Record created/updated successfully, it is safe to process this file
type TrackRecord ¶
type WriterInfo ¶
type WriterInfo struct { RecordMap map[string]*data.LogToProcess Timestamp time.Time NumRecords int64 WritableRecords int64 // contains filtered or unexported fields }
WriterInfo is used for the writer channel