Documentation ¶
Index ¶
- func CheckIfAttributionCSVFile(filePath string) bool
- func CopyFrom(ctx context.Context, tx *pgx.Tx, scanner *bufio.Scanner, fileID uint, ...) (int, int, error)
- func GetCSVMetadata(path string) (csvFileMetadata, error)
- type CSVFileProcessor
- type CSVImporter
- type CSVParser
- type CclfFileProcessor
- type CclfImporter
- type LocalFileProcessor
- func (processor *LocalFileProcessor) CleanUpCCLF(ctx context.Context, cclfMap map[string][]*cclfZipMetadata) (deletedCount int, err error)
- func (processor *LocalFileProcessor) CleanUpCSV(file csvFile) error
- func (processor *LocalFileProcessor) LoadCSV(filepath string) (*bytes.Reader, func(), error)
- func (processor *LocalFileProcessor) LoadCclfFiles(path string) (cclfList map[string][]*cclfZipMetadata, skipped int, failed int, err error)
- func (processor *LocalFileProcessor) OpenZipArchive(filePath string) (*zip.Reader, func(), error)
- type S3FileProcessor
- func (processor *S3FileProcessor) CleanUpCCLF(ctx context.Context, cclfMap map[string][]*cclfZipMetadata) (deletedCount int, err error)
- func (processor *S3FileProcessor) CleanUpCSV(file csvFile) error
- func (processor *S3FileProcessor) LoadCSV(filepath string) (*bytes.Reader, func(), error)
- func (processor *S3FileProcessor) LoadCclfFiles(path string) (cclfMap map[string][]*cclfZipMetadata, skipped int, failed int, err error)
- func (processor *S3FileProcessor) OpenZipArchive(filePath string) (*zip.Reader, func(), error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CheckIfAttributionCSVFile ¶
func CheckIfAttributionCSVFile(filePath string) bool
func CopyFrom ¶
func CopyFrom(ctx context.Context, tx *pgx.Tx, scanner *bufio.Scanner, fileID uint, reportInterval int, logger logrus.FieldLogger, expectedRecordLength int) (int, int, error)
CopyFrom writes all beneficiary data captured in the scanner to the beneficiaries table. It returns two counts, one of which is the number of rows written, along with any error that occurred.
func GetCSVMetadata ¶
func GetCSVMetadata(path string) (csvFileMetadata, error)
GetCSVMetadata builds a metadata struct from the parts of the filename. The filename regex is part of the ACO configuration.
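As an illustration of building metadata from filename parts, the following sketch uses a regular expression with named groups. Everything specific in it is an assumption: `fileMeta` stands in for the unexported `csvFileMetadata`, and `namePattern` is an invented pattern, since the real one comes from ACO configuration and is not shown here.

```go
package main

import (
	"fmt"
	"path/filepath"
	"regexp"
	"time"
)

// fileMeta is a hypothetical stand-in for the package's unexported csvFileMetadata.
type fileMeta struct {
	ACOID   string
	Created time.Time
}

// namePattern is an invented example; the real pattern lives in ACO configuration.
var namePattern = regexp.MustCompile(`^(?P<aco>[A-Z]\d{4})_attribution_(?P<date>\d{8})\.csv$`)

// parseName extracts the named groups from the base filename and converts
// the date group into a time.Time.
func parseName(path string) (fileMeta, error) {
	base := filepath.Base(path)
	m := namePattern.FindStringSubmatch(base)
	if m == nil {
		return fileMeta{}, fmt.Errorf("filename %q does not match the configured pattern", base)
	}
	created, err := time.Parse("20060102", m[namePattern.SubexpIndex("date")])
	if err != nil {
		return fileMeta{}, fmt.Errorf("filename %q has an invalid date: %w", base, err)
	}
	return fileMeta{ACOID: m[namePattern.SubexpIndex("aco")], Created: created}, nil
}

func main() {
	meta, err := parseName("/tmp/in/A1234_attribution_20240131.csv")
	fmt.Println(meta.ACOID, meta.Created.Format("2006-01-02"), err)
}
```

Named groups keep the lookup independent of group order, which helps when the pattern is supplied by configuration rather than fixed in code.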
Types ¶
type CSVFileProcessor ¶
type CSVFileProcessor interface {
    // Fetch the csv attribution file to be imported.
    LoadCSV(path string) (*bytes.Reader, func(), error)
    // Remove csv attribution file that was successfully imported.
    CleanUpCSV(file csvFile) (err error)
}
File processors for attribution are defined as interfaces so that callers can depend on the interface rather than on a concrete implementation; local development and other environments require different processors. This interface has two implementations: one for ingesting and testing locally, and one for ingesting from S3.
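The benefit of the interface can be sketched with a third, in-memory implementation. This is an assumption-laden illustration: `csvLoader` mirrors only the `LoadCSV` method (the `csvFile` argument of `CleanUpCSV` is unexported), `memoryLoader` and `countBytes` are hypothetical names, and treating the returned `func()` as a cleanup callback is an assumption about its purpose.

```go
package main

import (
	"bytes"
	"fmt"
)

// csvLoader mirrors the LoadCSV method of CSVFileProcessor.
type csvLoader interface {
	LoadCSV(path string) (*bytes.Reader, func(), error)
}

// memoryLoader is a hypothetical implementation backed by a map, useful in tests.
type memoryLoader struct {
	files  map[string][]byte
	closed int // number of times a cleanup callback has run
}

func (m *memoryLoader) LoadCSV(path string) (*bytes.Reader, func(), error) {
	data, ok := m.files[path]
	if !ok {
		// Return a no-op callback so callers never receive a nil func.
		return nil, func() {}, fmt.Errorf("no such file: %s", path)
	}
	return bytes.NewReader(data), func() { m.closed++ }, nil
}

// countBytes depends only on the interface, so a local, S3, or in-memory
// implementation can be passed in without changing it.
func countBytes(l csvLoader, path string) (int, error) {
	r, cleanup, err := l.LoadCSV(path)
	if err != nil {
		return 0, err
	}
	defer cleanup()
	return r.Len(), nil
}

func main() {
	loader := &memoryLoader{files: map[string][]byte{"attribution.csv": []byte("id\n1\n")}}
	n, err := countBytes(loader, "attribution.csv")
	fmt.Println(n, err, loader.closed)
}
```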
type CSVImporter ¶
type CSVImporter struct {
    Logger        logrus.FieldLogger
    FileProcessor CSVFileProcessor
    Database      *sql.DB
}
func (CSVImporter) ImportCSV ¶
func (importer CSVImporter) ImportCSV(filepath string) error
func (CSVImporter) ProcessCSV ¶
func (importer CSVImporter) ProcessCSV(csv csvFile) error
ProcessCSV takes the provided metadata and writes a new record to the cclf_files table, then takes the contents of the file and writes new record(s) to the cclf_beneficiaries table. If any step of writing to the database fails, the whole transaction fails. If the new records are written successfully, the import status of the new record in the cclf_files table is updated.
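The all-or-nothing behavior described above follows a common transaction pattern, sketched here without a database. This is not the package's implementation: `txn`, `inTx`, `fakeTx`, and `errInsert` are hypothetical names. The `txn` interface lists only `Commit` and `Rollback`, two methods that `*sql.Tx` also provides, so a real transaction could be passed in its place.

```go
package main

import (
	"errors"
	"fmt"
)

// txn lists the two methods of *sql.Tx that the pattern needs,
// so the sketch runs without a database.
type txn interface {
	Commit() error
	Rollback() error
}

// errInsert simulates a failed write in the example below.
var errInsert = errors.New("insert failed")

// inTx runs each step in order. The first failure rolls the transaction
// back and is returned; if every step succeeds, the transaction is committed.
func inTx(tx txn, steps ...func() error) error {
	for i, step := range steps {
		if err := step(); err != nil {
			if rbErr := tx.Rollback(); rbErr != nil {
				return fmt.Errorf("step %d: %w (rollback also failed: %v)", i, err, rbErr)
			}
			return fmt.Errorf("step %d: %w", i, err)
		}
	}
	return tx.Commit()
}

// fakeTx records which terminal call was made.
type fakeTx struct{ committed, rolledBack bool }

func (f *fakeTx) Commit() error   { f.committed = true; return nil }
func (f *fakeTx) Rollback() error { f.rolledBack = true; return nil }

func main() {
	ok := &fakeTx{}
	fmt.Println(inTx(ok, func() error { return nil }), ok.committed)

	bad := &fakeTx{}
	fmt.Println(inTx(bad, func() error { return errInsert }), bad.rolledBack)
}
```

In terms of the description above, the steps would be the write to cclf_files followed by the writes to cclf_beneficiaries.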
type CclfFileProcessor ¶
type CclfFileProcessor interface {
    // Load a list of valid CCLF files to be imported
    LoadCclfFiles(path string) (cclfList map[string][]*cclfZipMetadata, skipped int, failed int, err error)
    // Clean up CCLF files after failed or successful import runs
    CleanUpCCLF(ctx context.Context, cclfMap map[string][]*cclfZipMetadata) (deletedCount int, err error)
    // Open a zip archive
    OpenZipArchive(name string) (*zip.Reader, func(), error)
}
CclfFileProcessor manages interaction with CCLF files from a given source.
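The `(*zip.Reader, func(), error)` shape of OpenZipArchive can be illustrated with an in-memory analogue. This is a sketch under assumptions: `openZipBytes` and `buildZip` are hypothetical helpers, and the returned `func()` is assumed to be a close callback (a no-op here, since nothing needs closing for in-memory data).

```go
package main

import (
	"archive/zip"
	"bytes"
	"fmt"
)

// openZipBytes is a hypothetical in-memory analogue of OpenZipArchive.
// A file-backed version would return a callback that closes the file.
func openZipBytes(data []byte) (*zip.Reader, func(), error) {
	r, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
	if err != nil {
		return nil, func() {}, fmt.Errorf("open zip: %w", err)
	}
	return r, func() {}, nil
}

// buildZip writes a single-entry archive so the example has input to read.
func buildZip(name, body string) ([]byte, error) {
	var buf bytes.Buffer
	w := zip.NewWriter(&buf)
	f, err := w.Create(name)
	if err != nil {
		return nil, err
	}
	if _, err := f.Write([]byte(body)); err != nil {
		return nil, err
	}
	// Close flushes the central directory; the archive is invalid without it.
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	data, err := buildZip("example.txt", "record")
	if err != nil {
		panic(err)
	}
	r, closeFn, err := openZipBytes(data)
	if err != nil {
		panic(err)
	}
	defer closeFn()
	for _, f := range r.File {
		fmt.Println(f.Name)
	}
}
```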
type CclfImporter ¶
type CclfImporter struct {
    Logger        logrus.FieldLogger
    FileProcessor CclfFileProcessor
}
CclfImporter manages the import process for CCLF files from a given source.
func (CclfImporter) ImportCCLFDirectory ¶
func (importer CclfImporter) ImportCCLFDirectory(filePath string) (success, failure, skipped int, err error)
type LocalFileProcessor ¶
type LocalFileProcessor struct {
    Handler optout.LocalFileHandler
}
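func (*LocalFileProcessor) CleanUpCCLF ¶
func (processor *LocalFileProcessor) CleanUpCCLF(ctx context.Context, cclfMap map[string][]*cclfZipMetadata) (deletedCount int, err error)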
func (*LocalFileProcessor) CleanUpCSV ¶
func (processor *LocalFileProcessor) CleanUpCSV(file csvFile) error
func (*LocalFileProcessor) LoadCSV ¶
func (processor *LocalFileProcessor) LoadCSV(filepath string) (*bytes.Reader, func(), error)
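func (*LocalFileProcessor) LoadCclfFiles ¶
func (processor *LocalFileProcessor) LoadCclfFiles(path string) (cclfList map[string][]*cclfZipMetadata, skipped int, failed int, err error)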
func (*LocalFileProcessor) OpenZipArchive ¶
func (processor *LocalFileProcessor) OpenZipArchive(filePath string) (*zip.Reader, func(), error)
type S3FileProcessor ¶
type S3FileProcessor struct {
    Handler optout.S3FileHandler
}
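func (*S3FileProcessor) CleanUpCCLF ¶
func (processor *S3FileProcessor) CleanUpCCLF(ctx context.Context, cclfMap map[string][]*cclfZipMetadata) (deletedCount int, err error)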
func (*S3FileProcessor) CleanUpCSV ¶
func (processor *S3FileProcessor) CleanUpCSV(file csvFile) error
func (*S3FileProcessor) LoadCSV ¶
func (processor *S3FileProcessor) LoadCSV(filepath string) (*bytes.Reader, func(), error)
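func (*S3FileProcessor) LoadCclfFiles ¶
func (processor *S3FileProcessor) LoadCclfFiles(path string) (cclfMap map[string][]*cclfZipMetadata, skipped int, failed int, err error)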
func (*S3FileProcessor) OpenZipArchive ¶
func (processor *S3FileProcessor) OpenZipArchive(filePath string) (*zip.Reader, func(), error)