Documentation ¶
Index ¶
- Constants
- Variables
- type AutoSQLTransform
- type Condition
- func HasAtLeastNDistinctValuesCondition(col string, n int) (Condition, error)
- func HasAtLeastNRowsCondition(n int) (Condition, error)
- func HasAtMostNDistinctValuesCondition(col string, n int) (Condition, error)
- func HasAtMostNRowsCondition(n int) (Condition, error)
- func HasExactlyNDistinctValuesCondition(col string, n int) (Condition, error)
- func HasExactlyNRowsCondition(n int) (Condition, error)
- func HasNoDuplicates(col string) (Condition, error)
- func HasNoNullValues(col string) (Condition, error)
- func NewSQLCondition(sql string) (Condition, error)
- type ConsoleDestination
- type ConsoleLogger
- type Coordinator
- type DefaultInserter
- func (d *DefaultInserter) Initialize(l Logger, tableName string, db *sql.DB, cols []string) error
- func (d *DefaultInserter) InsertBatch(tx *sql.Tx, msgs []Message) error
- func (d *DefaultInserter) New() SQLInserter
- func (d *DefaultInserter) PreCommit() error
- func (d *DefaultInserter) Statement() string
- type Destination
- type DestinationHook
- type DevNull
- type Event
- type ExcelDestination
- type ExcelRange
- type ExcelRangePoint
- type ExcelSource
- type GenericLogger
- type GraphNode
- type HTTPSource
- type LiteralSource
- type LiteralSourceFormat
- type LogLevel
- type Logger
- type MSSQLInserter
- type MandrillDestination
- type MandrillPrincipal
- type Message
- type Middleware
- type NamedSliceSource
- type ParameterTable
- type ParameterTableDestination
- type Passthrough
- type PostgresInserter
- type SQLDestination
- type SQLInserter
- type SQLSource
- type Sequenceable
- type SequenceableTransform
- type Sequencer
- type SlackOpts
- type SliceDestination
- type SliceSource
- type Source
- type SourceHook
- type Stopper
- type Stream
- type TransactionManager
- type Transform
- type TransformHook
Constants ¶
const (
	DefaultBufferSize   = 100
	DestinationWildcard = ""
)
const ConsoleDestinationName = "CONSOLE"
const DefaultExcelDateFormat = time.RFC3339
const DefaultRowsPerBatch = 500
const InsertQuery = `INSERT INTO %s (%s) VALUES (%s)`
const (
ParameterTableName = "PARAMETERS"
)
const TxManagerMaxRetries = 32
Variables ¶
var (
	ErrExcelTooManyWildcards     = errors.New("the Excel source/destination range can have at most one wildcard")
	ErrExcelCannotIncludeColumns = errors.New("the Excel source range cannot be dynamic in X if it includes columns")
	ErrColumnsNotSpecified       = errors.New("the Excel range should either include columns or they should be specified in the COLUMNS option")
)
var ErrEOS = errors.New("end of stream")
var ErrInterrupted = errors.New("The execution was interrupted by a context cancellation")
var ErrTransactionManagerFinished = errors.New("transaction manager is in a committed or rolled back state and can no longer provide new transactions")
var Inserters = map[string]SQLInserter{"mssql": &MSSQLInserter{}, "postgres": &PostgresInserter{}}
var LiteralSourceFormats = map[string]LiteralSourceFormat{
	"JSON_ARRAY":   JSONArray,
	"JSON_OBJECTS": JSONObjects,
	"CSV":          CSVWithoutHeader,
}
var SQLDriverManager sqlDriverManager
SQLDriverManager is a singleton that ensures there is only a single DB object per connection, rather than one per source/destination.
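A minimal sketch of the "one DB object per connection" idea, for illustration only. It assumes handles are keyed by driver name plus connection string, and it substitutes a hypothetical `handle` type and an injectable `open` function for `*sql.DB` and `sql.Open` so that it runs without a database driver. It is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// handle is a hypothetical stand-in for *sql.DB.
type handle struct{ connString string }

// driverManager caches one handle per (driver, connection string) pair.
type driverManager struct {
	mu      sync.Mutex
	handles map[string]*handle
	open    func(driver, conn string) (*handle, error) // e.g. a wrapper around sql.Open
}

// Get returns the cached handle for the connection, opening it on first use.
func (m *driverManager) Get(driver, conn string) (*handle, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	key := driver + "\x00" + conn
	if h, ok := m.handles[key]; ok {
		return h, nil
	}
	h, err := m.open(driver, conn)
	if err != nil {
		return nil, err
	}
	if m.handles == nil {
		m.handles = make(map[string]*handle)
	}
	m.handles[key] = h
	return h, nil
}

func main() {
	opens := 0
	m := &driverManager{open: func(driver, conn string) (*handle, error) {
		opens++
		return &handle{connString: conn}, nil
	}}
	a, _ := m.Get("postgres", "host=db1")
	b, _ := m.Get("postgres", "host=db1") // same connection: cached handle
	fmt.Println(a == b, opens)            // true 1
}
```

Sharing one `*sql.DB` per connection matters because `*sql.DB` is itself a connection pool; opening one per source or destination would multiply pools against the same server.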
Functions ¶
This section is empty.
Types ¶
type AutoSQLTransform ¶
type AutoSQLTransform struct {
	Name                 string
	Table                string `aql: "STAGING_TABLE, optional"`
	StagingSQLConnString string `aql: "STAGING_CONNECTION_STRING, optional"`
	Query                string
	ParameterTable       *ParameterTable
	ParameterNames       []string
	// contains filtered or unexported fields
}
AutoSQLTransform is a transform that drains the source, loads the rows into an in-memory SQLite database, and then runs an SQL query against that database, returning the result as rows. The database is private to the transform: it is not GLOBAL and does not use a shared cache.
Essentially, this makes it a combination of a SQL destination and a SQL source, where the two are automatically wired up to work together.
A current limitation is that the source dataset must fit entirely in memory. If this is not possible, it will be necessary to use e.g. a GLOBAL destination and to configure SET IN_MEMORY = 'OFF';
func (*AutoSQLTransform) Open ¶
func (a *AutoSQLTransform) Open(source Stream, dest Stream, l Logger, st Stopper)
func (*AutoSQLTransform) SetName ¶
func (a *AutoSQLTransform) SetName(name string)
type Condition ¶
Condition is a func that returns true if the message passes the test and false otherwise.
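func HasAtLeastNDistinctValuesCondition ¶
func HasAtLeastNDistinctValuesCondition(col string, n int) (Condition, error)
func HasAtLeastNRowsCondition ¶
func HasAtLeastNRowsCondition(n int) (Condition, error)
func HasAtMostNDistinctValuesCondition ¶
func HasAtMostNDistinctValuesCondition(col string, n int) (Condition, error)
func HasAtMostNRowsCondition ¶
func HasAtMostNRowsCondition(n int) (Condition, error)
func HasExactlyNDistinctValuesCondition ¶
func HasExactlyNDistinctValuesCondition(col string, n int) (Condition, error)
func HasExactlyNRowsCondition ¶
func HasExactlyNRowsCondition(n int) (Condition, error)
func HasNoDuplicates ¶
func HasNoDuplicates(col string) (Condition, error)
func HasNoNullValues ¶
func HasNoNullValues(col string) (Condition, error)
func NewSQLCondition ¶
func NewSQLCondition(sql string) (Condition, error)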
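The constructors above return a Condition plus an error. The sketch below illustrates that shape with two hypothetical equivalents. The real Condition signature is not shown on this page, so the sketch assumes a condition evaluated over the whole result set; rejecting a negative `n` is likewise an assumed reason for the error return, and columns are addressed by index rather than by name.

```go
package main

import (
	"errors"
	"fmt"
)

// rowsCondition is a hypothetical stand-in for Condition, evaluated over the
// whole result set. The package's real Condition signature may differ.
type rowsCondition func(rows [][]interface{}) bool

// hasAtLeastNRows mirrors the shape of HasAtLeastNRowsCondition(n int).
// Rejecting a negative n is an assumed reason for the error return.
func hasAtLeastNRows(n int) (rowsCondition, error) {
	if n < 0 {
		return nil, errors.New("n must be non-negative")
	}
	return func(rows [][]interface{}) bool { return len(rows) >= n }, nil
}

// hasNoDuplicates mirrors HasNoDuplicates(col string), but identifies the
// column by index because this sketch carries no column names.
func hasNoDuplicates(col int) (rowsCondition, error) {
	if col < 0 {
		return nil, errors.New("column index must be non-negative")
	}
	return func(rows [][]interface{}) bool {
		seen := make(map[interface{}]bool) // values must be comparable (e.g. ints, strings)
		for _, r := range rows {
			if col >= len(r) {
				return false // missing column: treat as a failed test
			}
			if seen[r[col]] {
				return false
			}
			seen[r[col]] = true
		}
		return true
	}, nil
}

func main() {
	rows := [][]interface{}{{1, "a"}, {2, "b"}, {2, "c"}}
	atLeast2, _ := hasAtLeastNRows(2)
	uniqueIDs, _ := hasNoDuplicates(0)
	fmt.Println(atLeast2(rows), uniqueIDs(rows)) // true false
}
```

Validating arguments in the constructor, rather than inside the returned function, lets a misconfigured test fail when the graph is built instead of partway through execution.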
type ConsoleDestination ¶
type ConsoleDestination struct {
	Name         string
	FormatAsJSON bool
	Writer       io.Writer
	// contains filtered or unexported fields
}
func (*ConsoleDestination) Open ¶
func (cd *ConsoleDestination) Open(s Stream, l Logger, st Stopper)
func (*ConsoleDestination) Ping ¶
func (cd *ConsoleDestination) Ping() error
type ConsoleLogger ¶
type ConsoleLogger struct {
	MinLevel LogLevel
	// contains filtered or unexported fields
}
func NewConsoleLogger ¶
func NewConsoleLogger(minLevel LogLevel) *ConsoleLogger
func (*ConsoleLogger) Chan ¶
func (cl *ConsoleLogger) Chan() chan<- Event
func (*ConsoleLogger) Error ¶
func (cl *ConsoleLogger) Error() error
func (*ConsoleLogger) Wait ¶
func (cl *ConsoleLogger) Wait()
type Coordinator ¶
type Coordinator interface {
	RegisterHooks(...interface{}) // arguments should be SourceHook, TransformHook or DestinationHook
	AddSource(name string, alias string, s Source) error
	AddDestination(name string, alias string, d Destination) error
	AddTest(node string, name string, desc string, c Condition) error
	AddTransform(name string, alias string, t Transform) error
	AddConstraint(before, after string) error
	Connect(from string, to string) error
	UseContext(ctx context.Context)
	Compile() error
	Execute() error
	Stop()
}
func NewCoordinator ¶
func NewCoordinator(logger Logger, txManager TransactionManager) Coordinator
type DefaultInserter ¶
type DefaultInserter struct {
// contains filtered or unexported fields
}
func (*DefaultInserter) Initialize ¶
func (d *DefaultInserter) Initialize(l Logger, tableName string, db *sql.DB, cols []string) error
func (*DefaultInserter) InsertBatch ¶
func (d *DefaultInserter) InsertBatch(tx *sql.Tx, msgs []Message) error
func (*DefaultInserter) New ¶
func (d *DefaultInserter) New() SQLInserter
func (*DefaultInserter) PreCommit ¶
func (d *DefaultInserter) PreCommit() error
func (*DefaultInserter) Statement ¶
func (d *DefaultInserter) Statement() string
type Destination ¶
type Destination interface {
	// Ping checks that the destination is available. It is used to verify
	// the destination at runtime.
	Ping() error
	// Open gives the destination a stream to pull from, a Logger for reporting
	// events and errors, and a Stopper.
	Open(Stream, Logger, Stopper)
}
func NewParameterTableDestination ¶
func NewParameterTableDestination(p *ParameterTable, cols []string) Destination
type DestinationHook ¶
type DestinationHook func(string, Destination) (Destination, error)
DestinationHook takes the destination name and interface and does something to it, possibly returning an error. If it returns a non-nil Destination, this will overwrite the existing Destination.
type ExcelDestination ¶
type ExcelDestination struct {
	Name      string
	Filename  string `aql:"FILE"`
	Overwrite bool   `aql:"OVERWRITE, optional"`
	Template  string `aql:"TEMPLATE, optional"`
	Sheet     string `aql:"SHEET"`
	Range     ExcelRange
	Alias     string
	Transpose bool     `aql:"TRANSPOSE, optional"`
	Cols      []string `aql:"COLUMNS, optional"`
	// contains filtered or unexported fields
}
func (*ExcelDestination) Ping ¶
func (ed *ExcelDestination) Ping() error
type ExcelRange ¶
type ExcelRange struct {
	X1 int
	Y1 int
	X2 ExcelRangePoint
	Y2 ExcelRangePoint
}
type ExcelRangePoint ¶
type ExcelSource ¶
type ExcelSource struct {
	Name                 string
	Filename             string `aql:"FILE"`
	Sheet                string `aql:"SHEET"`
	Range                ExcelRange
	RangeIncludesColumns bool
	Dateformat           string
	Cols                 []string `aql:"COLUMNS, optional"`
	// contains filtered or unexported fields
}
func (*ExcelSource) Columns ¶
func (s *ExcelSource) Columns() []string
func (*ExcelSource) Ping ¶
func (s *ExcelSource) Ping() error
func (*ExcelSource) SetName ¶
func (s *ExcelSource) SetName(name string)
type GenericLogger ¶
type GenericLogger struct {
	MinLevel LogLevel
	Writer   io.Writer
	// contains filtered or unexported fields
}
func NewGenericLogger ¶
func NewGenericLogger(minLevel LogLevel, writer io.Writer) *GenericLogger
func (*GenericLogger) Chan ¶
func (gl *GenericLogger) Chan() chan<- Event
func (*GenericLogger) Error ¶
func (gl *GenericLogger) Error() error
func (*GenericLogger) Wait ¶
func (gl *GenericLogger) Wait()
type HTTPSource ¶
type HTTPSource struct {
	Name                 string
	URL                  string            `aql:"URL"`                // URL of request
	Headers              map[string]string // Headers to add to request, optional
	JSONPath             string            `aql:"JSON_PATH, optional"` // Path to object containing array of rows, optional
	NoColumnNames        bool              // If response has array of primitive types rather than objects with column names, e.g. ["bob",2] instead of {"name": "bob", "age": 2}
	ColumnNames          []string          `aql:"COLUMNS, optional"`                     // if NoColumnNames is true, this should be provided
	PaginationLimitName  string            `aql:"PAGINATION_LIMIT_PARAMETER, optional"`  // query parameter for pagination limit (optional)
	PaginationOffsetName string            `aql:"PAGINATION_OFFSET_PARAMETER, optional"` // query parameter for pagination offset (optional)
	PageSize             int               `aql:"PAGE_SIZE, optional"`                   // size of page for pagination
	// contains filtered or unexported fields
}
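HTTPSource's pagination fields name the query parameters used for limit/offset paging. The sketch below shows one way such page URLs could be built with `net/url`. It assumes offset = page*PageSize and that both values are appended as query parameters; the function `pageURL` is hypothetical, and the real source may compute or stop pagination differently.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// pageURL builds the URL for a 0-based page number, setting the limit
// parameter to pageSize and the offset parameter to page*pageSize while
// preserving any query parameters already present in base.
func pageURL(base, limitName, offsetName string, pageSize, page int) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	q := u.Query()
	q.Set(limitName, strconv.Itoa(pageSize))
	q.Set(offsetName, strconv.Itoa(page*pageSize))
	u.RawQuery = q.Encode() // Encode sorts parameters by key
	return u.String(), nil
}

func main() {
	for page := 0; page < 3; page++ {
		s, err := pageURL("https://api.example.com/people?active=true", "limit", "offset", 100, page)
		if err != nil {
			panic(err)
		}
		fmt.Println(s)
	}
}
```

A common stopping rule for this style of paging is to stop once a page returns fewer than `pageSize` rows; whether HTTPSource does this is not stated here.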
func (*HTTPSource) Ping ¶
func (h *HTTPSource) Ping() error
func (*HTTPSource) SetName ¶
func (h *HTTPSource) SetName(name string)
type LiteralSource ¶
type LiteralSource struct {
	Name    string
	Content string
	Columns []string
	Format  LiteralSourceFormat
	// contains filtered or unexported fields
}
func (*LiteralSource) Ping ¶
func (ls *LiteralSource) Ping() error
func (*LiteralSource) SetName ¶
func (ls *LiteralSource) SetName(name string)
type LiteralSourceFormat ¶
type LiteralSourceFormat int
const (
	// JSONArray is an array of row arrays, e.g. [[2,3],[3,4]]
	JSONArray LiteralSourceFormat = iota
	// JSONObjects is an array of objects, e.g. [{"a": 1, "b": 2}, {"a": 4, "b": 5}]
	JSONObjects
	// CSVWithoutHeader is a CSV string without headers, e.g. 1, 2\n4, 5. Only string
	// types are supported; other types will not be inferred, so e.g. the above example
	// will map to strings ["1", "2"], ["4", "5"].
	CSVWithoutHeader
)
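To make the difference between the formats concrete, the sketch below parses the two documented examples with the standard library. Using `encoding/csv` and `encoding/json` is an assumption about how such content could be read, not a description of LiteralSource's internals; `parseCSVWithoutHeader` and `parseJSONArray` are hypothetical helpers. Note that CSV values stay strings, while JSON numbers decoded into `interface{}` become `float64`.

```go
package main

import (
	"encoding/csv"
	"encoding/json"
	"fmt"
	"strings"
)

// parseCSVWithoutHeader keeps every field as a string, matching the
// documented CSVWithoutHeader behaviour (no type inference).
func parseCSVWithoutHeader(content string) ([][]string, error) {
	r := csv.NewReader(strings.NewReader(content))
	r.TrimLeadingSpace = true // so "1, 2" yields "2" rather than " 2"
	return r.ReadAll()
}

// parseJSONArray decodes an array of row arrays, e.g. [[2,3],[3,4]].
// encoding/json decodes JSON numbers into float64 when the target is interface{}.
func parseJSONArray(content string) ([][]interface{}, error) {
	var rows [][]interface{}
	err := json.Unmarshal([]byte(content), &rows)
	return rows, err
}

func main() {
	csvRows, err := parseCSVWithoutHeader("1, 2\n4, 5")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", csvRows) // [["1" "2"] ["4" "5"]]

	jsonRows, err := parseJSONArray("[[2,3],[3,4]]")
	if err != nil {
		panic(err)
	}
	fmt.Println(jsonRows) // [[2 3] [3 4]]
}
```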
type Logger ¶
type Logger interface {
	// Chan returns a chan that can be used to log events
	Chan() chan<- Event
	// Error returns the latest error that has been logged. The logger must keep track of this.
	Error() error
	// Wait should block until the logger is done processing messages in its chan.
	// The sender should close the chan before calling this or it will deadlock.
	Wait()
}
func SlackWrapper ¶
SlackWrapper intercepts messages to a logger and forwards any at or above the given minimum log level to a Slack incoming webhook.
type MSSQLInserter ¶
type MSSQLInserter struct {
// contains filtered or unexported fields
}
func (*MSSQLInserter) Initialize ¶
func (m *MSSQLInserter) Initialize(l Logger, tableName string, db *sql.DB, cols []string) error
func (*MSSQLInserter) InsertBatch ¶
func (m *MSSQLInserter) InsertBatch(tx *sql.Tx, msgs []Message) error
func (*MSSQLInserter) New ¶
func (m *MSSQLInserter) New() SQLInserter
func (*MSSQLInserter) PreCommit ¶
func (m *MSSQLInserter) PreCommit() error
type MandrillDestination ¶
type MandrillDestination struct {
	Name       string
	APIKey     string `aql:"API_KEY"`
	Sender     *MandrillPrincipal
	Recipients []MandrillPrincipal
	SplitByRow bool   `aql:"SPLIT, optional"`
	Template   string `aql:"TEMPLATE"`
	Subject    string `aql:"SUBJECT, optional"`
	// contains filtered or unexported fields
}
func (*MandrillDestination) Open ¶
func (d *MandrillDestination) Open(s Stream, l Logger, st Stopper)
func (*MandrillDestination) Ping ¶
func (d *MandrillDestination) Ping() error
type MandrillPrincipal ¶
func ParseEmailRecipients ¶
func ParseEmailRecipients(s string) ([]MandrillPrincipal, error)
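The accepted input format of ParseEmailRecipients is not documented here. As an illustration only, the sketch below assumes an RFC 5322 address list such as `Ann Smith <ann@example.com>, bob@example.com` and parses it with `net/mail.ParseAddressList`. The `principal` type is a hypothetical stand-in for MandrillPrincipal, whose fields are not shown on this page.

```go
package main

import (
	"fmt"
	"net/mail"
)

// principal is a hypothetical stand-in for MandrillPrincipal.
type principal struct {
	Name  string
	Email string
}

// parseRecipients assumes a comma-separated RFC 5322 address list.
func parseRecipients(s string) ([]principal, error) {
	addrs, err := mail.ParseAddressList(s)
	if err != nil {
		return nil, err
	}
	out := make([]principal, 0, len(addrs))
	for _, a := range addrs {
		out = append(out, principal{Name: a.Name, Email: a.Address})
	}
	return out, nil
}

func main() {
	ps, err := parseRecipients("Ann Smith <ann@example.com>, bob@example.com")
	if err != nil {
		panic(err)
	}
	for _, p := range ps {
		fmt.Printf("name=%q email=%q\n", p.Name, p.Email)
	}
}
```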
type Middleware ¶
Middleware is a func that transforms a stream.
type NamedSliceSource ¶
type NamedSliceSource struct {
// contains filtered or unexported fields
}
func (*NamedSliceSource) Open ¶
func (ns *NamedSliceSource) Open(dest Stream, logger Logger, stop Stopper)
func (*NamedSliceSource) Ping ¶
func (ns *NamedSliceSource) Ping() error
func (*NamedSliceSource) SetName ¶
func (ns *NamedSliceSource) SetName(name string)
type ParameterTable ¶
func NewParameterTable ¶
func NewParameterTable() *ParameterTable
func (*ParameterTable) Declare ¶
func (p *ParameterTable) Declare(name string) error
func (*ParameterTable) Get ¶
func (p *ParameterTable) Get(name string) (interface{}, bool)
func (*ParameterTable) Set ¶
func (p *ParameterTable) Set(name string, value interface{}) error
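ParameterTable exposes Declare, Get and Set. The sketch below is a hypothetical re-creation of that surface to show how a declare-then-set table might behave. The specific rules are assumptions, not documented behaviour: Set fails for an undeclared name, Declare fails for a duplicate name, and Get reports `ok == false` until a value has been set. `paramTable` is illustrative only.

```go
package main

import (
	"fmt"
	"sync"
)

// paramTable is a hypothetical stand-in for ParameterTable.
type paramTable struct {
	mu       sync.RWMutex
	values   map[string]interface{}
	assigned map[string]bool // key present = declared; value = has been set
}

func newParamTable() *paramTable {
	return &paramTable{values: map[string]interface{}{}, assigned: map[string]bool{}}
}

func (p *paramTable) Declare(name string) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.assigned[name]; ok {
		return fmt.Errorf("parameter %q already declared", name)
	}
	p.assigned[name] = false
	return nil
}

func (p *paramTable) Set(name string, v interface{}) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.assigned[name]; !ok {
		return fmt.Errorf("parameter %q not declared", name)
	}
	p.values[name] = v
	p.assigned[name] = true
	return nil
}

// Get reports ok=false for parameters that are undeclared or not yet set.
func (p *paramTable) Get(name string) (interface{}, bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	if !p.assigned[name] {
		return nil, false
	}
	return p.values[name], true
}

func main() {
	p := newParamTable()
	fmt.Println(p.Set("max_date", "2020-01-01")) // parameter "max_date" not declared
	_ = p.Declare("max_date")
	_ = p.Set("max_date", "2020-01-01")
	v, ok := p.Get("max_date")
	fmt.Println(v, ok) // 2020-01-01 true
}
```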
type ParameterTableDestination ¶
type ParameterTableDestination struct {
// contains filtered or unexported fields
}
func (*ParameterTableDestination) Open ¶
func (p *ParameterTableDestination) Open(s Stream, l Logger, st Stopper)
func (*ParameterTableDestination) Ping ¶
func (p *ParameterTableDestination) Ping() error
type Passthrough ¶
func (*Passthrough) Open ¶
func (p *Passthrough) Open(source Stream, dest Stream, logger Logger, stop Stopper)
func (*Passthrough) SetName ¶
func (p *Passthrough) SetName(name string)
type PostgresInserter ¶
type PostgresInserter struct {
// contains filtered or unexported fields
}
func (*PostgresInserter) Initialize ¶
func (m *PostgresInserter) Initialize(l Logger, tableName string, db *sql.DB, cols []string) error
func (*PostgresInserter) InsertBatch ¶
func (m *PostgresInserter) InsertBatch(tx *sql.Tx, msgs []Message) error
func (*PostgresInserter) New ¶
func (m *PostgresInserter) New() SQLInserter
func (*PostgresInserter) PreCommit ¶
func (m *PostgresInserter) PreCommit() error
type SQLDestination ¶
type SQLDestination struct {
	Name             string
	Driver           string
	ConnectionString string
	Table            string `aql:"TABLE"`
	Tx               *sql.Tx
	RowsPerBatch     int  `aql:"ROWS_PER_BATCH,optional"`
	DropNulls        bool `aql:"DROP_NULLS,optional"`
	TxUseFunc        func() (*sql.Tx, error)
	TxReleaseFunc    func()
	Alias            string
	// contains filtered or unexported fields
}
func (*SQLDestination) Columns ¶
func (sq *SQLDestination) Columns() []string
func (*SQLDestination) Ping ¶
func (sq *SQLDestination) Ping() error
type SQLInserter ¶
type SQLInserter interface {
	New() SQLInserter
	// Initialize with connection details and database.
	Initialize(l Logger, tableName string, db *sql.DB, cols []string) error
	// Insert a single batch
	InsertBatch(tx *sql.Tx, msgs []Message) error
	// Hook that is called before the transaction manager (or another owner) commits or rolls back the transaction
	PreCommit() error
}
SQLInserter inserts rows into a SQL database. It contains driver-specific optimisations:
- MS SQL Server: uses bulk copy
It does not perform any transaction management.
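A sketch of the generic (non-bulk-copy) insert path under stated assumptions: it fills the documented `InsertQuery` template with one placeholder per column and splits rows into chunks of `DefaultRowsPerBatch`, each of which would be passed to one InsertBatch call inside a caller-supplied transaction. The `?` placeholder style is an assumption (PostgreSQL drivers expect `$1, $2, ...`), and `insertStatement` and `batches` are hypothetical helpers, not the package's DefaultInserter.

```go
package main

import (
	"fmt"
	"strings"
)

const insertQuery = `INSERT INTO %s (%s) VALUES (%s)` // same template as InsertQuery
const rowsPerBatch = 500                              // same value as DefaultRowsPerBatch

// insertStatement fills the template with one "?" placeholder per column.
func insertStatement(table string, cols []string) string {
	ph := strings.TrimSuffix(strings.Repeat("?, ", len(cols)), ", ")
	return fmt.Sprintf(insertQuery, table, strings.Join(cols, ", "), ph)
}

// batches splits rows into chunks of at most size rows.
func batches(rows [][]interface{}, size int) [][][]interface{} {
	if size <= 0 {
		size = rowsPerBatch // guard against an infinite loop
	}
	var out [][][]interface{}
	for start := 0; start < len(rows); start += size {
		end := start + size
		if end > len(rows) {
			end = len(rows)
		}
		out = append(out, rows[start:end])
	}
	return out
}

func main() {
	fmt.Println(insertStatement("people", []string{"name", "age"}))
	// INSERT INTO people (name, age) VALUES (?, ?)

	rows := make([][]interface{}, 1203)
	for _, b := range batches(rows, rowsPerBatch) {
		fmt.Println(len(b))
	}
	// prints 500, 500 and 203 on separate lines
}
```

Because the inserter does no transaction management, batching only bounds statement and memory size; atomicity still comes from the transaction the caller passes in.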
type SQLSource ¶
type Sequenceable ¶
type Sequenceable interface {
Sequence(seq []string)
}
type SequenceableTransform ¶
type SequenceableTransform interface {
	Transform
	Sequenceable
}
type Sequencer ¶
Sequencer is a synchronization utility to ensure that a collection of named tasks run in a given sequence even if they are started in parallel.
func NewSequencer ¶
type SliceDestination ¶
func (*SliceDestination) Open ¶
func (sd *SliceDestination) Open(s Stream, logger Logger, stop Stopper)
func (*SliceDestination) Ping ¶
func (sd *SliceDestination) Ping() error
func (*SliceDestination) Results ¶
func (sd *SliceDestination) Results() [][]interface{}
type SliceSource ¶
type SliceSource struct {
// contains filtered or unexported fields
}
func (*SliceSource) Ping ¶
func (s *SliceSource) Ping() error
func (*SliceSource) SetName ¶
func (s *SliceSource) SetName(name string)
type Source ¶
type Source interface {
	// SetName sets the name (or alias) of the source for outgoing messages
	SetName(name string)
	// Ping attempts to connect to the source without creating a stream.
	// This is used to check that the source is valid at run-time.
	Ping() error
	// Open connects to the source and sends its data to the given stream.
	Open(Stream, Logger, Stopper)
}
Source represents data inputs into the system, e.g. a database query.
func NewNamedSliceSource ¶
func NewSliceSource ¶
type SourceHook ¶
SourceHook takes the source name and interface and does something to it, possibly returning an error. If it returns a non-nil Source, this will overwrite the existing Source.
type Stopper ¶
type Stopper interface {
	// Stopped checks if the stopper is stopped
	Stopped() bool
	// Stop stops the stopper. This is irreversible.
	Stop()
}
Stopper is used as a condition variable to halt the execution of the program. It is safe for concurrent use by multiple goroutines.
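One way to satisfy the Stopper contract (irreversible, safe for concurrent use) is a channel closed exactly once through `sync.Once`, with Stopped polling it without blocking. This is a sketch of the technique, not NewStopper's actual implementation; `stopper` and `newStopper` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// stopper closes ch exactly once, so repeated or concurrent Stop calls are harmless.
type stopper struct {
	once sync.Once
	ch   chan struct{}
}

func newStopper() *stopper { return &stopper{ch: make(chan struct{})} }

func (s *stopper) Stop() { s.once.Do(func() { close(s.ch) }) }

// Stopped polls the channel without blocking.
func (s *stopper) Stopped() bool {
	select {
	case <-s.ch:
		return true
	default:
		return false
	}
}

func main() {
	st := newStopper()
	fmt.Println(st.Stopped()) // false
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // concurrent Stop calls are safe
		wg.Add(1)
		go func() { defer wg.Done(); st.Stop() }()
	}
	wg.Wait()
	fmt.Println(st.Stopped()) // true
}
```

A closed channel also lets goroutines `select` on stopping alongside their own channel operations, which a plain boolean flag would not.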
func NewStopper ¶
func NewStopper() Stopper
type Stream ¶
type Stream interface {
	// Columns returns a slice of column names
	Columns() []string
	// SetColumns sets the destination columns. The destination can be a wildcard.
	SetColumns(destination string, cols []string) error
	// Chan is the channel for the stream. It will be closed by the sender when the stream is at an end.
	Chan(destination string) chan Message
}
Stream represents a stream of data, such as a database result set.
func NewSequencedStream ¶
type TransactionManager ¶
type TransactionManager interface {
	// Register makes the connection known to the transaction manager. It does
	// NOT begin a new transaction.
	Register(aql.Connection) error
	// Tx will begin a new transaction (if none exists) or re-use the existing
	// transaction, locking it so that no one may concurrently use it.
	Tx(connection string) (*sql.Tx, error)
	// Release the transaction so that it may be used by others. It panics if the
	// connection has not been registered.
	Release(connection string)
	// Commit commits ALL transactions. It is an error to call Tx() or Register()
	// after Commit().
	Commit() error
	// Rollback rolls back ALL transactions. It is an error to call Tx() or Register()
	// after Rollback().
	Rollback() error
}
TransactionManager provides a single transaction per connection, to be used by all components that read from or write to the connection. All transactions are then either committed or rolled back together. It is a 2PC Tx manager. For now, it only supports connections that provide database/sql transactions (*sql.Tx).
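A simplified sketch of the "one transaction per connection, finished together" behaviour. To run without a database it uses a hypothetical `committer` interface in place of `*sql.Tx` and an injectable `begin` function; it omits Register, Release and per-transaction locking, and it commits sequentially without a prepare phase. `txManager`, `fakeTx` and `errFinished` (which plays the role of ErrTransactionManagerFinished) are all illustrative names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// committer stands in for *sql.Tx.
type committer interface {
	Commit() error
	Rollback() error
}

// errFinished plays the role of ErrTransactionManagerFinished.
var errFinished = errors.New("transaction manager is in a committed or rolled back state")

type txManager struct {
	mu       sync.Mutex
	begin    func(conn string) (committer, error) // e.g. db.Begin for that connection
	txs      map[string]committer
	finished bool
}

// Tx returns the connection's transaction, beginning one on first use.
func (m *txManager) Tx(conn string) (committer, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.finished {
		return nil, errFinished
	}
	if tx, ok := m.txs[conn]; ok {
		return tx, nil // re-use the connection's existing transaction
	}
	tx, err := m.begin(conn)
	if err != nil {
		return nil, err
	}
	m.txs[conn] = tx
	return tx, nil
}

// finish applies f to every open transaction and returns the first error.
func (m *txManager) finish(f func(committer) error) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.finished = true
	var firstErr error
	for _, tx := range m.txs {
		if err := f(tx); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func (m *txManager) Commit() error   { return m.finish(committer.Commit) }
func (m *txManager) Rollback() error { return m.finish(committer.Rollback) }

// fakeTx records what happened to it.
type fakeTx struct{ state string }

func (f *fakeTx) Commit() error   { f.state = "committed"; return nil }
func (f *fakeTx) Rollback() error { f.state = "rolled back"; return nil }

func main() {
	m := &txManager{
		txs:   map[string]committer{},
		begin: func(string) (committer, error) { return &fakeTx{}, nil },
	}
	a, _ := m.Tx("warehouse")
	b, _ := m.Tx("warehouse") // same connection, same transaction
	c, _ := m.Tx("crm")
	fmt.Println(a == b, a == c) // true false
	_ = m.Commit()
	_, err := m.Tx("crm")
	fmt.Println(a.(*fakeTx).state, c.(*fakeTx).state, err == errFinished) // committed committed true
}
```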
func NewTransactionManager ¶
func NewTransactionManager(l Logger) TransactionManager
type Transform ¶
type Transform interface {
	// SetName sets the alias of the transform for outgoing messages
	SetName(name string)
	// Open gives the transform a stream to start pulling from
	Open(source Stream, dest Stream, logger Logger, stop Stopper)
}
Transform is a component that is neither a source nor a sink. It is configured with one or more sources, and one or more sinks.
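A minimal sketch of a transform's read-process-write loop, using plain channels in place of Stream and a `stopped` function in place of Stopper. It reads every message from its input, upper-cases string values, writes the result downstream, and closes its output when the input ends, matching the Stream rule that the sender closes the channel. `message` and `upperTransform` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// message is a hypothetical stand-in for Message: one row of values.
type message []interface{}

// upperTransform copies rows from in to out, upper-casing string values.
// If stopped reports true it returns early; a real implementation would also
// need to ensure the upstream sender is not left blocked.
func upperTransform(in <-chan message, out chan<- message, stopped func() bool) {
	defer close(out) // this transform is the sender on out
	for msg := range in {
		if stopped() {
			return
		}
		row := make(message, len(msg))
		for i, v := range msg {
			if s, ok := v.(string); ok {
				v = strings.ToUpper(s)
			}
			row[i] = v
		}
		out <- row
	}
}

func main() {
	in, out := make(chan message), make(chan message)
	go func() {
		defer close(in)
		in <- message{"bob", 2}
		in <- message{"alice", 3}
	}()
	go upperTransform(in, out, func() bool { return false })
	for row := range out {
		fmt.Println(row)
	}
	// [BOB 2]
	// [ALICE 3]
}
```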
Source Files ¶
- auto_sql_transform.go
- condition.go
- console_dest.go
- coordinator.go
- destination.go
- excel_dest.go
- excel_source.go
- http_source.go
- literal_source.go
- logger.go
- mandrill_destination.go
- middleware.go
- multiplexer.go
- parameters.go
- sequencer.go
- slack.go
- source.go
- sql_dest.go
- sql_driver_manager.go
- sql_inserter.go
- sql_source.go
- stopper.go
- stream.go
- transaction_manager.go
- transform.go
- util.go