Documentation ¶
Overview ¶
Package goetl is a library for performing data pipeline / ETL tasks in Go.
The main construct in goetl is Pipeline. A Pipeline has a series of PipelineStages, each of which performs some type of data processing and then sends new data on to the next stage. Each PipelineStage consists of one or more Processors, which are responsible for receiving, processing, and then sending data on to the next stage of processing. Each Processor runs in its own goroutine, so all data processing can execute concurrently.
Here is a conceptual drawing of a fairly simple Pipeline:
    +--Pipeline------------------------------------------------------------------------------------------+
    |                                                                       PipelineStage 3              |
    |                                                                      +---------------------------+ |
    |  PipelineStage 1                PipelineStage 2           +-JSON---> | CSVWriter                 | |
    | +------------------+           +-----------------------+  |          +---------------------------+ |
    | | SQLReader        +-JSON----> | Custom Processor      +--+                                        |
    | +------------------+           +-----------------------+  |          +---------------------------+ |
    |                                                           +-JSON---> | SQLWriter                 | |
    |                                                                      +---------------------------+ |
    +----------------------------------------------------------------------------------------------------+
In this example, we have a Pipeline consisting of 3 PipelineStages. The first stage has a Processor that runs queries on a SQL database, the second is doing custom transformation work on that data, and the third stage branches into 2 Processors, one writing the resulting data to a CSV file, and the other inserting into another SQL database.
In the example above, Stage 1 and Stage 3 are using built-in Processors (see the "processors" package/subdirectory). However, Stage 2 is using a custom implementation of Processor. By combining built-in processors with support for arbitrary Go code to process data, goetl makes it possible to write highly customized, fast data pipeline systems. See the Processor documentation to learn more.
Since each Processor is running in its own goroutine, SQLReader can continue pulling and sending data while each subsequent stage is also processing data. Optimally-designed pipelines have processors that can each run in an isolated fashion, processing data without having to worry about what's coming next down the pipeline.
All data payloads sent between Processors implement the etldata.Payload interface. Built-in processors send data as etldata.JSON, which provides a good balance of consistency and flexibility. See the "data" package for details and helper functions for dealing with etldata.Payload and etldata.JSON. Another good read for handling JSON data in Go is http://blog.golang.org/json-and-go.
Note that many of the concepts in goetl were taken from the Go blog's post on pipelines (http://blog.golang.org/pipelines). While the details discussed in that blog post are largely abstracted away by goetl, it is still an interesting read and will help explain the general concepts being applied.
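As a rough illustration of working with JSON payloads, the sketch below round-trips a slice of rows through the standard encoding/json package. The `jsonPayload` type is a hypothetical stand-in that assumes a payload is raw JSON bytes; the `user` struct and the `encode` and `decodeUsers` helpers are likewise invented for this example and are not part of the "data" package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonPayload is a hypothetical stand-in for etldata.JSON, assumed here to be
// raw JSON bytes.
type jsonPayload []byte

// user is an example row shape, invented for this sketch.
type user struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

// encode marshals any value into a payload.
func encode(v interface{}) (jsonPayload, error) {
	b, err := json.Marshal(v)
	return jsonPayload(b), err
}

// decodeUsers parses a payload holding a JSON array of user objects.
func decodeUsers(p jsonPayload) ([]user, error) {
	var users []user
	err := json.Unmarshal(p, &users)
	return users, err
}

func main() {
	p, err := encode([]user{{ID: 1, Name: "alice"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(p)) // [{"id":1,"name":"alice"}]

	users, err := decodeUsers(p)
	if err != nil {
		panic(err)
	}
	fmt.Println(users[0].Name) // alice
}
```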
Creating and Running a Basic Pipeline ¶
There are two ways to construct and run a Pipeline. The first is a basic, non-branching Pipeline. For example:
    +------------+   +-------------------+   +---------------+
    | SQLReader  +---> CustomTransformer +---> SQLWriter     |
    +------------+   +-------------------+   +---------------+
This is a 3-stage Pipeline that queries some SQL data in stage 1, does some custom data transformation in stage 2, and then writes the resulting data to a SQL table in stage 3. The code to create and run this basic Pipeline would look something like:
    // First, initialize the Processors
    read := processors.NewSQLReader(db1, "SELECT * FROM source_table")
    transform := NewCustomTransformer() // (This would be your own custom Processor implementation)
    write := processors.NewSQLWriter(db2, "destination_table")

    // Then create a new Pipeline using them
    pipeline := goetl.NewPipeline(read, transform, write)

    // Finally, run the Pipeline and wait for either an error or nil to be returned
    err := <-pipeline.Run()
Creating and Running a Branching Pipeline ¶
The second way to construct a Pipeline is using a PipelineLayout. This method allows for more complex Pipeline configurations that support branching between stages that are running multiple DataProcessors. Here is a (fairly complex) example:
                                                                   +----------------------+
                                                            +------> SQLReader (Dynamic)  +--+
                                                            |      +----------------------+  |
                                                            |                                |
                         +---------------------------+      |      +----------------------+  |    +-----------+
                   +-----> SQLReader (Dynamic Query) +------+   +--> Custom Processor     +-------> CSVWriter |
    +-----------+  |     +---------------------------+      |   |  +----------------------+  |    +-----------+
    | SQLReader +--+                                        +---+                            |
    +-----------+  |     +---------------------------+      |   |  +----------------------+  |    +-----------+
                   +-----> Custom Processor          +------+------> Custom Processor     +--+  +-> SQLWriter |
                         +---------------------------+      |      +----------------------+     | +-----------+
                                                            |                                   |
                                                            |      +----------------------+     |
                                                            +------> Passthrough          +-----+
                                                                   +----------------------+
This Pipeline consists of 4 stages where each Processor is choosing which Processors in the subsequent stage should receive the data it sends. The SQLReader in stage 2, for example, is sending data to only 2 processors in the next stage, while the Custom Processor in stage 2 is sending its data to 3. The code for constructing and running a Pipeline like this would look like:
    // First, initialize all the DataProcessors that will be used in the Pipeline
    query1 := processors.NewSQLReader(db1, "SELECT * FROM source_table")
    // sqlGenerator1 would be a function that generates the query at run-time. See SQLReader docs.
    query2 := processors.NewSQLReader(db1, sqlGenerator1)
    custom1 := NewCustomProcessor1()
    query3 := processors.NewSQLReader(db2, sqlGenerator2)
    custom2 := NewCustomProcessor2()
    custom3 := NewCustomProcessor3()
    passthrough := processors.NewPassthrough()
    writeMySQL := processors.NewSQLWriter(db3, "destination_table")
    writeCSV := processors.NewCSVWriter(file)

    // Next, construct and validate the PipelineLayout. Each DataProcessor
    // is inserted into the layout via calls to goetl.Do().
    layout, err := goetl.NewPipelineLayout(
        goetl.NewPipelineStage(
            goetl.Do(query1).Outputs(query2, custom1),
        ),
        goetl.NewPipelineStage(
            goetl.Do(query2).Outputs(query3, custom3),
            goetl.Do(custom1).Outputs(custom2, custom3, passthrough),
        ),
        goetl.NewPipelineStage(
            goetl.Do(query3).Outputs(writeCSV),
            goetl.Do(custom2).Outputs(writeCSV),
            goetl.Do(custom3).Outputs(writeCSV),
            goetl.Do(passthrough).Outputs(writeMySQL),
        ),
        goetl.NewPipelineStage(
            goetl.Do(writeCSV),
            goetl.Do(writeMySQL),
        ),
    )
    if err != nil {
        // layout is invalid
        panic(err.Error())
    }

    // Finally, create and run the Pipeline
    pipeline := goetl.NewBranchingPipeline(layout)
    err = <-pipeline.Run()
This example is only conceptual. Its main purpose is to show the flexibility you have when designing your Pipeline's layout and to demonstrate the syntax for constructing a new PipelineLayout.
Variables ¶
var StartSignal = "GO"
StartSignal is what's sent to a starting Processor to kick off execution. Typically this value will be ignored.
Types ¶
type ConcurrentProcessor ¶
ConcurrentProcessor is a Processor that also defines a level of concurrency. For example, if Concurrency() returns 2, then the pipeline will allow the stage to execute up to 2 ProcessData() calls concurrently.
Note that the order of data processing is maintained, meaning that when a Processor receives ProcessData calls d1, d2, ..., the resulting data payloads sent on the outputChan will be sent in the same order as received.
type DataProcessor ¶
type DataProcessor struct {
    Processor
    // contains filtered or unexported fields
}
DataProcessor is a type used internally to the Pipeline management code, and wraps a Processor instance. Processor is the main interface that should be implemented to perform work within the data pipeline, and this DataProcessor type simply embeds it and adds some helpful channel management and other attributes.
func Do ¶
func Do(processor Processor) *DataProcessor
Do takes a Processor instance and returns the DataProcessor type that will wrap it for internal processing. The details of the DataProcessor wrapper type are abstracted away from the implementing end-user code. The "Do" function is named succinctly to provide a nicer syntax when creating a PipelineLayout. See the goetl package documentation for code examples of creating a new branching pipeline layout.
func (*DataProcessor) Outputs ¶
func (dp *DataProcessor) Outputs(processors ...Processor) *DataProcessor
Outputs should be called to specify which Processor instances the current processor should send its output to. See the goetl package documentation for code examples and diagrams.
func (*DataProcessor) String ¶
func (dp *DataProcessor) String() string
String passes through to the String output of the wrapped Processor.
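The wrapper pattern behind Do, Outputs, and String can be sketched with interface embedding and a chaining method. The lowercase `processor`, `named`, `dataProcessor`, and `do` names below are hypothetical stand-ins written for this example; they only mimic the shape of the documented API and say nothing about goetl's unexported fields.

```go
package main

import "fmt"

// processor is a reduced stand-in interface with only a String method.
type processor interface{ String() string }

// named is a trivial processor used for the demonstration.
type named string

func (n named) String() string { return string(n) }

// dataProcessor embeds a processor, so its methods (here String) are promoted
// to the wrapper without any forwarding code.
type dataProcessor struct {
	processor
	outputs []processor
}

// do wraps a processor, mirroring the role of Do in the documentation.
func do(p processor) *dataProcessor { return &dataProcessor{processor: p} }

// Outputs records the downstream processors and returns the receiver so that
// calls can be chained inside a layout literal.
func (dp *dataProcessor) Outputs(ps ...processor) *dataProcessor {
	dp.outputs = append(dp.outputs, ps...)
	return dp
}

func main() {
	dp := do(named("query")).Outputs(named("csv"), named("sql"))
	fmt.Println(dp.String(), len(dp.outputs)) // query 2
}
```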
type Pipeline ¶
type Pipeline struct {
    Name         string // Name is simply for display purposes in log output.
    BufferLength int    // Set to control channel buffering, default is 8.
    PrintData    bool   // Set to true to log full data payloads (only in Debug logging mode).
    // contains filtered or unexported fields
}
Pipeline is the main construct used for running a series of stages within a data pipeline.
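To give a feel for what a channel buffer length controls, the sketch below counts how many sends succeed before a sender would block when nothing is receiving. It uses plain Go channels and a hypothetical `queued` helper; how goetl applies BufferLength internally is not shown here.

```go
package main

import "fmt"

// queued reports how many sends succeed with no receiver on a channel of the
// given buffer length, out of at most `attempts` tries.
func queued(bufferLength, attempts int) int {
	ch := make(chan int, bufferLength)
	sent := 0
	for i := 0; i < attempts; i++ {
		select {
		case ch <- i:
			sent++
		default:
			return sent // buffer full: a real sender would block here
		}
	}
	return sent
}

func main() {
	fmt.Println(queued(8, 20)) // 8
	fmt.Println(queued(0, 20)) // 0
}
```

A larger buffer lets a fast upstream stage run further ahead of a slow downstream stage, at the cost of holding more payloads in memory.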
func NewBranchingPipeline ¶
func NewBranchingPipeline(layout *PipelineLayout) *Pipeline
NewBranchingPipeline creates a new pipeline ready to run the given PipelineLayout, which can accommodate branching/merging between stages, each containing a variable number of Processors. See the goetl package documentation for code examples and diagrams.
func NewPipeline ¶
NewPipeline creates a new pipeline ready to run the given Processors. For more complex use-cases, see NewBranchingPipeline.
func (*Pipeline) Run ¶
Run finalizes the channel connections between PipelineStages and kicks off execution. Run will return a killChan that should be waited on so your calling function doesn't return prematurely. Any stage of the pipeline can send to the killChan to halt execution. Your calling function should check if the sent value is an error or nil to know if execution was a failure or a success (nil being the success value).
type PipelineIface ¶
type PipelineIface interface {
    Run() chan error
}
PipelineIface provides an interface to enable mocking the Pipeline. This makes unit testing your code that uses pipelines easier.
type PipelineLayout ¶
type PipelineLayout struct {
    // contains filtered or unexported fields
}
PipelineLayout holds a series of PipelineStage instances.
func NewPipelineLayout ¶
func NewPipelineLayout(stages ...*PipelineStage) (*PipelineLayout, error)
NewPipelineLayout creates and validates a new PipelineLayout instance which can be used to create a "branching" Pipeline. A PipelineLayout consists of a series of PipelineStages, where each PipelineStage consists of one or more Processors. See the goetl package documentation for code examples and diagrams.
This function will return an error if the given layout is invalid. A valid layout meets these conditions:
- Processors in the final PipelineStage must NOT have outputs set.
- Processors in a non-final stage MUST have outputs set.
- Outputs must point to a Processor in the next immediate stage.
- A Processor must be pointed to by one of the previous Outputs (unless it is in the first PipelineStage).
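The four conditions above can be expressed as a small check over a simplified model of a layout. This is only a sketch of the rules as stated, not the validation code goetl runs; the `node` and `layout` types and the `validate` function are hypothetical, and processors are identified by name for brevity.

```go
package main

import "fmt"

// node is a simplified processor entry: a name plus the names it outputs to.
type node struct {
	name    string
	outputs []string
}

// layout is a list of stages, each holding one or more nodes.
type layout [][]node

// validate applies the four conditions listed above.
func validate(l layout) error {
	for i, stage := range l {
		last := i == len(l)-1
		next := map[string]bool{} // names present in the next stage
		if !last {
			for _, n := range l[i+1] {
				next[n.name] = true
			}
		}
		targeted := map[string]bool{} // next-stage names that receive data
		for _, n := range stage {
			if last && len(n.outputs) > 0 {
				return fmt.Errorf("%s: final-stage processor must not have outputs", n.name)
			}
			if !last && len(n.outputs) == 0 {
				return fmt.Errorf("%s: non-final processor must have outputs", n.name)
			}
			for _, o := range n.outputs {
				if !next[o] {
					return fmt.Errorf("%s: output %s is not in the next stage", n.name, o)
				}
				targeted[o] = true
			}
		}
		if !last {
			for _, n := range l[i+1] {
				if !targeted[n.name] {
					return fmt.Errorf("%s: no processor in the previous stage outputs to it", n.name)
				}
			}
		}
	}
	return nil
}

func main() {
	ok := layout{
		{{name: "read", outputs: []string{"transform"}}},
		{{name: "transform", outputs: []string{"write"}}},
		{{name: "write"}},
	}
	fmt.Println(validate(ok)) // <nil>

	orphan := layout{
		{{name: "read", outputs: []string{"a"}}},
		{{name: "a"}, {name: "b"}},
	}
	fmt.Println(validate(orphan)) // b: no processor in the previous stage outputs to it
}
```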
type PipelineStage ¶
type PipelineStage struct {
    // contains filtered or unexported fields
}
PipelineStage holds one or more Processor instances.
func NewPipelineStage ¶
func NewPipelineStage(processors ...*DataProcessor) *PipelineStage
NewPipelineStage creates a PipelineStage instance given a series of Processors. DataProcessor is a wrapper around an object implementing the Processor interface. The syntax used to create PipelineLayouts abstracts this type away from your implementing code. For example:
    layout, err := goetl.NewPipelineLayout(
        goetl.NewPipelineStage(
            goetl.Do(aProcessor).Outputs(anotherProcessor),
            // ...
        ),
        // ...
    )
Notice how the goetl.Do() and Outputs() functions allow you to insert Processor instances into your PipelineStages without having to worry about the internal DataProcessor type or how any of the channel management works behind the scenes.
See the goetl package documentation for more code examples.
type Processor ¶
type Processor interface {
    // ProcessData will be called for each payload sent from the previous stage.
    // ProcessData is called with an etldata.Payload instance, which is the data being received,
    // an outputChan, which is the channel to send data to, and a killChan,
    // which is a channel to send unexpected errors to (halting execution of the Pipeline).
    ProcessData(d etldata.Payload, outputChan chan etldata.Payload, killChan chan error)

    // Finish will be called after the previous stage has finished sending data,
    // and no more data will be received by this Processor. Oftentimes
    // Finish can be an empty function implementation, but sometimes it is
    // necessary to perform final data processing.
    Finish(outputChan chan etldata.Payload, killChan chan error)
}
Processor is the interface that should be implemented to perform data-related tasks within a Pipeline. Processors are responsible for receiving, processing, and then sending data on to the next stage of processing.
Directories ¶
Synopsis |
---|
Package data holds custom types and functions for passing JSON between stages. |
Package etlutil has various helper functions used by components of goetl. |
Package logger is a simple but customizable logger used by goetl. |
Package processors contains built-in Processor implementations that are generic and potentially useful across any ETL project. |