subimporter

Version: v0.7.0-alpha
Published: Aug 9, 2020 License: AGPL-3.0 Imports: 17 Imported by: 0

Documentation

Overview

Package subimporter implements a bulk ZIP/CSV importer of subscribers. It provides a simple queue that buffers imported records and commits them to the DB, along with ZIP and CSV handling utilities. An Importer is stateful: it keeps track of the import in progress, so it is meant to be used as a singleton, and only one import should run on a given Importer instance at a time.

Constants

const (
	StatusNone      = "none"
	StatusImporting = "importing"
	StatusStopping  = "stopping"
	StatusFinished  = "finished"
	StatusFailed    = "failed"

	ModeSubscribe = "subscribe"
	ModeBlocklist = "blocklist"
)

Import statuses and import modes.

Variables

var (
	// ErrIsImporting is returned when an import request is made while an
	// import is already running.
	ErrIsImporting = errors.New("import is already running")
)
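
The one-import-at-a-time rule from the overview can be illustrated with a small guard. This is only a sketch under stated assumptions: the `guard` type and its `begin` and `finish` methods are hypothetical and are not part of this package, and treating the "stopping" status as still busy is an assumption. It shows how a mutex-protected status could yield an error like ErrIsImporting.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errIsImporting mirrors the documented ErrIsImporting sentinel.
var errIsImporting = errors.New("import is already running")

// guard is a hypothetical stand-in for the Importer's internal state.
type guard struct {
	mu     sync.Mutex
	status string
}

// begin marks an import as running, or fails if one is already active.
// Treating "stopping" as busy is an assumption of this sketch.
func (g *guard) begin() error {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.status == "importing" || g.status == "stopping" {
		return errIsImporting
	}
	g.status = "importing"
	return nil
}

// finish records that the import has ended.
func (g *guard) finish() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.status = "finished"
}

func main() {
	g := &guard{status: "none"}
	fmt.Println(g.begin())                            // <nil>
	fmt.Println(errors.Is(g.begin(), errIsImporting)) // true
	g.finish()
	fmt.Println(g.begin()) // <nil>
}
```

Callers can compare the returned error against the sentinel with `errors.Is` to tell a busy importer apart from other failures.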

Functions

func IsEmail

func IsEmail(email string) bool

IsEmail checks whether the given string is a valid e-mail address.
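
The validation rules used by IsEmail are not shown on this page. As a rough illustration of what such a check can look like, the sketch below uses the standard library's `net/mail` parser. The `looksLikeEmail` helper is hypothetical and may accept or reject different inputs than IsEmail does.

```go
package main

import (
	"fmt"
	"net/mail"
)

// looksLikeEmail is a hypothetical helper, not this package's IsEmail.
// It accepts only bare addresses that net/mail can parse.
func looksLikeEmail(s string) bool {
	addr, err := mail.ParseAddress(s)
	// Inputs such as "User <user@example.com>" parse successfully but are
	// not bare addresses, so require the parsed address to equal the input.
	return err == nil && addr.Address == s
}

func main() {
	fmt.Println(looksLikeEmail("user@example.com"))        // true
	fmt.Println(looksLikeEmail("not-an-email"))            // false
	fmt.Println(looksLikeEmail("User <user@example.com>")) // false
}
```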

func ValidateFields

func ValidateFields(s SubReq) error

ValidateFields validates incoming subscriber field values.

Types

type Importer

type Importer struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Importer represents the bulk CSV subscriber import system.

func New

func New(opt Options, db *sql.DB) *Importer

New returns a new instance of Importer.

func (*Importer) GetLogs

func (im *Importer) GetLogs() []byte

GetLogs returns the log entries of the last import session.

func (*Importer) GetStats

func (im *Importer) GetStats() Status

GetStats returns the importer's global Status.

func (*Importer) NewSession

func (im *Importer) NewSession(fName, mode string, overWrite bool, listIDs []int) (*Session, error)

NewSession returns a new instance of Session. It takes the name of the uploaded file, but only retains it for stats.

func (*Importer) Stop

func (im *Importer) Stop()

Stop sends a signal to stop the existing import.

type Options

type Options struct {
	UpsertStmt         *sql.Stmt
	BlocklistStmt      *sql.Stmt
	UpdateListDateStmt *sql.Stmt
	NotifCB            models.AdminNotifCallback
}

Options represents import options.

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session represents a single import session.

func (*Session) ExtractZIP

func (s *Session) ExtractZIP(srcPath string, maxCSVs int) (string, []string, error)

ExtractZIP takes a ZIP file's path and extracts all .csv files in it to a temporary directory, and returns the name of the temp directory and the list of extracted .csv files.
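
The behaviour described for ExtractZIP can be approximated with `archive/zip` from the standard library. The sketch below is not the package's implementation: `extractCSVs`, `copyEntry`, and `makeSampleZIP` are hypothetical helpers, and details such as flattening entry paths, stopping silently at `maxCSVs`, and failing when no CSV is found are assumptions.

```go
package main

import (
	"archive/zip"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
)

// extractCSVs copies up to maxCSVs .csv entries from the ZIP at srcPath
// into a new temp directory and returns the directory and the file names.
func extractCSVs(srcPath string, maxCSVs int) (string, []string, error) {
	r, err := zip.OpenReader(srcPath)
	if err != nil {
		return "", nil, err
	}
	defer r.Close()

	dir, err := os.MkdirTemp("", "import")
	if err != nil {
		return "", nil, err
	}
	var files []string
	for _, f := range r.File {
		if f.FileInfo().IsDir() || !strings.HasSuffix(strings.ToLower(f.Name), ".csv") {
			continue
		}
		if len(files) >= maxCSVs {
			break
		}
		// filepath.Base drops directory components, so an entry name
		// cannot escape the temp directory.
		name := filepath.Base(f.Name)
		if err := copyEntry(f, filepath.Join(dir, name)); err != nil {
			os.RemoveAll(dir)
			return "", nil, err
		}
		files = append(files, name)
	}
	if len(files) == 0 {
		os.RemoveAll(dir)
		return "", nil, errors.New("no CSV files found in the ZIP")
	}
	return dir, files, nil
}

// copyEntry writes a single ZIP entry to dst.
func copyEntry(f *zip.File, dst string) error {
	src, err := f.Open()
	if err != nil {
		return err
	}
	defer src.Close()
	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	if _, err := io.Copy(out, src); err != nil {
		out.Close()
		return err
	}
	return out.Close()
}

// makeSampleZIP writes a small ZIP with one CSV and one non-CSV entry.
func makeSampleZIP() (string, error) {
	f, err := os.CreateTemp("", "sample-*.zip")
	if err != nil {
		return "", err
	}
	defer f.Close()
	zw := zip.NewWriter(f)
	for name, body := range map[string]string{
		"subs/a.csv": "email,name\nuser@example.com,User\n",
		"readme.txt": "ignored",
	} {
		w, err := zw.Create(name)
		if err != nil {
			return "", err
		}
		if _, err := w.Write([]byte(body)); err != nil {
			return "", err
		}
	}
	return f.Name(), zw.Close()
}

func main() {
	zipPath, err := makeSampleZIP()
	if err != nil {
		panic(err)
	}
	defer os.Remove(zipPath)
	dir, files, err := extractCSVs(zipPath, 10)
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	fmt.Println(files) // [a.csv]
}
```

As in this sketch, the caller is presumably responsible for removing the returned temp directory once the extracted files have been loaded.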

func (*Session) LoadCSV

func (s *Session) LoadCSV(srcPath string, delim rune) error

LoadCSV loads a CSV file and validates and imports the subscriber entries in it.
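
To make the role of the `delim` parameter concrete, the sketch below parses delimiter-separated subscriber data with `encoding/csv`. It is an illustration only: `parseCSV` is a hypothetical helper that reads from a string rather than a file path, the requirement of an `email` header column is an assumption, and the `@` test is a crude stand-in for real validation.

```go
package main

import (
	"encoding/csv"
	"errors"
	"fmt"
	"io"
	"strings"
)

// parseCSV reads delimiter-separated data whose first row is a header and
// returns one map per row that has a plausible e-mail value.
func parseCSV(data string, delim rune) ([]map[string]string, error) {
	rd := csv.NewReader(strings.NewReader(data))
	rd.Comma = delim // e.g. ',' or ';'
	rd.TrimLeadingSpace = true

	header, err := rd.Read()
	if err != nil {
		return nil, err
	}
	emailCol := -1
	for i, h := range header {
		header[i] = strings.ToLower(strings.TrimSpace(h))
		if header[i] == "email" {
			emailCol = i
		}
	}
	if emailCol < 0 {
		return nil, errors.New("CSV header has no email column")
	}

	var rows []map[string]string
	for {
		rec, err := rd.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, err
		}
		// Crude placeholder check; skip rows that fail it.
		if !strings.Contains(rec[emailCol], "@") {
			continue
		}
		row := make(map[string]string, len(header))
		for i, h := range header {
			row[h] = rec[i]
		}
		rows = append(rows, row)
	}
	return rows, nil
}

func main() {
	rows, err := parseCSV("email;name\nuser@example.com;User\nbad;Nobody\n", ';')
	if err != nil {
		panic(err)
	}
	fmt.Println(len(rows), rows[0]["email"]) // 1 user@example.com
}
```

The reader rejects rows whose field count differs from the header's, so indexing each record by header position is safe here.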

func (*Session) Start

func (s *Session) Start()

Start is a blocking function that selects on a channel queue until all subscriber entries in the import session are imported. It should be invoked as a goroutine.
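
The blocking queue loop described above can be pictured as follows. This is a minimal sketch, not the package's code: the `session` type, its fields, and `runImport` are hypothetical, and a real importer would commit records to the DB instead of counting them.

```go
package main

import (
	"fmt"
	"sync"
)

// session is a hypothetical stand-in for the package's Session type.
type session struct {
	queue    chan string   // buffered records awaiting commit
	stop     chan struct{} // closed to abort the import early
	imported int
}

// start blocks, selecting on the queue until it is closed or stop fires.
func (s *session) start() {
	for {
		select {
		case rec, ok := <-s.queue:
			if !ok {
				return // queue closed: every record has been consumed
			}
			_ = rec // a real importer would write the record to the DB here
			s.imported++
		case <-s.stop:
			return
		}
	}
}

// runImport feeds records to a session running in its own goroutine and
// returns how many were consumed.
func runImport(records []string) int {
	s := &session{queue: make(chan string, 2), stop: make(chan struct{})}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		s.start()
	}()
	for _, r := range records {
		s.queue <- r
	}
	close(s.queue)
	wg.Wait() // safe to read s.imported once the goroutine has exited
	return s.imported
}

func main() {
	fmt.Println(runImport([]string{"a", "b", "c", "d", "e"})) // 5
}
```

Running the loop in a goroutine lets the producer keep pushing records while the consumer drains the queue, which is why a blocking Start is meant to be launched with `go`.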

func (*Session) Stop

func (s *Session) Stop()

Stop stops an active import session.

type Status

type Status struct {
	Name     string `json:"name"`
	Total    int    `json:"total"`
	Imported int    `json:"imported"`
	Status   string `json:"status"`
	// contains filtered or unexported fields
}

Status represents statistics from an ongoing import session.
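
The struct tags determine how these statistics appear when serialized to JSON. The sketch below redeclares only the exported fields in a local `status` type (the unexported fields are omitted, and the `encode` helper and sample values are made up for illustration).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// status copies the exported fields and JSON tags of the documented Status.
type status struct {
	Name     string `json:"name"`
	Total    int    `json:"total"`
	Imported int    `json:"imported"`
	Status   string `json:"status"`
}

// encode returns the JSON form of s, or an empty string on failure.
func encode(s status) string {
	b, err := json.Marshal(s)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	fmt.Println(encode(status{Name: "subs.csv", Total: 100, Imported: 40, Status: "importing"}))
	// {"name":"subs.csv","total":100,"imported":40,"status":"importing"}
}
```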

type SubReq

type SubReq struct {
	models.Subscriber
	Lists     pq.Int64Array  `json:"lists"`
	ListUUIDs pq.StringArray `json:"list_uuids"`
}

SubReq is a wrapper over the Subscriber model.
