cclf

package
v0.0.0-...-addf352
Published: Jan 14, 2025 License: CC0-1.0 Imports: 31 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func CheckIfAttributionCSVFile

func CheckIfAttributionCSVFile(filePath string) bool

func CopyFrom

func CopyFrom(ctx context.Context, tx *pgx.Tx, scanner *bufio.Scanner, fileID uint, reportInterval int, logger logrus.FieldLogger, expectedRecordLength int) (int, int, error)

CopyFrom writes all of the beneficiary data captured in the scanner to the beneficiaries table. It returns two integer counts, one of which is the number of rows written, along with any error that occurred.
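
The scanning side of a bulk load like this can be pictured with the sketch below. It is not the package's implementation: the helper name `countRecords`, the rule that a record is accepted only when its length equals the expected length, and the meaning of the two returned counts are all assumptions made for illustration. The database copy itself is left out.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// countRecords is a hypothetical helper, not part of package cclf.
// It reads newline-delimited records from r and returns how many had
// exactly expectedLen bytes (accepted) and how many did not (rejected).
func countRecords(r io.Reader, expectedLen int) (accepted, rejected int, err error) {
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		line := scanner.Bytes()
		if len(line) == 0 {
			continue // skip blank lines
		}
		if len(line) != expectedLen {
			rejected++
			continue
		}
		accepted++
	}
	if err := scanner.Err(); err != nil {
		return accepted, rejected, fmt.Errorf("scanning records: %w", err)
	}
	return accepted, rejected, nil
}

// demoCounts runs countRecords over three sample lines with an expected
// length of 4: two lines match and one does not.
func demoCounts() (int, int, error) {
	return countRecords(strings.NewReader("AAAA\nBB\nCCCC\n"), 4)
}
```

Checking `scanner.Err()` after the loop matters because `Scan` returns false both at end of input and on a read error.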

func GetCSVMetadata

func GetCSVMetadata(path string) (csvFileMetadata, error)

GetCSVMetadata builds a metadata struct from the parts of the filename. The filename regex is part of the ACO configuration.
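
As a rough illustration of building metadata from filename parts, the sketch below parses a made-up filename layout with named capture groups. The pattern, the `fileMetadata` type and its fields, and the example filename are all hypothetical; the real regex lives in ACO configuration and the real `csvFileMetadata` type is unexported.

```go
package main

import (
	"fmt"
	"path/filepath"
	"regexp"
	"time"
)

// fileMetadata is a hypothetical stand-in for the package's unexported
// csvFileMetadata type; its fields are assumptions.
type fileMetadata struct {
	Name      string
	ACOID     string
	Timestamp time.Time
}

// namePattern is an invented pattern for illustration only. In the real
// package the filename regex comes from ACO configuration.
var namePattern = regexp.MustCompile(`^ATTR_(?P<aco>[A-Z0-9]+)_(?P<date>\d{8})\.csv$`)

// parseMetadata extracts the ACO identifier and date from the base name
// of path, returning an error when the name does not match namePattern.
func parseMetadata(path string) (fileMetadata, error) {
	name := filepath.Base(path)
	m := namePattern.FindStringSubmatch(name)
	if m == nil {
		return fileMetadata{}, fmt.Errorf("filename %q does not match the expected pattern", name)
	}
	ts, err := time.Parse("20060102", m[namePattern.SubexpIndex("date")])
	if err != nil {
		return fileMetadata{}, fmt.Errorf("parsing date in %q: %w", name, err)
	}
	return fileMetadata{
		Name:      name,
		ACOID:     m[namePattern.SubexpIndex("aco")],
		Timestamp: ts,
	}, nil
}
```

Named groups keep the parsing code readable when the pattern changes, since fields are looked up by name rather than by position.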

Types

type CSVFileProcessor

type CSVFileProcessor interface {
	// Fetch the csv attribution file to be imported.
	LoadCSV(path string) (*bytes.Reader, func(), error)
	// Remove csv attribution file that was successfully imported.
	CleanUpCSV(file csvFile) (err error)
}

File processors for attribution are defined as interfaces so that any implementation can be passed in; local development and other environments require different processors. This interface has two implementations: one for ingesting and testing locally, and one for ingesting from S3.
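
The benefit of the interface can be sketched with a third, in-memory implementation of the kind a unit test might use. Everything named here is hypothetical: `csvSource` only mirrors the shape of CSVFileProcessor, and its cleanup method takes a path because the real interface depends on the unexported `csvFile` type.

```go
package main

import (
	"bytes"
	"errors"
)

// csvSource mirrors the shape of CSVFileProcessor for illustration. The
// name and the path-based CleanUpCSV signature are assumptions.
type csvSource interface {
	LoadCSV(path string) (*bytes.Reader, func(), error)
	CleanUpCSV(path string) error
}

// memorySource is a hypothetical in-memory implementation.
type memorySource struct {
	files map[string][]byte
}

func (m *memorySource) LoadCSV(path string) (*bytes.Reader, func(), error) {
	data, ok := m.files[path]
	if !ok {
		return nil, func() {}, errors.New("file not found: " + path)
	}
	// Nothing to release for in-memory data, so the cleanup func is a no-op.
	return bytes.NewReader(data), func() {}, nil
}

func (m *memorySource) CleanUpCSV(path string) error {
	delete(m.files, path)
	return nil
}

// importSize depends only on the interface, so a local, S3, or in-memory
// implementation can be passed in. It returns the file size in bytes and
// removes the file afterwards.
func importSize(src csvSource, path string) (int, error) {
	r, closeFn, err := src.LoadCSV(path)
	if err != nil {
		return 0, err
	}
	defer closeFn()
	n := r.Len()
	return n, src.CleanUpCSV(path)
}
```

Because `importSize` never names a concrete type, swapping the storage backend requires no change to the calling code.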

type CSVImporter

type CSVImporter struct {
	Logger        logrus.FieldLogger
	FileProcessor CSVFileProcessor
	Database      *sql.DB
}

func (CSVImporter) ImportCSV

func (importer CSVImporter) ImportCSV(filepath string) error

func (CSVImporter) ProcessCSV

func (importer CSVImporter) ProcessCSV(csv csvFile) error

ProcessCSV takes the provided metadata and writes a new record to the cclf_files table, then writes the contents of the file as new record(s) to the cclf_beneficiaries table. If any step of writing to the database fails, the whole transaction fails. If the new records are written successfully, the new record in the cclf_files table has its import status updated.
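
The all-or-nothing behavior described above follows a common pattern, sketched here under stated assumptions. The `txn` interface, `runAtomically`, `recordingTx`, and `errStep` are hypothetical names, not part of this package. `*sql.Tx` from `database/sql` has the same `Commit` and `Rollback` methods, so it would satisfy `txn`; a fake is used here so the sketch runs without a database.

```go
package main

import (
	"errors"
	"fmt"
)

// txn is the subset of transaction behavior used here. The interface is
// an assumption made so the sketch needs no database driver.
type txn interface {
	Commit() error
	Rollback() error
}

// errStep is a sample failure used to demonstrate rollback.
var errStep = errors.New("step failed")

// runAtomically is a hypothetical helper: it runs each step in order,
// rolls back on the first failure, and commits only if every step succeeds.
func runAtomically(tx txn, steps ...func() error) error {
	for i, step := range steps {
		if err := step(); err != nil {
			if rbErr := tx.Rollback(); rbErr != nil {
				return fmt.Errorf("step %d: %w (rollback also failed: %v)", i, err, rbErr)
			}
			return fmt.Errorf("step %d: %w", i, err)
		}
	}
	return tx.Commit()
}

// recordingTx is a fake transaction that records which method was called.
type recordingTx struct {
	committed  bool
	rolledBack bool
}

func (t *recordingTx) Commit() error   { t.committed = true; return nil }
func (t *recordingTx) Rollback() error { t.rolledBack = true; return nil }
```

In this shape, the file record insert, the beneficiary inserts, and the status update would each be one step, so a failure in any of them leaves no partial rows behind.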

type CSVParser

type CSVParser struct {
	FilePath string
}

type CclfFileProcessor

type CclfFileProcessor interface {
	// Load a list of valid CCLF files to be imported
	LoadCclfFiles(path string) (cclfList map[string][]*cclfZipMetadata, skipped int, failed int, err error)
	// Clean up CCLF files after failed or successful import runs
	CleanUpCCLF(ctx context.Context, cclfMap map[string][]*cclfZipMetadata) (deletedCount int, err error)
	// Open a zip archive
	OpenZipArchive(name string) (*zip.Reader, func(), error)
}

CclfFileProcessor manages interaction with CCLF files from a given source.

type CclfImporter

type CclfImporter struct {
	Logger        logrus.FieldLogger
	FileProcessor CclfFileProcessor
}

CclfImporter manages the import process for CCLF files from a given source.

func (CclfImporter) ImportCCLFDirectory

func (importer CclfImporter) ImportCCLFDirectory(filePath string) (success, failure, skipped int, err error)

type LocalFileProcessor

type LocalFileProcessor struct {
	Handler optout.LocalFileHandler
}

func (*LocalFileProcessor) CleanUpCCLF

func (processor *LocalFileProcessor) CleanUpCCLF(ctx context.Context, cclfMap map[string][]*cclfZipMetadata) (deletedCount int, err error)

func (*LocalFileProcessor) CleanUpCSV

func (processor *LocalFileProcessor) CleanUpCSV(file csvFile) error

func (*LocalFileProcessor) LoadCSV

func (processor *LocalFileProcessor) LoadCSV(filepath string) (*bytes.Reader, func(), error)

func (*LocalFileProcessor) LoadCclfFiles

func (processor *LocalFileProcessor) LoadCclfFiles(path string) (cclfList map[string][]*cclfZipMetadata, skipped int, failed int, err error)

func (*LocalFileProcessor) OpenZipArchive

func (processor *LocalFileProcessor) OpenZipArchive(filePath string) (*zip.Reader, func(), error)

type S3FileProcessor

type S3FileProcessor struct {
	Handler optout.S3FileHandler
}

func (*S3FileProcessor) CleanUpCCLF

func (processor *S3FileProcessor) CleanUpCCLF(ctx context.Context, cclfMap map[string][]*cclfZipMetadata) (deletedCount int, err error)

func (*S3FileProcessor) CleanUpCSV

func (processor *S3FileProcessor) CleanUpCSV(file csvFile) error

func (*S3FileProcessor) LoadCSV

func (processor *S3FileProcessor) LoadCSV(filepath string) (*bytes.Reader, func(), error)

func (*S3FileProcessor) LoadCclfFiles

func (processor *S3FileProcessor) LoadCclfFiles(path string) (cclfMap map[string][]*cclfZipMetadata, skipped int, failed int, err error)

func (*S3FileProcessor) OpenZipArchive

func (processor *S3FileProcessor) OpenZipArchive(filePath string) (*zip.Reader, func(), error)
