processors

package
v2.0.1+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 24, 2016 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Package processors contains built-in DataProcessor implementations that are generic and potentially useful across any ETL project.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BigQueryConfig

type BigQueryConfig struct {
	JsonPemPath string
	ProjectID   string
	DatasetID   string
}

BigQueryConfig is used when init'ing new BigQueryReader instances.

type BigQueryReader

type BigQueryReader struct {
	PageSize         int    // defaults to 5000
	AggregateResults bool   // determines whether to send data as soon as available or to aggregate and send all query results, defaults to false
	UnflattenResults bool   // defaults to false
	TmpTableName     string // Used when UnflattenResults is true. default to "_ratchet_tmp"
	ConcurrencyLevel int    // See ConcurrentDataProcessor
	// contains filtered or unexported fields
}

BigQueryReader is used to query data from Google's BigQuery, and it behaves similarly to SQLReader. See SQLReader docs for explanation on static vs dynamic querying.

Note: If your data set contains nested/repeated fields you will likely want to get results back "unflattened." By default BigQuery returns results in a flattened format, which duplicates rows for each repeated value. This can be annoying to deal with, so BigQueryReader provides a "UnflattenResults" flag that will handle querying in such a way to get back unflattened results. This involves using a temporary table setting and a couple of other special query settings - read the BigQuery docs related to flatten and repeated fields for more info.

func NewBigQueryReader

func NewBigQueryReader(config *BigQueryConfig, query string) *BigQueryReader

NewBigQueryReader returns an instance of a BigQueryExtractor ready to run a static query.

func NewDynamicBigQueryReader

func NewDynamicBigQueryReader(config *BigQueryConfig, sqlGenerator func(data.JSON) (string, error)) *BigQueryReader

NewDynamicBigQueryReader returns an instance of a BigQueryExtractor ready to run a dynamic query based on the sqlGenerator function.

func (*BigQueryReader) Concurrency

func (r *BigQueryReader) Concurrency() int

See ConcurrentDataProcessor

func (*BigQueryReader) Finish

func (r *BigQueryReader) Finish(outputChan chan data.JSON, killChan chan error)

func (*BigQueryReader) ForEachQueryData

func (r *BigQueryReader) ForEachQueryData(d data.JSON, killChan chan error, forEach func(d data.JSON))

ForEachQueryData handles generating the SQL (in case of dynamic mode), running the query and retrieving the data in data.JSON format, and then passing the results back witih the function call to forEach.

func (*BigQueryReader) ProcessData

func (r *BigQueryReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*BigQueryReader) String

func (r *BigQueryReader) String() string

type BigQueryWriter

type BigQueryWriter struct {
	ConcurrencyLevel int // See ConcurrentDataProcessor
	// contains filtered or unexported fields
}

func NewBigQueryWriter

func NewBigQueryWriter(config *BigQueryConfig, tableName string) *BigQueryWriter

func NewBigQueryWriterForNewTable

func NewBigQueryWriterForNewTable(config *BigQueryConfig, tableName string, fields map[string]string) *BigQueryWriter

func (*BigQueryWriter) Concurrency

func (w *BigQueryWriter) Concurrency() int

See ConcurrentDataProcessor

func (*BigQueryWriter) Finish

func (w *BigQueryWriter) Finish(outputChan chan data.JSON, killChan chan error)

func (*BigQueryWriter) ProcessData

func (w *BigQueryWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*BigQueryWriter) String

func (w *BigQueryWriter) String() string

func (*BigQueryWriter) WriteBatch

func (w *BigQueryWriter) WriteBatch(queuedRows []map[string]interface{}) (err error)

type CSVTransformer

type CSVTransformer struct {
	Parameters util.CSVParameters
}

CSVTransformer converts data.JSON objects into a CSV string object and sends it on to the next stage. In use-cases where you simply want to write to a CSV file, use CSVWriter instead.

CSVTransformer is for more complex use-cases where you need to generate CSV data and perhaps send it to multiple output stages.

func NewCSVTransformer

func NewCSVTransformer() *CSVTransformer

NewCSVTransformer returns a new CSVTransformer wrapping the given io.Writer object

func (*CSVTransformer) Finish

func (w *CSVTransformer) Finish(outputChan chan data.JSON, killChan chan error)

func (*CSVTransformer) ProcessData

func (w *CSVTransformer) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*CSVTransformer) String

func (w *CSVTransformer) String() string

type CSVWriter

type CSVWriter struct {
	Parameters util.CSVParameters
}

CSVWriter is handles converting data.JSON objects into CSV format, and writing them to the given io.Writer. The Data must be a valid JSON object or a slice of valid JSON objects. If you already have Data formatted as a CSV string you can use an IoWriter instead.

func NewCSVWriter

func NewCSVWriter(w io.Writer) *CSVWriter

NewCSVWriter returns a new CSVWriter wrapping the given io.Writer object

func (*CSVWriter) Finish

func (w *CSVWriter) Finish(outputChan chan data.JSON, killChan chan error)

func (*CSVWriter) ProcessData

func (w *CSVWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*CSVWriter) String

func (w *CSVWriter) String() string

type FileReader

type FileReader struct {
	// contains filtered or unexported fields
}

FileReader opens and reads the contents of the given filename.

func NewFileReader

func NewFileReader(filename string) *FileReader

NewFileReader returns a new FileReader that will read the entire contents of the given file path and send it at once. For buffered or line-by-line reading try using IoReader.

func (*FileReader) Finish

func (r *FileReader) Finish(outputChan chan data.JSON, killChan chan error)

func (*FileReader) ProcessData

func (r *FileReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*FileReader) String

func (r *FileReader) String() string

type FtpWriter added in v1.5.0

type FtpWriter struct {
	// contains filtered or unexported fields
}

FtpWriter type represents an ftp writter processer

func NewFtpWriter added in v1.5.0

func NewFtpWriter(host, username, password, path string) *FtpWriter

NewFtpWriter instantiates new instance of an ftp writer

func (*FtpWriter) Finish added in v1.5.0

func (f *FtpWriter) Finish(outputChan chan data.JSON, killChan chan error)

Finish closes open references to the remote file and server

func (*FtpWriter) ProcessData added in v1.5.0

func (f *FtpWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

ProcessData writes data as is directly to the output file

func (*FtpWriter) String added in v1.5.0

func (f *FtpWriter) String() string

type FuncTransformer

type FuncTransformer struct {
	Name             string // can be set for more useful log output
	ConcurrencyLevel int    // See ConcurrentDataProcessor
	// contains filtered or unexported fields
}

FuncTransformer executes the given function on each data payload, sending the resuling data to the next stage.

While FuncTransformer is useful for simple data transformation, more complicated tasks justify building a custom implementation of DataProcessor.

func NewFuncTransformer

func NewFuncTransformer(transform func(d data.JSON) data.JSON) *FuncTransformer

func (*FuncTransformer) Concurrency

func (t *FuncTransformer) Concurrency() int

See ConcurrentSee ConcurrentDataProcessor

func (*FuncTransformer) Finish

func (t *FuncTransformer) Finish(outputChan chan data.JSON, killChan chan error)

func (*FuncTransformer) ProcessData

func (t *FuncTransformer) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*FuncTransformer) String

func (t *FuncTransformer) String() string

type HTTPRequest

type HTTPRequest struct {
	Request *http.Request
	Client  *http.Client
}

HTTPRequest executes an HTTP request and passes along the response body. It is simply wrapping an http.Request and http.Client object. See the net/http docs for more info: https://golang.org/pkg/net/http

func NewHTTPRequest

func NewHTTPRequest(method, url string, body io.Reader) (*HTTPRequest, error)

NewHTTPRequest creates a new HTTPRequest and is essentially wrapping net/http's NewRequest function. See https://golang.org/pkg/net/http/#NewRequest

func (*HTTPRequest) Finish

func (r *HTTPRequest) Finish(outputChan chan data.JSON, killChan chan error)

func (*HTTPRequest) ProcessData

func (r *HTTPRequest) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*HTTPRequest) String

func (r *HTTPRequest) String() string

type IoReader

type IoReader struct {
	Reader     io.Reader
	LineByLine bool // defaults to true
	BufferSize int
	Gzipped    bool
}

IoReader wraps an io.Reader and reads it.

func NewIoReader

func NewIoReader(reader io.Reader) *IoReader

NewIoReader returns a new IoReader wrapping the given io.Reader object.

func (*IoReader) Finish

func (r *IoReader) Finish(outputChan chan data.JSON, killChan chan error)

func (*IoReader) ForEachData

func (r *IoReader) ForEachData(killChan chan error, foo func(d data.JSON))

func (*IoReader) ProcessData

func (r *IoReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*IoReader) String

func (r *IoReader) String() string

type IoReaderWriter

type IoReaderWriter struct {
	IoReader
	IoWriter
}

IoReaderWriter performs both the job of a IoReader and IoWriter. It will read data from the given io.Reader, write the resulting data to the given io.Writer, and (if the write was successful) send the data to the next stage of processing.

IoReaderWriter is composed of both a IoReader and IoWriter, so it supports all of the same properties and usage options.

func NewIoReaderWriter

func NewIoReaderWriter(reader io.Reader, writer io.Writer) *IoReaderWriter

NewIoReaderWriter returns a new IoReaderWriter wrapping the given io.Reader object

func (*IoReaderWriter) Finish

func (r *IoReaderWriter) Finish(outputChan chan data.JSON, killChan chan error)

func (*IoReaderWriter) ProcessData

func (r *IoReaderWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*IoReaderWriter) String

func (r *IoReaderWriter) String() string

type IoWriter

type IoWriter struct {
	Writer     io.Writer
	AddNewline bool
}

IoWriter wraps any io.Writer object. It can be used to write data out to a File, os.Stdout, or any other task that can be supported via io.Writer.

func NewIoWriter

func NewIoWriter(writer io.Writer) *IoWriter

NewIoWriter returns a new IoWriter wrapping the given io.Writer object

func (*IoWriter) Finish

func (w *IoWriter) Finish(outputChan chan data.JSON, killChan chan error)

func (*IoWriter) ProcessData

func (w *IoWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*IoWriter) String

func (w *IoWriter) String() string

type Passthrough

type Passthrough struct {
	// contains filtered or unexported fields
}

Passthrough simply passes the data on to the next stage.

func NewPassthrough

func NewPassthrough() *Passthrough

func (*Passthrough) Finish

func (r *Passthrough) Finish(outputChan chan data.JSON, killChan chan error)

func (*Passthrough) ProcessData

func (r *Passthrough) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*Passthrough) String

func (r *Passthrough) String() string

type RegexpMatcher

type RegexpMatcher struct {

	// Set to true to log each match attempt (logger must be in debug mode).
	DebugLog bool
	// contains filtered or unexported fields
}

RegexpMatcher checks if incoming data matches the given Regexp, and sends it on to the next stage only if it matches. It is using regexp.Match under the covers: https://golang.org/pkg/regexp/#Match

func NewRegexpMatcher

func NewRegexpMatcher(pattern string) *RegexpMatcher

NewRegexpMatcher returns a new RegexpMatcher initialized with the given pattern to match.

func (*RegexpMatcher) Finish

func (r *RegexpMatcher) Finish(outputChan chan data.JSON, killChan chan error)

func (*RegexpMatcher) ProcessData

func (r *RegexpMatcher) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*RegexpMatcher) String

func (r *RegexpMatcher) String() string

type S3Reader

type S3Reader struct {
	IoReader // embeds IoReader

	DeleteObjects bool
	// contains filtered or unexported fields
}

S3Reader handles retrieving objects from S3. Use NewS3ObjectReader to read a single object, or NewS3PrefixReader to read all objects matching the same prefix in your bucket. S3Reader embeds an IoReeader, so it will support the same configuration options as IoReader.

func NewS3ObjectReader

func NewS3ObjectReader(awsID, awsSecret, awsRegion, bucket, object string) *S3Reader

NewS3ObjectReader reads a single object from the given S3 bucket

func NewS3PrefixReader

func NewS3PrefixReader(awsID, awsSecret, awsRegion, bucket, prefix string) *S3Reader

NewS3PrefixReader reads a all objects from the given S3 bucket that match a prefix. See http://docs.aws.amazon.com/AmazonS3/latest/dev/ListingKeysHierarchy.html S3 Delimiter will be "/"

func (*S3Reader) Finish

func (r *S3Reader) Finish(outputChan chan data.JSON, killChan chan error)

func (*S3Reader) ProcessData

func (r *S3Reader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*S3Reader) String

func (r *S3Reader) String() string

type S3Writer

type S3Writer struct {
	Compress      bool
	LineSeparator string
	// contains filtered or unexported fields
}

By default, we will not compress data before sending it upstream to S3. Set the `Compress` flag to true to use gzip compression before storing in S3 (if this flag is set to true, ".gz" will automatically be appended to the key name specified).

By default, we will separate each iteration of data sent to `ProcessData` with a new line when we piece back together to send to S3. Change the `LineSeparator` attribute to change this behavior.

func NewS3Writer

func NewS3Writer(awsID, awsSecret, awsRegion, bucket, key string) *S3Writer

func (*S3Writer) Finish

func (w *S3Writer) Finish(outputChan chan data.JSON, killChan chan error)

func (*S3Writer) ProcessData

func (w *S3Writer) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*S3Writer) String

func (w *S3Writer) String() string

type SCP

type SCP struct {
	Port        string
	Object      string
	Destination string
}

SCP executes the scp command, sending the given file to the given destination.

func NewSCP

func NewSCP(obj string, destination string) *SCP

func (*SCP) Finish

func (s *SCP) Finish(outputChan chan data.JSON, killChan chan error)

func (*SCP) ProcessData

func (s *SCP) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*SCP) Run

func (s *SCP) Run(killChan chan error)

type SQLExecutor

type SQLExecutor struct {
	// contains filtered or unexported fields
}

func NewDynamicSQLExecutor

func NewDynamicSQLExecutor(dbConn *sql.DB, sqlGenerator func(data.JSON) (string, error)) *SQLExecutor

NewDynamicSQLExecutor returns a new SQLExecutor operating in dynamic mode.

func NewSQLExecutor

func NewSQLExecutor(dbConn *sql.DB, sql string) *SQLExecutor

NewSQLExecutor returns a new SQLExecutor

func (*SQLExecutor) Finish

func (s *SQLExecutor) Finish(outputChan chan data.JSON, killChan chan error)

Finish - see interface for documentation.

func (*SQLExecutor) ProcessData

func (s *SQLExecutor) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*SQLExecutor) String

func (s *SQLExecutor) String() string

type SQLReader

type SQLReader struct {
	BatchSize         int
	StructDestination interface{}
	ConcurrencyLevel  int // See ConcurrentDataProcessor
	// contains filtered or unexported fields
}

SQLReader runs the given SQL and passes the resulting data to the next stage of processing.

It can operate in 2 modes: 1) Static - runs the given SQL query and ignores any received data. 2) Dynamic - generates a SQL query for each data payload it receives.

The dynamic SQL generation is implemented by passing in a "sqlGenerator" function to NewDynamicSQLReader. This allows you to write whatever code is needed to generate SQL based upon data flowing through the pipeline.

func NewDynamicSQLReader

func NewDynamicSQLReader(dbConn *sql.DB, sqlGenerator func(data.JSON) (string, error)) *SQLReader

NewDynamicSQLReader returns a new SQLReader operating in dynamic mode.

func NewSQLReader

func NewSQLReader(dbConn *sql.DB, sql string) *SQLReader

NewSQLReader returns a new SQLReader operating in static mode.

func (*SQLReader) Concurrency

func (s *SQLReader) Concurrency() int

See ConcurrentDataProcessor

func (*SQLReader) Finish

func (s *SQLReader) Finish(outputChan chan data.JSON, killChan chan error)

Finish - see interface for documentation.

func (*SQLReader) ForEachQueryData

func (s *SQLReader) ForEachQueryData(d data.JSON, killChan chan error, forEach func(d data.JSON))

ForEachQueryData handles generating the SQL (in case of dynamic mode), running the query and retrieving the data in data.JSON format, and then passing the results back witih the function call to forEach.

func (*SQLReader) ProcessData

func (s *SQLReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

ProcessData - see interface for documentation.

func (*SQLReader) String

func (s *SQLReader) String() string

type SQLReaderWriter

type SQLReaderWriter struct {
	SQLReader
	SQLWriter
	ConcurrencyLevel int // See ConcurrentDataProcessor
}

SQLReaderWriter performs both the job of a SQLReader and SQLWriter. This means it will run a SQL query, write the resulting data into a SQL database, and (if the write was successful) send the queried data to the next stage of processing.

SQLReaderWriter is composed of both a SQLReader and SQLWriter, so it supports all of the same properties and usage options (such as static versus dynamic SQL querying).

func NewDynamicSQLReaderWriter

func NewDynamicSQLReaderWriter(readConn *sql.DB, writeConn *sql.DB, sqlGenerator func(data.JSON) (string, error), writeTable string) *SQLReaderWriter

NewDynamicSQLReaderWriter returns a new SQLReaderWriter ready for dynamic querying.

func NewSQLReaderWriter

func NewSQLReaderWriter(readConn *sql.DB, writeConn *sql.DB, readQuery, writeTable string) *SQLReaderWriter

NewSQLReaderWriter returns a new SQLReaderWriter ready for static querying.

func (*SQLReaderWriter) Concurrency

func (s *SQLReaderWriter) Concurrency() int

See ConcurrentDataProcessor

func (*SQLReaderWriter) Finish

func (s *SQLReaderWriter) Finish(outputChan chan data.JSON, killChan chan error)

func (*SQLReaderWriter) ProcessData

func (s *SQLReaderWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*SQLReaderWriter) String

func (s *SQLReaderWriter) String() string

type SQLWriter

type SQLWriter struct {
	TableName        string
	OnDupKeyUpdate   bool
	OnDupKeyFields   []string
	ConcurrencyLevel int // See ConcurrentDataProcessor
	BatchSize        int
	// contains filtered or unexported fields
}

SQLWriter handles INSERTing data.JSON into a specified SQL table. If an error occurs while building or executing the INSERT, the error will be sent to the killChan.

Note that the data.JSON must be a valid JSON object or a slice of valid objects, where the keys are column names and the the values are the SQL values to be inserted into those columns.

For use-cases where a SQLWriter instance needs to write to multiple tables you can pass in SQLWriterData.

func NewSQLWriter

func NewSQLWriter(db *sql.DB, tableName string) *SQLWriter

NewSQLWriter returns a new SQLWriter

func (*SQLWriter) Concurrency

func (s *SQLWriter) Concurrency() int

See ConcurrentDataProcessor

func (*SQLWriter) Finish

func (s *SQLWriter) Finish(outputChan chan data.JSON, killChan chan error)

func (*SQLWriter) ProcessData

func (s *SQLWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*SQLWriter) String

func (s *SQLWriter) String() string

type SQLWriterData

type SQLWriterData struct {
	TableName  string      `json:"table_name"`
	InsertData interface{} `json:"insert_data"`
}

SQLWriterData is a custom data structure you can send into a SQLWriter stage if you need to specify TableName on a per-data payload basis. No extra configuration is needed to use SQLWriterData, each data payload received is first checked for this structure before processing.

type SftpReader

type SftpReader struct {
	IoReader // embeds IoReader

	DeleteObjects bool
	Walk          bool
	FileNamesOnly bool

	CloseOnFinish bool
	// contains filtered or unexported fields
}

SftpReader reads a single object at a given path, or walks through the directory specified by the path (SftpReader.Walk must be set to true).

To only send full paths (and not file contents), set FileNamesOnly to true. If FileNamesOnly is set to true, DeleteObjects will be ignored.

func NewSftpReader

func NewSftpReader(server string, username string, path string, authMethods ...ssh.AuthMethod) *SftpReader

NewSftpReader instantiates a new sftp reader, a connection to the remote server is delayed until data is recv'd by the reader By default, the connection to the remote client will be closed in the Finish() func. Set CloseOnFinish to false to manage the connection manually.

func NewSftpReaderByClient

func NewSftpReaderByClient(client *sftp.Client, path string) *SftpReader

NewSftpReaderByClient instantiates a new sftp reader using an existing connection to the remote server. By default, the connection to the remote client will *not* be closed in the Finish() func. Set CloseOnFinish to true to have this processor clean up the connection when it's done.

func (*SftpReader) CloseClient

func (r *SftpReader) CloseClient()

As the remote client itself is not exposed, you can manually close its connection through this func

func (*SftpReader) Finish

func (r *SftpReader) Finish(outputChan chan data.JSON, killChan chan error)

Finish optionally closes open references to the remote server

func (*SftpReader) ProcessData

func (r *SftpReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

func (*SftpReader) String

func (r *SftpReader) String() string

type SftpWriter

type SftpWriter struct {
	CloseOnFinish bool
	// contains filtered or unexported fields
}

SftpWriter is an inline writer to remote sftp server

func NewSftpWriter

func NewSftpWriter(server string, username string, path string, authMethods ...ssh.AuthMethod) *SftpWriter

NewSftpWriter instantiates a new sftp writer, a connection to the remote server is delayed until data is recv'd by the writer By default, the connection to the remote client will be closed in the Finish() func. Set CloseOnFinish to false to manage the connection manually.

func NewSftpWriterByFile

func NewSftpWriterByFile(file *sftp.File) *SftpWriter

NewSftpWriterByFile allows you to manually manage the connection to the remote file object. Use this if you want to write to the same file object across multiple pipelines. By default, the connection to the remote client will *not* be closed in the Finish() func. Set CloseOnFinish to true to have this processor clean up the connection when it's done.

func (*SftpWriter) Finish

func (w *SftpWriter) Finish(outputChan chan data.JSON, killChan chan error)

Finish optionally closes open references to the remote file and server

func (*SftpWriter) ProcessData

func (w *SftpWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)

ProcessData writes data as is directly to the output file

func (*SftpWriter) String

func (f *SftpWriter) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL