Documentation ¶
Overview ¶
Package processors contains built-in DataProcessor implementations that are generic and potentially useful across any ETL project.
Index ¶
- type BigQueryConfig
- type BigQueryReader
- func (r *BigQueryReader) Concurrency() int
- func (r *BigQueryReader) Finish(outputChan chan data.JSON, killChan chan error)
- func (r *BigQueryReader) ForEachQueryData(d data.JSON, killChan chan error, forEach func(d data.JSON))
- func (r *BigQueryReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)
- func (r *BigQueryReader) String() string
- type BigQueryWriter
- func (w *BigQueryWriter) Concurrency() int
- func (w *BigQueryWriter) Finish(outputChan chan data.JSON, killChan chan error)
- func (w *BigQueryWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)
- func (w *BigQueryWriter) String() string
- func (w *BigQueryWriter) WriteBatch(queuedRows []map[string]interface{}) (err error)
- type CSVTransformer
- type CSVWriter
- type FileReader
- type FtpWriter
- type FuncTransformer
- type HTTPRequest
- type IoReader
- type IoReaderWriter
- type IoWriter
- type MySQLWriter
- type Passthrough
- type PostgreSQLDeleterWriter
- type PostgreSQLWriter
- type RegexpMatcher
- type SCP
- type SQLExecutor
- type SQLPassthroughExecutor
- type SQLReader
- func (s *SQLReader) Concurrency() int
- func (s *SQLReader) Finish(outputChan chan data.JSON, killChan chan error)
- func (s *SQLReader) ForEachQueryData(d data.JSON, killChan chan error, forEach func(d data.JSON))
- func (s *SQLReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)
- func (s *SQLReader) String() string
- type SQLReaderMySQLWriter
- type SQLReaderPostgreSQLWriter
- func (s *SQLReaderPostgreSQLWriter) Concurrency() int
- func (s *SQLReaderPostgreSQLWriter) Finish(outputChan chan data.JSON, killChan chan error)
- func (s *SQLReaderPostgreSQLWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)
- func (s *SQLReaderPostgreSQLWriter) String() string
- type SQLWriterData
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BigQueryConfig ¶
BigQueryConfig is used when initializing new BigQueryReader and BigQueryWriter instances.
type BigQueryReader ¶
type BigQueryReader struct {
	PageSize int // defaults to 5000
	AggregateResults bool // determines whether to send data as soon as available or to aggregate and send all query results, defaults to false
	UnflattenResults bool // defaults to false
	TmpTableName string // Used when UnflattenResults is true. default to "_ratchet_tmp"
	ConcurrencyLevel int // See ConcurrentDataProcessor
	// contains filtered or unexported fields
}
BigQueryReader is used to query data from Google's BigQuery, and it behaves similarly to SQLReader. See SQLReader docs for explanation on static vs dynamic querying.
Note: If your data set contains nested/repeated fields you will likely want to get results back "unflattened." By default BigQuery returns results in a flattened format, which duplicates rows for each repeated value. This can be annoying to deal with, so BigQueryReader provides an "UnflattenResults" flag that queries in such a way that results come back unflattened. This involves using a temporary table setting and a couple of other special query settings. Read the BigQuery docs related to flatten and repeated fields for more info.
func NewBigQueryReader ¶
func NewBigQueryReader(config *BigQueryConfig, query string) *BigQueryReader
NewBigQueryReader returns an instance of a BigQueryReader ready to run a static query.
func NewDynamicBigQueryReader ¶
func NewDynamicBigQueryReader(config *BigQueryConfig, sqlGenerator func(data.JSON) (string, error)) *BigQueryReader
NewDynamicBigQueryReader returns an instance of a BigQueryReader ready to run a dynamic query based on the sqlGenerator function.
func (*BigQueryReader) Concurrency ¶
func (r *BigQueryReader) Concurrency() int
Concurrency defers to ConcurrentDataProcessor
func (*BigQueryReader) Finish ¶
func (r *BigQueryReader) Finish(outputChan chan data.JSON, killChan chan error)
Finish - see interface for documentation.
func (*BigQueryReader) ForEachQueryData ¶
func (r *BigQueryReader) ForEachQueryData(d data.JSON, killChan chan error, forEach func(d data.JSON))
ForEachQueryData handles generating the SQL (in case of dynamic mode), running the query and retrieving the data in data.JSON format, and then passing the results back with the function call to forEach.
func (*BigQueryReader) ProcessData ¶
func (r *BigQueryReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)
ProcessData defers to ForEachQueryData
func (*BigQueryReader) String ¶
func (r *BigQueryReader) String() string
type BigQueryWriter ¶
type BigQueryWriter struct {
	ConcurrencyLevel int // See ConcurrentDataProcessor
	// contains filtered or unexported fields
}
BigQueryWriter is used to write data to Google's BigQuery. If the table you want to write to already exists, use NewBigQueryWriter; otherwise use NewBigQueryWriterForNewTable, and the desired table structure will be created when the client is initialized.
func NewBigQueryWriter ¶
func NewBigQueryWriter(config *BigQueryConfig, tableName string) *BigQueryWriter
NewBigQueryWriter instantiates a new instance of BigQueryWriter
func NewBigQueryWriterForNewTable ¶
func NewBigQueryWriterForNewTable(config *BigQueryConfig, tableName string, fields map[string]string) *BigQueryWriter
NewBigQueryWriterForNewTable instantiates a new instance of BigQueryWriter and prepares to write results to a new table
func (*BigQueryWriter) Concurrency ¶
func (w *BigQueryWriter) Concurrency() int
Concurrency delegates to ConcurrentDataProcessor
func (*BigQueryWriter) Finish ¶
func (w *BigQueryWriter) Finish(outputChan chan data.JSON, killChan chan error)
Finish - see interface for documentation.
func (*BigQueryWriter) ProcessData ¶
func (w *BigQueryWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)
ProcessData defers to WriteBatch
func (*BigQueryWriter) String ¶
func (w *BigQueryWriter) String() string
func (*BigQueryWriter) WriteBatch ¶
func (w *BigQueryWriter) WriteBatch(queuedRows []map[string]interface{}) (err error)
WriteBatch inserts the supplied data into BigQuery
type CSVTransformer ¶
type CSVTransformer struct {
Parameters util.CSVParameters
}
CSVTransformer converts data.JSON objects into a CSV string object and sends it on to the next stage. In use-cases where you simply want to write to a CSV file, use CSVWriter instead.
CSVTransformer is for more complex use-cases where you need to generate CSV data and perhaps send it to multiple output stages.
func NewCSVTransformer ¶
func NewCSVTransformer() *CSVTransformer
NewCSVTransformer returns a new CSVTransformer.
func (*CSVTransformer) Finish ¶
func (w *CSVTransformer) Finish(outputChan chan data.JSON, killChan chan error)
Finish - see interface for documentation.
func (*CSVTransformer) ProcessData ¶
ProcessData defers to util.CSVProcess
func (*CSVTransformer) String ¶
func (w *CSVTransformer) String() string
type CSVWriter ¶
type CSVWriter struct {
Parameters util.CSVParameters
}
CSVWriter handles converting data.JSON objects into CSV format and writing them to the given io.Writer. The data must be a valid JSON object or a slice of valid JSON objects. If you already have data formatted as a CSV string you can use an IoWriter instead.
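As a rough illustration of the conversion described above (not the package's own implementation, which defers to util.CSVProcess), the following standard-library sketch turns a slice of JSON objects into CSV. The helper name `jsonToCSV` and the choice to take the columns from the first object and sort them alphabetically are assumptions made for this example.

```go
package main

import (
	"bytes"
	"encoding/csv"
	"encoding/json"
	"fmt"
	"sort"
)

// jsonToCSV is a hypothetical helper: it decodes a JSON array of objects
// and renders it as CSV with a header row. Columns come from the first
// object and are sorted so the output is deterministic.
func jsonToCSV(payload []byte) (string, error) {
	var rows []map[string]interface{}
	if err := json.Unmarshal(payload, &rows); err != nil {
		return "", err
	}
	if len(rows) == 0 {
		return "", nil
	}
	header := make([]string, 0, len(rows[0]))
	for k := range rows[0] {
		header = append(header, k)
	}
	sort.Strings(header)

	var buf bytes.Buffer
	w := csv.NewWriter(&buf)
	if err := w.Write(header); err != nil {
		return "", err
	}
	for _, row := range rows {
		record := make([]string, len(header))
		for i, col := range header {
			// Missing or null values become empty cells.
			if v, ok := row[col]; ok && v != nil {
				record[i] = fmt.Sprint(v)
			}
		}
		if err := w.Write(record); err != nil {
			return "", err
		}
	}
	w.Flush()
	return buf.String(), w.Error()
}

func main() {
	out, err := jsonToCSV([]byte(`[{"id":1,"name":"Ann"},{"id":2,"name":"Bob, Jr."}]`))
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```

Note that encoding/csv quotes the second name because it contains a comma.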
func NewCSVWriter ¶
NewCSVWriter returns a new CSVWriter wrapping the given io.Writer object
func (*CSVWriter) ProcessData ¶
ProcessData defers to util.CSVProcess
type FileReader ¶
type FileReader struct {
// contains filtered or unexported fields
}
FileReader opens and reads the contents of the given filename.
func NewFileReader ¶
func NewFileReader(filename string) *FileReader
NewFileReader returns a new FileReader that will read the entire contents of the given file path and send it at once. For buffered or line-by-line reading try using IoReader.
func (*FileReader) Finish ¶
func (r *FileReader) Finish(outputChan chan data.JSON, killChan chan error)
Finish - see interface for documentation.
func (*FileReader) ProcessData ¶
ProcessData reads a file and sends its contents to outputChan
func (*FileReader) String ¶
func (r *FileReader) String() string
type FtpWriter ¶
type FtpWriter struct {
// contains filtered or unexported fields
}
FtpWriter represents an FTP writer processor.
func NewFtpWriter ¶
NewFtpWriter instantiates a new FtpWriter.
func (*FtpWriter) ProcessData ¶
ProcessData writes data as is directly to the output file
type FuncTransformer ¶
type FuncTransformer struct {
	Name string // can be set for more useful log output
	ConcurrencyLevel int // See ConcurrentDataProcessor
	// contains filtered or unexported fields
}
FuncTransformer executes the given function on each data payload, sending the resulting data to the next stage.
While FuncTransformer is useful for simple data transformation, more complicated tasks justify building a custom implementation of DataProcessor.
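To make the "simple data transformation" case concrete, here is a minimal sketch of a function with the `func(d data.JSON) data.JSON` shape. It assumes data.JSON is a byte slice and uses a local `JSON` type as a stand-in so the example compiles on its own; the function name `addFullName` and the field names are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// JSON is a local stand-in for data.JSON, assumed here to be a byte slice.
type JSON []byte

// addFullName has the transform shape func(JSON) JSON. It adds a
// "full_name" field built from "first" and "last". On malformed input it
// returns the payload unchanged, which is an arbitrary choice for this sketch.
func addFullName(d JSON) JSON {
	var rec map[string]interface{}
	if err := json.Unmarshal(d, &rec); err != nil {
		return d
	}
	first, _ := rec["first"].(string)
	last, _ := rec["last"].(string)
	rec["full_name"] = first + " " + last
	out, err := json.Marshal(rec)
	if err != nil {
		return d
	}
	return out
}

func main() {
	fmt.Println(string(addFullName(JSON(`{"first":"Ada","last":"Lovelace"}`))))
}
```

With the real package, a function like this (written against data.JSON rather than the local type) would be the argument to NewFuncTransformer.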
func NewFuncTransformer ¶
func NewFuncTransformer(transform func(d data.JSON) data.JSON) *FuncTransformer
NewFuncTransformer returns a new FuncTransformer that runs the given transform function.
func (*FuncTransformer) Concurrency ¶
func (t *FuncTransformer) Concurrency() int
Concurrency defers to ConcurrentDataProcessor
func (*FuncTransformer) Finish ¶
func (t *FuncTransformer) Finish(outputChan chan data.JSON, killChan chan error)
Finish - see interface for documentation.
func (*FuncTransformer) ProcessData ¶
ProcessData runs the supplied func and sends the returned value to outputChan
func (*FuncTransformer) String ¶
func (t *FuncTransformer) String() string
type HTTPRequest ¶
HTTPRequest executes an HTTP request and passes along the response body. It simply wraps an http.Request and an http.Client. See the net/http docs for more info: https://golang.org/pkg/net/http
func NewHTTPRequest ¶
func NewHTTPRequest(method, url string, body io.Reader) (*HTTPRequest, error)
NewHTTPRequest creates a new HTTPRequest and essentially wraps net/http's NewRequest function. See https://golang.org/pkg/net/http/#NewRequest
func (*HTTPRequest) Finish ¶
func (r *HTTPRequest) Finish(outputChan chan data.JSON, killChan chan error)
Finish - see interface for documentation.
func (*HTTPRequest) ProcessData ¶
ProcessData sends data to outputChan if the response body is not null
func (*HTTPRequest) String ¶
func (r *HTTPRequest) String() string
type IoReader ¶
type IoReader struct {
	Reader io.Reader
	LineByLine bool // defaults to true
	BufferSize int
	Gzipped bool
}
IoReader wraps an io.Reader and reads it.
func NewIoReader ¶
NewIoReader returns a new IoReader wrapping the given io.Reader object.
func (*IoReader) ForEachData ¶
ForEachData either reads by line or by buffered stream, sending the data back to the anonymous func that ultimately shoves it onto the outputChan
func (*IoReader) ProcessData ¶
ProcessData overwrites the reader if the content is Gzipped, then defers to ForEachData
type IoReaderWriter ¶
IoReaderWriter performs both the job of an IoReader and an IoWriter. It will read data from the given io.Reader, write the resulting data to the given io.Writer, and (if the write was successful) send the data to the next stage of processing.
IoReaderWriter is composed of both an IoReader and an IoWriter, so it supports all of the same properties and usage options.
func NewIoReaderWriter ¶
func NewIoReaderWriter(reader io.Reader, writer io.Writer) *IoReaderWriter
NewIoReaderWriter returns a new IoReaderWriter wrapping the given io.Reader and io.Writer objects
func (*IoReaderWriter) Finish ¶
func (r *IoReaderWriter) Finish(outputChan chan data.JSON, killChan chan error)
Finish - see interface for documentation.
func (*IoReaderWriter) ProcessData ¶
ProcessData grabs data from IoReader.ForEachData, then sends it to IoWriter.ProcessData in addition to sending it downstream on the outputChan
func (*IoReaderWriter) String ¶
func (r *IoReaderWriter) String() string
type IoWriter ¶
IoWriter wraps any io.Writer object. It can be used to write data out to a File, os.Stdout, or any other destination that implements io.Writer.
func NewIoWriter ¶
NewIoWriter returns a new IoWriter wrapping the given io.Writer object
func (*IoWriter) ProcessData ¶
ProcessData writes the data
type MySQLWriter ¶
type MySQLWriter struct {
	TableName string
	OnDupKeyUpdate bool
	OnDupKeyFields []string
	ConcurrencyLevel int // See ConcurrentDataProcessor
	BatchSize int
	// contains filtered or unexported fields
}
MySQLWriter handles INSERTing data.JSON into a specified SQL table. If an error occurs while building or executing the INSERT, the error will be sent to the killChan.
Note that the data.JSON must be a valid JSON object or a slice of valid objects, where the keys are column names and the values are the SQL values to be inserted into those columns.
For use-cases where a MySQLWriter instance needs to write to multiple tables you can pass in SQLWriterData.
func NewMySQLWriter ¶
func NewMySQLWriter(db *sql.DB, tableName string) *MySQLWriter
NewMySQLWriter returns a new MySQLWriter
func (*MySQLWriter) Concurrency ¶
func (s *MySQLWriter) Concurrency() int
Concurrency defers to ConcurrentDataProcessor
func (*MySQLWriter) Finish ¶
func (s *MySQLWriter) Finish(outputChan chan data.JSON, killChan chan error)
Finish - see interface for documentation.
func (*MySQLWriter) ProcessData ¶
ProcessData defers to util.MySQLInsertData
func (*MySQLWriter) String ¶
func (s *MySQLWriter) String() string
type Passthrough ¶
type Passthrough struct {
// contains filtered or unexported fields
}
Passthrough simply passes the data on to the next stage. We have to set a placeholder field - if we leave this as an empty struct we get some properties for comparison and memory addressing that are not desirable and cause comparison bugs (see: http://dave.cheney.net/2014/03/25/the-empty-struct)
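The comparison issue mentioned above follows from the Go spec: pointers to distinct zero-size variables may or may not be equal, while pointers to distinct non-zero-size variables are always unequal. The sketch below illustrates the difference; the type names `empty` and `withPlaceholder` are hypothetical and are not the package's own definitions.

```go
package main

import "fmt"

// empty is a zero-size type.
type empty struct{}

// withPlaceholder mimics the approach described above: a dummy field gives
// every value a non-zero size.
type withPlaceholder struct {
	placeholder int
}

// distinctPlaceholders reports whether two separately created values have
// different addresses. For a non-zero-size type this is always true.
func distinctPlaceholders() bool {
	a, b := &withPlaceholder{}, &withPlaceholder{}
	return a != b
}

func main() {
	e1, e2 := &empty{}, &empty{}
	// The result of this comparison is unspecified and must not be relied on.
	fmt.Println("empty structs distinct:", e1 != e2)
	fmt.Println("placeholder structs distinct:", distinctPlaceholders())
}
```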
func NewPassthrough ¶
func NewPassthrough() *Passthrough
NewPassthrough instantiates a new instance of Passthrough
func (*Passthrough) Finish ¶
func (r *Passthrough) Finish(outputChan chan data.JSON, killChan chan error)
Finish - see interface for documentation.
func (*Passthrough) ProcessData ¶
ProcessData blindly sends whatever it receives to the outputChan
func (*Passthrough) String ¶
func (r *Passthrough) String() string
type PostgreSQLDeleterWriter ¶
type PostgreSQLDeleterWriter struct {
	TableName string
	OnDupKeyUpdate bool
	OnDupKeyIndex string // The conflict target: see https://www.postgresql.org/docs/9.5/static/sql-insert.html
	OnDupKeyFields []string
	ConcurrencyLevel int // See ConcurrentDataProcessor
	BatchSize int
	// contains filtered or unexported fields
}
PostgreSQLDeleterWriter handles DELETING data and then INSERTing data.JSON into a specified SQL table. If an error occurs while deleting or executing the INSERT, the error will be sent to the killChan.
deleteSql must be a complete SQL statement string; it is executed as-is.
Note that the data.JSON must be a valid JSON object or a slice of valid objects, where the keys are column names and the values are the SQL values to be inserted into those columns.
For use-cases where a PostgreSQLDeleterWriter instance needs to write to multiple tables you can pass in SQLWriterData.
Note that if `OnDupKeyUpdate` is true (the default), you *must* provide a value for `OnDupKeyIndex` (which is the PostgreSQL conflict target).
func NewPostgreSQLDeleterWriter ¶
func NewPostgreSQLDeleterWriter(db *sql.DB, tableName string, deleteSql string) *PostgreSQLDeleterWriter
NewPostgreSQLDeleterWriter returns a new PostgreSQLDeleterWriter
func (*PostgreSQLDeleterWriter) Concurrency ¶
func (s *PostgreSQLDeleterWriter) Concurrency() int
Concurrency defers to ConcurrentDataProcessor
func (*PostgreSQLDeleterWriter) Finish ¶
func (s *PostgreSQLDeleterWriter) Finish(outputChan chan data.JSON, killChan chan error)
Finish - see interface for documentation.
func (*PostgreSQLDeleterWriter) ProcessData ¶
func (s *PostgreSQLDeleterWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)
ProcessData defers to util.PostgreSQLInsertData
func (*PostgreSQLDeleterWriter) String ¶
func (s *PostgreSQLDeleterWriter) String() string
type PostgreSQLWriter ¶
type PostgreSQLWriter struct {
	TableName string
	OnDupKeyUpdate bool
	OnDupKeyIndex string // The conflict target: see https://www.postgresql.org/docs/9.5/static/sql-insert.html
	OnDupKeyFields []string
	ConcurrencyLevel int // See ConcurrentDataProcessor
	BatchSize int
	// contains filtered or unexported fields
}
PostgreSQLWriter handles INSERTing data.JSON into a specified SQL table. If an error occurs while building or executing the INSERT, the error will be sent to the killChan.
Note that the data.JSON must be a valid JSON object or a slice of valid objects, where the keys are column names and the values are the SQL values to be inserted into those columns.
For use-cases where a PostgreSQLWriter instance needs to write to multiple tables you can pass in SQLWriterData.
Note that if `OnDupKeyUpdate` is true (the default), you *must* provide a value for `OnDupKeyIndex` (which is the PostgreSQL conflict target).
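To show why a conflict target is required, here is a sketch of the kind of upsert statement PostgreSQL (9.5 and later) expects. It is not the statement the package generates: the helper name `upsertSQL`, the table and column names, and the convention of passing the conflict target with its parentheses are all assumptions for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// upsertSQL is a hypothetical helper that builds a single-row
// INSERT ... ON CONFLICT ... DO UPDATE statement with numbered placeholders.
// conflictTarget is inserted verbatim, e.g. "(id)".
func upsertSQL(table string, cols []string, conflictTarget string, updateCols []string) string {
	placeholders := make([]string, len(cols))
	for i := range cols {
		placeholders[i] = fmt.Sprintf("$%d", i+1)
	}
	sets := make([]string, len(updateCols))
	for i, c := range updateCols {
		// EXCLUDED refers to the row that was proposed for insertion.
		sets[i] = fmt.Sprintf("%s = EXCLUDED.%s", c, c)
	}
	return fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT %s DO UPDATE SET %s",
		table,
		strings.Join(cols, ", "),
		strings.Join(placeholders, ", "),
		conflictTarget,
		strings.Join(sets, ", "))
}

func main() {
	fmt.Println(upsertSQL("users", []string{"id", "email"}, "(id)", []string{"email"}))
}
```

Without a conflict target, PostgreSQL rejects `ON CONFLICT ... DO UPDATE`, which is consistent with the requirement stated above.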
func NewPostgreSQLWriter ¶
func NewPostgreSQLWriter(db *sql.DB, tableName string) *PostgreSQLWriter
NewPostgreSQLWriter returns a new PostgreSQLWriter
func (*PostgreSQLWriter) Concurrency ¶
func (s *PostgreSQLWriter) Concurrency() int
Concurrency defers to ConcurrentDataProcessor
func (*PostgreSQLWriter) Finish ¶
func (s *PostgreSQLWriter) Finish(outputChan chan data.JSON, killChan chan error)
Finish - see interface for documentation.
func (*PostgreSQLWriter) ProcessData ¶
ProcessData defers to util.PostgreSQLInsertData
func (*PostgreSQLWriter) String ¶
func (s *PostgreSQLWriter) String() string
type RegexpMatcher ¶
type RegexpMatcher struct {
	// Set to true to log each match attempt (logger must be in debug mode).
	DebugLog bool
	// contains filtered or unexported fields
}
RegexpMatcher checks if incoming data matches the given Regexp, and sends it on to the next stage only if it matches. It uses regexp.Match under the hood: https://golang.org/pkg/regexp/#Match
func NewRegexpMatcher ¶
func NewRegexpMatcher(pattern string) *RegexpMatcher
NewRegexpMatcher returns a new RegexpMatcher initialized with the given pattern to match.
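The filtering behaviour can be sketched directly with regexp.Match. The helper name `filterMatching` and the sample payloads are hypothetical; the real stage forwards matches on a channel rather than returning a slice.

```go
package main

import (
	"fmt"
	"regexp"
)

// filterMatching returns only the payloads that match pattern, using
// regexp.Match on the raw bytes of each payload.
func filterMatching(pattern string, payloads [][]byte) ([][]byte, error) {
	var kept [][]byte
	for _, p := range payloads {
		ok, err := regexp.Match(pattern, p)
		if err != nil {
			return nil, err // invalid pattern
		}
		if ok {
			kept = append(kept, p)
		}
	}
	return kept, nil
}

func main() {
	payloads := [][]byte{
		[]byte(`{"type":"order","id":1}`),
		[]byte(`{"type":"refund","id":2}`),
	}
	kept, err := filterMatching(`"type":"order"`, payloads)
	if err != nil {
		panic(err)
	}
	for _, p := range kept {
		fmt.Println(string(p))
	}
}
```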
func (*RegexpMatcher) Finish ¶
func (r *RegexpMatcher) Finish(outputChan chan data.JSON, killChan chan error)
Finish - see interface for documentation.
func (*RegexpMatcher) ProcessData ¶
ProcessData sends the data it receives to the outputChan only if it matches the supplied regex
func (*RegexpMatcher) String ¶
func (r *RegexpMatcher) String() string
type SCP ¶
type SCP struct {
	Port string // e.g., "2222" -- only send for non-standard ports
	Object string // e.g., "/path/to/file.txt"
	Destination string // e.g., "user@host:/path/to/destination/"
}
SCP executes the scp command, sending the given file to the given destination.
func (*SCP) ProcessData ¶
ProcessData sends all data to outputChan
type SQLExecutor ¶
type SQLExecutor struct {
// contains filtered or unexported fields
}
SQLExecutor runs the given SQL and swallows any returned data.
It can operate in 2 modes:
1) Static - runs the given SQL query and ignores any received data.
2) Dynamic - generates a SQL query for each data payload it receives.
The dynamic SQL generation is implemented by passing in a "sqlGenerator" function to NewDynamicSQLExecutor. This allows you to write whatever code is needed to generate SQL based upon data flowing through the pipeline.
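A generator for dynamic mode might look like the sketch below. It assumes data.JSON is a byte slice and uses a local `JSON` stand-in so it compiles on its own; the function name `markSyncedSQL`, the `user_id` field, and the `users` table are hypothetical.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// JSON is a local stand-in for data.JSON, assumed here to be a byte slice.
type JSON []byte

// markSyncedSQL has the sqlGenerator shape func(JSON) (string, error).
// It reads a user_id from the incoming payload and builds one statement.
func markSyncedSQL(d JSON) (string, error) {
	var payload struct {
		UserID int `json:"user_id"`
	}
	if err := json.Unmarshal(d, &payload); err != nil {
		return "", err
	}
	if payload.UserID <= 0 {
		return "", errors.New("payload has no valid user_id")
	}
	// UserID is an int, so formatting with %d can only emit digits.
	// String values would need proper escaping or query parameters.
	return fmt.Sprintf("UPDATE users SET synced = true WHERE id = %d", payload.UserID), nil
}

func main() {
	sql, err := markSyncedSQL(JSON(`{"user_id":42}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(sql)
}
```

In static mode no such function is needed, since the same SQL string runs regardless of the payload.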
func NewDynamicSQLExecutor ¶
func NewDynamicSQLExecutor(dbConn *sql.DB, sqlGenerator func(data.JSON) (string, error)) *SQLExecutor
NewDynamicSQLExecutor returns a new SQLExecutor operating in dynamic mode.
func NewSQLExecutor ¶
func NewSQLExecutor(dbConn *sql.DB, sql string) *SQLExecutor
NewSQLExecutor returns a new SQLExecutor operating in static mode.
func (*SQLExecutor) Finish ¶
func (s *SQLExecutor) Finish(outputChan chan data.JSON, killChan chan error)
Finish - see interface for documentation.
func (*SQLExecutor) ProcessData ¶
ProcessData runs the SQL statements, deferring to util.ExecuteSQLQuery
func (*SQLExecutor) String ¶
func (s *SQLExecutor) String() string
type SQLPassthroughExecutor ¶
type SQLPassthroughExecutor struct {
// contains filtered or unexported fields
}
SQLPassthroughExecutor runs the given SQL AND passes original data through to the next stage.
It operates in static mode only: it runs the given SQL query as-is, regardless of the data it receives.
func NewSQLPassthroughExecutor ¶
func NewSQLPassthroughExecutor(dbConn *sql.DB, sql string, timesMaxRun int) *SQLPassthroughExecutor
NewSQLPassthroughExecutor returns a new SQLPassthroughExecutor
func (*SQLPassthroughExecutor) Finish ¶
func (s *SQLPassthroughExecutor) Finish(outputChan chan data.JSON, killChan chan error)
Finish - see interface for documentation.
func (*SQLPassthroughExecutor) ProcessData ¶
func (s *SQLPassthroughExecutor) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)
ProcessData runs the SQL statements, deferring to util.ExecuteSQLQuery
func (*SQLPassthroughExecutor) String ¶
func (s *SQLPassthroughExecutor) String() string
type SQLReader ¶
type SQLReader struct {
	BatchSize int
	StructDestination interface{}
	ConcurrencyLevel int // See ConcurrentDataProcessor
	// contains filtered or unexported fields
}
SQLReader runs the given SQL and passes the resulting data to the next stage of processing.
It can operate in 2 modes:
1) Static - runs the given SQL query and ignores any received data.
2) Dynamic - generates a SQL query for each data payload it receives.
The dynamic SQL generation is implemented by passing in a "sqlGenerator" function to NewDynamicSQLReader. This allows you to write whatever code is needed to generate SQL based upon data flowing through the pipeline.
func NewDynamicSQLReader ¶
NewDynamicSQLReader returns a new SQLReader operating in dynamic mode.
func NewSQLReader ¶
NewSQLReader returns a new SQLReader operating in static mode.
func (*SQLReader) Concurrency ¶
func (s *SQLReader) Concurrency() int
Concurrency defers to ConcurrentDataProcessor
func (*SQLReader) ForEachQueryData ¶
func (s *SQLReader) ForEachQueryData(d data.JSON, killChan chan error, forEach func(d data.JSON))
ForEachQueryData handles generating the SQL (in case of dynamic mode), running the query and retrieving the data in data.JSON format, and then passing the results back with the function call to forEach.
func (*SQLReader) ProcessData ¶
func (s *SQLReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)
ProcessData - see interface for documentation.
type SQLReaderMySQLWriter ¶
type SQLReaderMySQLWriter struct {
	SQLReader
	MySQLWriter
	ConcurrencyLevel int // See ConcurrentDataProcessor
}
SQLReaderMySQLWriter performs both the job of a SQLReader and MySQLWriter. This means it will run a SQL query, write the resulting data into a MySQL database, and (if the write was successful) send the queried data to the next stage of processing.
SQLReaderMySQLWriter is composed of both a SQLReader and MySQLWriter, so it supports all of the same properties and usage options (such as static versus dynamic SQL querying).
func NewDynamicSQLReaderMySQLWriter ¶
func NewDynamicSQLReaderMySQLWriter(readConn *sql.DB, writeConn *sql.DB, sqlGenerator func(data.JSON) (string, error), writeTable string) *SQLReaderMySQLWriter
NewDynamicSQLReaderMySQLWriter returns a new SQLReaderMySQLWriter ready for dynamic querying.
func NewSQLReaderMySQLWriter ¶
func NewSQLReaderMySQLWriter(readConn *sql.DB, writeConn *sql.DB, readQuery, writeTable string) *SQLReaderMySQLWriter
NewSQLReaderMySQLWriter returns a new SQLReaderMySQLWriter ready for static querying.
func (*SQLReaderMySQLWriter) Concurrency ¶
func (s *SQLReaderMySQLWriter) Concurrency() int
Concurrency defers to ConcurrentDataProcessor
func (*SQLReaderMySQLWriter) Finish ¶
func (s *SQLReaderMySQLWriter) Finish(outputChan chan data.JSON, killChan chan error)
Finish - see interface for documentation.
func (*SQLReaderMySQLWriter) ProcessData ¶
func (s *SQLReaderMySQLWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)
ProcessData uses SQLReader methods for processing data - this works via composition
func (*SQLReaderMySQLWriter) String ¶
func (s *SQLReaderMySQLWriter) String() string
type SQLReaderPostgreSQLWriter ¶
type SQLReaderPostgreSQLWriter struct {
	SQLReader
	PostgreSQLWriter
	ConcurrencyLevel int // See ConcurrentDataProcessor
}
SQLReaderPostgreSQLWriter performs both the job of a SQLReader and PostgreSQLWriter. This means it will run a SQL query, write the resulting data into a PostgreSQL database, and (if the write was successful) send the queried data to the next stage of processing.
SQLReaderPostgreSQLWriter is composed of both a SQLReader and PostgreSQLWriter, so it supports all of the same properties and usage options (such as static versus dynamic SQL querying).
func NewDynamicSQLReaderPostgreSQLWriter ¶
func NewDynamicSQLReaderPostgreSQLWriter(readConn *sql.DB, writeConn *sql.DB, sqlGenerator func(data.JSON) (string, error), writeTable string) *SQLReaderPostgreSQLWriter
NewDynamicSQLReaderPostgreSQLWriter returns a new SQLReaderPostgreSQLWriter ready for dynamic querying.
func NewSQLReaderPostgreSQLWriter ¶
func NewSQLReaderPostgreSQLWriter(readConn *sql.DB, writeConn *sql.DB, readQuery, writeTable string) *SQLReaderPostgreSQLWriter
NewSQLReaderPostgreSQLWriter returns a new SQLReaderPostgreSQLWriter ready for static querying.
func (*SQLReaderPostgreSQLWriter) Concurrency ¶
func (s *SQLReaderPostgreSQLWriter) Concurrency() int
Concurrency defers to ConcurrentDataProcessor
func (*SQLReaderPostgreSQLWriter) Finish ¶
func (s *SQLReaderPostgreSQLWriter) Finish(outputChan chan data.JSON, killChan chan error)
Finish - see interface for documentation.
func (*SQLReaderPostgreSQLWriter) ProcessData ¶
func (s *SQLReaderPostgreSQLWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)
ProcessData uses SQLReader methods for processing data - this works via composition
func (*SQLReaderPostgreSQLWriter) String ¶
func (s *SQLReaderPostgreSQLWriter) String() string
type SQLWriterData ¶
type SQLWriterData struct {
	TableName string `json:"table_name"`
	InsertData interface{} `json:"insert_data"`
}
SQLWriterData is a custom data structure you can send into a MySQLWriter stage or a PostgreSQLWriter stage if you need to specify TableName on a per-data payload basis. No extra configuration is needed to use SQLWriterData; each data payload received is first checked for this structure before processing.
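The serialized shape of such a payload follows from the struct tags shown above. The sketch below redeclares the struct locally so it runs on its own; the helper name `wrapForTable` and the sample row are hypothetical, and how the writer stages detect the structure is not shown here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SQLWriterData mirrors the struct documented above, redeclared locally
// so that this example is self-contained.
type SQLWriterData struct {
	TableName  string      `json:"table_name"`
	InsertData interface{} `json:"insert_data"`
}

// wrapForTable is a hypothetical helper that serializes rows together with
// the name of the table they are destined for.
func wrapForTable(table string, rows interface{}) ([]byte, error) {
	return json.Marshal(SQLWriterData{TableName: table, InsertData: rows})
}

func main() {
	rows := []map[string]interface{}{{"id": 1, "email": "a@example.com"}}
	b, err := wrapForTable("users", rows)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```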
Source Files ¶
- big_query_reader.go
- big_query_writer.go
- csv_transformer.go
- csv_writer.go
- doc.go
- file_reader.go
- ftp_writer.go
- func_transformer.go
- http_request.go
- io_reader.go
- io_reader_writer.go
- io_writer.go
- mysql_writer.go
- passthrough.go
- postgresql_deleter_and_writer.go
- postgresql_writer.go
- regexp_matcher.go
- scp.go
- sql_executor.go
- sql_passthrough_executor.go
- sql_reader.go
- sql_reader_mysql_writer.go
- sql_reader_postgresql_writer.go
- sql_writer_data.go