Documentation
Overview ¶
Package etlutil provides various helper functions used by components of goetl.
Index ¶
- Constants
- func BeginningOfDay(t time.Time) time.Time
- func BeginningOfMonth(t time.Time) time.Time
- func BeginningOfTodayInUTC(loc *time.Location) time.Time
- func CSVProcess(params *CSVParameters, d etldata.Payload, outputChan chan etldata.Payload, ...)
- func CSVString(v interface{}) string
- func CreateTempTable(tx *sql.Tx, likeTable string) (string, error)
- func DaysBetween(start, finish time.Time) int
- func DaysInMonth(month string) int
- func Dedupe(tx *sql.Tx, targetTable string) error
- func DeleteS3Objects(client *s3.S3, bucket string, objKeys []string) (*s3.DeleteObjectsOutput, error)
- func DeltaMerge(tx *sql.Tx, targetTable, tempTable, conditional string) error
- func ExecuteSQLQuery(db *sql.DB, query string) error
- func ExecuteSQLQueryTx(tx *sql.Tx, query string) error
- func FirstDayOfWeek(day time.Time, firstDayOfWeek time.Weekday) time.Time
- func GetDataFromSQLQuery(db *sql.DB, query string, batchSize int, structDest interface{}) (chan etldata.Payload, error)
- func GetS3Object(client *s3.S3, bucket, objKey string) (*s3.GetObjectOutput, error)
- func KillPipelineIfErr(err error, killChan chan error)
- func LastMonth() string
- func ListS3Objects(client *s3.S3, bucket, keyPrefix string) ([]string, error)
- func MonthDateRange(month string) (startDate, endDate string)
- func MonthToDate() (month, startDate, endDate string)
- func MonthToTime(month string) time.Time
- func MonthToTimeInLocation(month string, loc *time.Location) time.Time
- func MonthsAgo(ago int) string
- func MonthsAgoFromYesterday(ago int) string
- func MySQLInsertData(db *sql.DB, d etldata.Payload, tableName string, onDupKeyUpdate bool, ...) error
- func PostgreSQLInsertData(db *sql.DB, d etldata.Payload, tableName string, onDupKeyUpdate bool, ...) error
- func PurgeMerge(tx *sql.Tx, targetTable, tempTable, conditional string) error
- func QuarterToDate() (startDate, endDate string)
- func S3Prefix(table string) string
- func SftpClient(server string, username string, authMethod []ssh.AuthMethod, ...) (*sftp.Client, error)
- func SftpKeyAuth(privateKeyPath string) (auth ssh.AuthMethod, err error)
- func TruncateMerge(tx *sql.Tx, targetTable, tempTable string) error
- func Typecheck(d etldata.Payload) (key string, err error)
- func UUID() (id string, err error)
- func VacuumAll(db *sql.DB) error
- func VacuumTable(db *sql.DB, table string) error
- func WriteS3Object(data []string, config *aws.Config, bucket string, key string, ...) (string, error)
- type CSVParameters
- type CSVWriter
- type SftpParameters
- type SftpPath
- type Timer
- type Typeable
Constants ¶
const (
DateLayout = "2006-01-02"
TimeLayout = "15:04:05"
)
const (
TypeableColumnName = "goetl_data_type"
)
TypeableColumnName is exposed as a constant to prevent typos.
Variables ¶
This section is empty.
Functions ¶
func BeginningOfDay ¶
BeginningOfDay returns the time (in any location) of the start of that location's day.
func BeginningOfMonth ¶ added in v1.0.7
BeginningOfMonth returns the time (in any location) of the start of that location's month.
func BeginningOfTodayInUTC ¶
BeginningOfTodayInUTC computes the start of today in the provided location and then converts that instant to UTC.
func CSVProcess ¶
func CSVProcess(params *CSVParameters, d etldata.Payload, outputChan chan etldata.Payload, killChan chan error)
CSVProcess writes the contents of d to the file and optionally (see CSVParameters.SendUpstream) sends the written bytes upstream on outputChan.
func CSVString ¶
func CSVString(v interface{}) string
CSVString returns an empty string for nil values so that the text "null" is never written to a file.
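A minimal sketch of the nil handling described above. The name csvString is hypothetical, and formatting non-nil values with fmt.Sprint is an assumption; the real function may format some types differently.

```go
package main

import "fmt"

// csvString maps nil to an empty CSV field instead of a literal like "null";
// other values are formatted with fmt.Sprint (an assumption for this sketch).
func csvString(v interface{}) string {
	if v == nil {
		return ""
	}
	return fmt.Sprint(v)
}

func main() {
	fmt.Printf("%q %q %q\n", csvString(nil), csvString(42), csvString("abc")) // prints "" "42" "abc"
}
```

One Go subtlety: a typed nil pointer stored in an interface (for example `(*int)(nil)`) does not compare equal to nil, so this sketch would format it as `<nil>` rather than an empty string.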
func CreateTempTable ¶
CreateTempTable generates a unique table name and creates a table of that name whose schema is based on likeTable.
func DaysBetween ¶
DaysBetween calculates how many days pass between two time objects.
func DaysInMonth ¶
DaysInMonth takes a month (in monthLayout) and returns how many days are in that month.
func DeleteS3Objects ¶
func DeleteS3Objects(client *s3.S3, bucket string, objKeys []string) (*s3.DeleteObjectsOutput, error)
DeleteS3Objects deletes the objects specified by the given object keys.
func DeltaMerge ¶
DeltaMerge deletes any records in the targetTable that are in the tempTable bound by the conditional. It then inserts all records in the tempTable into the targetTable.
This should be used when you insert only a subset of records into the tempTable (instead of a complete snapshot). The conditional would then typically join the two tables on the primary key, so that every record written to the tempTable appears only once in the targetTable when the job completes.
This is effectively a workaround for the lack of primary key constraints in Redshift.
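To make the two steps concrete, here is a hypothetical helper that only builds the statements; the SQL goetl actually emits is not documented here. It assumes Redshift's `DELETE ... USING` syntax for the join-based delete, and the table and column names in main are invented.

```go
package main

import "fmt"

// deltaMergeSQL returns the delete-then-insert pair implied by DeltaMerge.
// Inputs are interpolated directly, so they must come from trusted code.
func deltaMergeSQL(targetTable, tempTable, conditional string) []string {
	return []string{
		// Remove target rows that have a matching row in the temp table.
		fmt.Sprintf("DELETE FROM %s USING %s WHERE %s;", targetTable, tempTable, conditional),
		// Re-insert every temp row, so each key now appears exactly once.
		fmt.Sprintf("INSERT INTO %s SELECT * FROM %s;", targetTable, tempTable),
	}
}

func main() {
	for _, stmt := range deltaMergeSQL("orders", "orders_tmp", "orders.id = orders_tmp.id") {
		fmt.Println(stmt)
	}
}
```

Running both statements inside the same *sql.Tx (as DeltaMerge's signature suggests) means other sessions never observe the table with rows deleted but not yet re-inserted.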
func ExecuteSQLQuery ¶
ExecuteSQLQuery allows you to execute arbitrary SQL statements.
func ExecuteSQLQueryTx ¶
ExecuteSQLQueryTx allows you to execute arbitrary SQL statements within a transaction.
func FirstDayOfWeek ¶
FirstDayOfWeek takes a time object and returns the first day of the week of that time object (scoped to firstDayOfWeek).
func GetDataFromSQLQuery ¶
func GetDataFromSQLQuery(db *sql.DB, query string, batchSize int, structDest interface{}) (chan etldata.Payload, error)
GetDataFromSQLQuery is a utility function that, given a properly initialized sql.DB and a valid SQL query, executes the query and returns the results as etldata.JSON objects. The function is asynchronous: etldata.JSON values should be received on the returned data channel. If there was a problem setting up the query, an error is returned immediately. Errors can also occur during execution, as data is retrieved from the query; in that case, the object sent on the channel is a JSON object of the form {"Error": "description"}.
func GetS3Object ¶
GetS3Object returns the object output for the given object key.
func KillPipelineIfErr ¶
KillPipelineIfErr is an error-checking helper: if err is non-nil, it is sent on killChan to stop the pipeline.
func ListS3Objects ¶
ListS3Objects returns all object keys matching the given prefix. Note that the delimiter is set to "/". See http://docs.aws.amazon.com/AmazonS3/latest/dev/ListingKeysHierarchy.html
func MonthDateRange ¶
MonthDateRange takes a month (in monthLayout) and returns the first and last day of that month (in DateLayout).
func MonthToDate ¶
func MonthToDate() (month, startDate, endDate string)
MonthToDate starts from yesterday and returns yesterday's month (in monthLayout), the first day of yesterday's month (in DateLayout), and yesterday's date (in DateLayout).
func MonthToTime ¶ added in v1.0.4
MonthToTime takes a month (in monthLayout) and returns the time object of its first day.
func MonthToTimeInLocation ¶ added in v1.0.5
func MonthToTimeInLocation(month string, loc *time.Location) time.Time
func MonthsAgoFromYesterday ¶
MonthsAgoFromYesterday defers to MonthsAgo from yesterday's date.
func MySQLInsertData ¶
func MySQLInsertData(db *sql.DB, d etldata.Payload, tableName string, onDupKeyUpdate bool, onDupKeyFields []string, batchSize int) error
MySQLInsertData abstracts building and executing a SQL INSERT statement for the given Data object.
Note that the Data must be a valid JSON object (or an array of valid objects all with the same keys), where the keys are column names and the values are SQL values to be inserted into those columns.
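The statement shape can be sketched as a single multi-row INSERT with `?` placeholders plus an optional ON DUPLICATE KEY UPDATE clause. Everything here is an assumption for illustration: buildMySQLInsert is hypothetical, rows are plain maps rather than etldata.Payload, columns are sorted for deterministic output, and batchSize handling is omitted.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// buildMySQLInsert returns an INSERT statement and its arguments for rows that
// all share the same keys (the column names).
func buildMySQLInsert(table string, rows []map[string]interface{}, onDupKeyUpdate bool, onDupKeyFields []string) (string, []interface{}) {
	if len(rows) == 0 {
		return "", nil
	}
	cols := make([]string, 0, len(rows[0]))
	for c := range rows[0] {
		cols = append(cols, c)
	}
	sort.Strings(cols) // map order is random; sort for a stable statement

	rowPlaceholder := "(" + strings.TrimSuffix(strings.Repeat("?,", len(cols)), ",") + ")"
	placeholders := make([]string, len(rows))
	args := make([]interface{}, 0, len(rows)*len(cols))
	for i, r := range rows {
		placeholders[i] = rowPlaceholder
		for _, c := range cols {
			args = append(args, r[c])
		}
	}

	q := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s", table, strings.Join(cols, ","), strings.Join(placeholders, ","))
	if onDupKeyUpdate {
		sets := make([]string, len(onDupKeyFields))
		for i, f := range onDupKeyFields {
			sets[i] = fmt.Sprintf("%s=VALUES(%s)", f, f) // take the value from the row being inserted
		}
		q += " ON DUPLICATE KEY UPDATE " + strings.Join(sets, ",")
	}
	return q, args
}

func main() {
	rows := []map[string]interface{}{
		{"id": 1, "name": "a"},
		{"id": 2, "name": "b"},
	}
	q, args := buildMySQLInsert("users", rows, true, []string{"name"})
	fmt.Println(q)
	fmt.Println(args)
}
```

The `VALUES(col)` form inside ON DUPLICATE KEY UPDATE is deprecated as of MySQL 8.0.20 in favor of a row alias, but it remains widely supported.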
func PostgreSQLInsertData ¶
func PostgreSQLInsertData(db *sql.DB, d etldata.Payload, tableName string, onDupKeyUpdate bool, onDupKeyIndex string, onDupKeyFields []string, batchSize int) error
PostgreSQLInsertData abstracts building and executing a SQL INSERT statement for the given Data object.
Note that the Data must be a valid JSON object (or an array of valid objects all with the same keys), where the keys are column names and the values are SQL values to be inserted into those columns.
If onDupKeyUpdate is true, you must set an onDupKeyIndex. This translates to the conflict_target as specified in https://www.postgresql.org/docs/9.5/static/sql-insert.html
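As an illustration of how onDupKeyIndex and onDupKeyFields map onto PostgreSQL's upsert syntax, here is a hypothetical clause builder. It assumes onDupKeyIndex is a column list (PostgreSQL also accepts `ON CONSTRAINT name`, which this sketch does not handle) and falls back to DO NOTHING when no fields are given.

```go
package main

import (
	"fmt"
	"strings"
)

// postgresConflictClause builds the suffix appended to an INSERT when
// onDupKeyUpdate is true; onDupKeyIndex becomes the conflict_target.
func postgresConflictClause(onDupKeyIndex string, onDupKeyFields []string) string {
	if len(onDupKeyFields) == 0 {
		return fmt.Sprintf(" ON CONFLICT (%s) DO NOTHING", onDupKeyIndex)
	}
	sets := make([]string, len(onDupKeyFields))
	for i, f := range onDupKeyFields {
		// EXCLUDED refers to the row that was proposed for insertion.
		sets[i] = fmt.Sprintf("%s = EXCLUDED.%s", f, f)
	}
	return fmt.Sprintf(" ON CONFLICT (%s) DO UPDATE SET %s", onDupKeyIndex, strings.Join(sets, ", "))
}

func main() {
	fmt.Println("INSERT INTO users (id, name, email) VALUES ($1, $2, $3)" +
		postgresConflictClause("id", []string{"name", "email"}))
}
```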
func PurgeMerge ¶
PurgeMerge clears out the targetTable based on the conditional, and then writes all records from the tempTable into the targetTable. This method is used when a full snapshot of a specific applicationID table is written into the tempTable.
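Unlike DeltaMerge, the delete here is filtered only by the conditional, not by a join against the temp table. A hypothetical statement builder, with invented table and column names; the SQL goetl actually runs is not documented here.

```go
package main

import "fmt"

// purgeMergeSQL purges the slice of targetTable selected by conditional,
// then loads the full snapshot from tempTable.
func purgeMergeSQL(targetTable, tempTable, conditional string) []string {
	return []string{
		fmt.Sprintf("DELETE FROM %s WHERE %s;", targetTable, conditional),
		fmt.Sprintf("INSERT INTO %s SELECT * FROM %s;", targetTable, tempTable),
	}
}

func main() {
	for _, stmt := range purgeMergeSQL("events", "events_tmp", "application_id = 42") {
		fmt.Println(stmt)
	}
}
```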
func QuarterToDate ¶
func QuarterToDate() (startDate, endDate string)
QuarterToDate looks at yesterday and returns the start date of yesterday's quarter (in DateLayout) and yesterday (in DateLayout).
func SftpClient ¶
func SftpClient(server string, username string, authMethod []ssh.AuthMethod, opts ...sftp.ClientOption) (*sftp.Client, error)
SftpClient sets up and returns the client.
func SftpKeyAuth ¶
func SftpKeyAuth(privateKeyPath string) (auth ssh.AuthMethod, err error)
SftpKeyAuth generates an ssh.AuthMethod given the path of a private key.
func TruncateMerge ¶
TruncateMerge clears out the targetTable and then writes all records from the tempTable into targetTable. This method is used when a full snapshot of the table is written in its entirety into tempTable.
func VacuumTable ¶
VacuumTable vacuums a specific table.
Types ¶
type CSVParameters ¶
type CSVParameters struct {
Writer *CSVWriter
WriteHeader bool
HeaderWritten bool
Header []string
SendUpstream bool
QuoteEscape string
Comma rune
}
CSVParameters lets you define all of your CSV writing preferences in a single struct for reuse across multiple processors.
type CSVWriter ¶
type CSVWriter struct {
Comma rune
UseCRLF bool
AlwaysEncapsulate bool // If the content should be encapsulated independent of its type
QuoteEscape string // String to use to escape a quote character
// contains filtered or unexported fields
}
CSVWriter reimplements the standard library's csv.Writer, adding AlwaysEncapsulate and QuoteEscape.
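The two additions mainly affect how a single field is encoded. The standard library's csv.Writer quotes a field only when needed and always escapes a quote by doubling it (`""`); the simplified, hypothetical encodeField below shows how AlwaysEncapsulate and QuoteEscape could change that. It is a sketch, not CSVWriter's code, and it skips edge cases the standard library handles (such as a leading space).

```go
package main

import (
	"fmt"
	"strings"
)

// encodeField quotes the field when alwaysEncapsulate is set, or when it
// contains the separator, a quote, or a line break. Embedded quotes are
// replaced with quoteEscape instead of the standard doubled quote.
func encodeField(field string, comma rune, alwaysEncapsulate bool, quoteEscape string) string {
	needsQuotes := alwaysEncapsulate ||
		strings.ContainsRune(field, comma) ||
		strings.ContainsAny(field, "\"\r\n")
	if !needsQuotes {
		return field
	}
	return `"` + strings.ReplaceAll(field, `"`, quoteEscape) + `"`
}

func main() {
	fields := []string{"plain", `say "hi"`, "a,b"}
	out := make([]string, len(fields))
	for i, f := range fields {
		out[i] = encodeField(f, ',', true, `\"`) // backslash-escaped quotes
	}
	fmt.Println(strings.Join(out, ",")) // prints "plain","say \"hi\"","a,b"
}
```

Backslash escaping like this is what some loaders (for example, Redshift's COPY with its ESCAPE option) expect, which is one plausible reason to make the escape string configurable.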
func NewCSVWriter ¶
func NewCSVWriter() *CSVWriter
NewCSVWriter returns a new CSVWriter.
func (*CSVWriter) Error ¶
Error reports any error that has occurred during a previous Write or Flush.
func (*CSVWriter) Flush ¶
func (w *CSVWriter) Flush()
Flush writes any buffered data to the underlying io.Writer. To check if an error occurred during the Flush, call Error.
func (*CSVWriter) SetWriter ¶
SetWriter allows you to change the underlying writer (which is not directly exposed).
type SftpParameters ¶
type SftpParameters struct {
Server string
Username string
Path string
AuthMethods []ssh.AuthMethod
}
SftpParameters stores connection parameters for executing sftp commands later.
type SftpPath ¶
type SftpPath struct {
Path string `json:"path,omitempty"`
}
SftpPath is a simple struct for storing the full path of an object.
type Timer ¶
type Timer struct {
// contains filtered or unexported fields
}
Timer is a basic mechanism for measuring execution time.
func StartTimer ¶
func StartTimer() (t *Timer)
StartTimer returns a new Timer that's already "started".
func (*Timer) Duration ¶
Duration returns either the total execution duration (if the Timer has been stopped) or the time elapsed until time.Now() (if the Timer is still running).