Documentation ¶
Index ¶
- Variables
- func ClientDelete(serverURL string, route RouteName, m map[string]interface{}, ...) (respStr string, err error)
- func ClientGet(serverURL string, route RouteName, m map[string]interface{}, ...) (respStr string, err error)
- func ClientOptions(serverURL string, route RouteName, m map[string]interface{}, ...) (respStr string, err error)
- func ClientPatch(serverURL string, route RouteName, m map[string]interface{}, ...) (respStr string, err error)
- func ClientPost(serverURL string, route RouteName, m map[string]interface{}, ...) (respStr string, err error)
- func ClientPut(serverURL string, route RouteName, m map[string]interface{}, ...) (respStr string, err error)
- func ErrorHelper(err error) (helpString string)
- func GetJWTFromKey(masterServerURL, key string) (string, error)
- func GetSQLText(sqlStringPath string) (string, error)
- func IsJSONorYAML(payload string) bool
- func LoadProject(path string)
- func NewExecID() string
- func SetStreamDefaults(name string, stream *ReplicationStreamConfig, replicationCfg ReplicationConfig)
- func Sling(cfg *Config) (err error)
- func StateSet(t *TaskExecution)
- type Config
- func (cfg *Config) AsReplication() (rc ReplicationConfig)
- func (cfg *Config) ColumnsPrepared() (columns iop.Columns)
- func (cfg *Config) DetermineType() (Type JobType, err error)
- func (cfg *Config) FormatTargetObjectName() (err error)
- func (cfg *Config) GetFormatMap() (m map[string]any, err error)
- func (cfg *Config) HasIncrementalVal() bool
- func (cfg *Config) HasWildcard() bool
- func (cfg *Config) IgnoreExisting() bool
- func (cfg *Config) IsFileStreamWithStateAndParts() bool
- func (cfg *Config) MD5() string
- func (cfg *Config) Prepare() (err error)
- func (cfg *Config) ReplicationMode() bool
- func (cfg *Config) Scan(value interface{}) error
- func (cfg *Config) SetDefault()
- func (cfg *Config) SrcConnMD5() string
- func (cfg *Config) StreamID() string
- func (cfg *Config) TgtConnMD5() string
- func (cfg *Config) TransformsPrepared() (colTransforms map[string][]string)
- func (cfg *Config) Unmarshal(cfgStr string) (err error)
- func (cfg Config) Value() (driver.Value, error)
- type ConfigOptions
- type ConnState
- type DateTimeState
- type ExecStatus
- type ExecutionState
- type ExecutionStatus
- type Hook
- type HookKind
- type HookMap
- type HookStage
- type HookType
- type Hooks
- type JobType
- type Mode
- type NotificationConfig
- type ObjectState
- type OnFailType
- type ParseOptions
- type Pipeline
- type PipelineState
- type PipelineStep
- type PipelineSteps
- type ProgressBar
- type Project
- type ProjectConfig
- type ReplicationConfig
- func (rd *ReplicationConfig) AddStream(key string, cfg *ReplicationStreamConfig)
- func (rd *ReplicationConfig) Compile(cfgOverwrite *Config, selectStreams ...string) (err error)
- func (rd *ReplicationConfig) DeleteStream(key string)
- func (rd ReplicationConfig) GetStream(name string) (streamName string, cfg *ReplicationStreamConfig, found bool)
- func (rd *ReplicationConfig) JSON() string
- func (rd *ReplicationConfig) MD5() string
- func (rd ReplicationConfig) MatchStreams(pattern string) (streams map[string]*ReplicationStreamConfig)
- func (rd ReplicationConfig) Normalize(n string) string
- func (rd *ReplicationConfig) OriginalCfg() string
- func (rd *ReplicationConfig) ParseReplicationHook(stage HookStage) (hooks Hooks, err error)
- func (rd *ReplicationConfig) ParseStreamHook(stage HookStage, rs *ReplicationStreamConfig) (hooks Hooks, err error)
- func (rd *ReplicationConfig) ProcessChunks() (err error)
- func (rd *ReplicationConfig) ProcessWildcards() (err error)
- func (rd *ReplicationConfig) ProcessWildcardsDatabase(c connection.Connection, patterns []string) (wildcards Wildcards, err error)
- func (rd *ReplicationConfig) ProcessWildcardsFile(c connection.Connection, patterns []string) (wildcards Wildcards, err error)
- func (rd *ReplicationConfig) RuntimeState() (_ *ReplicationState, err error)
- func (rd *ReplicationConfig) Scan(value interface{}) error
- func (rd ReplicationConfig) StreamsOrdered() []string
- func (rd ReplicationConfig) Value() (driver.Value, error)
- type ReplicationState
- type ReplicationStreamConfig
- type RouteName
- type RunState
- type RuntimeState
- type Source
- type SourceOptions
- type StatusMap
- type StreamState
- type Target
- type TargetOptions
- type TaskExecution
- func (t *TaskExecution) AddCleanupTaskFirst(f func())
- func (t *TaskExecution) AddCleanupTaskLast(f func())
- func (t *TaskExecution) AppendOutput(ll *g.LogLine)
- func (t *TaskExecution) Cleanup()
- func (t *TaskExecution) Data() *iop.Dataset
- func (t *TaskExecution) Df() *iop.Dataflow
- func (t *TaskExecution) Execute() error
- func (t *TaskExecution) ExecuteHooks(stage HookStage) (err error)
- func (t *TaskExecution) GetBytes() (inBytes, outBytes uint64)
- func (t *TaskExecution) GetBytesString() (s string)
- func (t *TaskExecution) GetCount() (count uint64)
- func (t *TaskExecution) GetRate(secWindow int) (rowRate, byteRate int64)
- func (t *TaskExecution) GetSourceTable() (sTable database.Table, err error)
- func (t *TaskExecution) GetStateMap() map[string]any
- func (t *TaskExecution) GetTargetTable(tempTableSuffix ...string) (tTable database.Table, err error)
- func (t *TaskExecution) GetTotalBytes() (rcBytes, txBytes uint64)
- func (t *TaskExecution) IsStalled(window float64) bool
- func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df *iop.Dataflow, err error)
- func (t *TaskExecution) ReadFromFile(cfg *Config) (df *iop.Dataflow, err error)
- func (t *TaskExecution) SetProgress(progressText string, args ...interface{})
- func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn database.Connection) (cnt uint64, err error)
- func (t *TaskExecution) WriteToFile(cfg *Config, df *iop.Dataflow) (cnt uint64, err error)
- type Wildcard
- type Wildcards
Constants ¶
This section is empty.
Variables ¶
var AllExecStatus = []struct { Value ExecStatus TSName string }{ {ExecStatusCreated, "ExecStatusCreated"}, {ExecStatusQueued, "ExecStatusQueued"}, {ExecStatusStarted, "ExecStatusStarted"}, {ExecStatusRunning, "ExecStatusRunning"}, {ExecStatusSuccess, "ExecStatusSuccess"}, {ExecStatusTerminated, "ExecStatusTerminated"}, {ExecStatusInterrupted, "ExecStatusInterrupted"}, {ExecStatusTimedOut, "ExecStatusTimedOut"}, {ExecStatusError, "ExecStatusError"}, {ExecStatusSkipped, "ExecStatusSkipped"}, {ExecStatusStalled, "ExecStatusStalled"}, }
var AllJobType = []struct { Value JobType TSName string }{ {ConnTest, "ConnTest"}, {ConnDiscover, "ConnDiscover"}, {ConnExec, "ConnExec"}, {DbToDb, "DbToDb"}, {FileToDB, "FileToDB"}, {DbToFile, "DbToFile"}, {FileToFile, "FileToFile"}, {DbSQL, "DbSQL"}, }
var AllMode = []struct { Value Mode TSName string }{ {FullRefreshMode, "FullRefreshMode"}, {IncrementalMode, "IncrementalMode"}, {TruncateMode, "TruncateMode"}, {SnapshotMode, "SnapshotMode"}, {BackfillMode, "BackfillMode"}, }
var HookRunReplication func(string, *Config, ...string) error
var ParseHook = func(any, ParseOptions) (Hook, error) { return nil, nil }
var ShowProgress = true
var SourceDBOptionsDefault = SourceOptions{ EmptyAsNull: g.Bool(false), NullIf: g.String("NULL"), DatetimeFormat: "AUTO", MaxDecimals: g.Int(-1), }
var SourceFileOptionsDefault = SourceOptions{ EmptyAsNull: g.Bool(true), Header: g.Bool(true), Flatten: g.Bool(false), Compression: iop.CompressorTypePtr(iop.AutoCompressorType), NullIf: g.String("NULL"), DatetimeFormat: "AUTO", SkipBlankLines: g.Bool(false), FieldsPerRec: g.Int(-1), MaxDecimals: g.Int(-1), }
var StoreSet = func(t *TaskExecution) error { return nil }
Set in the store/store.go file for history keeping
var TargetDBOptionsDefault = TargetOptions{ FileMaxRows: lo.Ternary( os.Getenv("FILE_MAX_ROWS") != "", g.Int64(cast.ToInt64(os.Getenv("FILE_MAX_ROWS"))), g.Int64(0), ), UseBulk: g.Bool(true), AddNewColumns: g.Bool(true), AdjustColumnType: g.Bool(false), DatetimeFormat: "auto", MaxDecimals: g.Int(-1), ColumnCasing: g.Ptr(iop.SourceColumnCasing), }
var TargetFileOptionsDefault = TargetOptions{ Header: g.Bool(true), Compression: lo.Ternary( os.Getenv("COMPRESSION") != "", iop.CompressorTypePtr(iop.CompressorType(strings.ToLower(os.Getenv("COMPRESSION")))), iop.CompressorTypePtr(iop.AutoCompressorType), ), Concurrency: lo.Ternary( os.Getenv("CONCURRENCY") != "", cast.ToInt(os.Getenv("CONCURRENCY")), 7, ), FileMaxRows: lo.Ternary( os.Getenv("FILE_MAX_ROWS") != "", g.Int64(cast.ToInt64(os.Getenv("FILE_MAX_ROWS"))), g.Int64(0), ), FileMaxBytes: lo.Ternary( os.Getenv("FILE_MAX_BYTES") != "", g.Int64(cast.ToInt64(os.Getenv("FILE_MAX_BYTES"))), g.Int64(0), ), Format: dbio.FileTypeNone, UseBulk: g.Bool(true), AddNewColumns: g.Bool(true), DatetimeFormat: "auto", Delimiter: ",", MaxDecimals: g.Int(-1), ColumnCasing: g.Ptr(iop.SourceColumnCasing), }
Functions ¶
func ClientDelete ¶
func ClientDelete(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)
ClientDelete sends a DELETE request
func ClientGet ¶
func ClientGet(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)
ClientGet sends a GET request
func ClientOptions ¶
func ClientOptions(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)
ClientOptions sends an OPTIONS request
func ClientPatch ¶
func ClientPatch(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)
ClientPatch sends a PATCH request
func ClientPost ¶
func ClientPost(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)
ClientPost sends a POST request
func ClientPut ¶
func ClientPut(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)
ClientPut sends a PUT request
func ErrorHelper ¶ added in v1.0.61
func GetJWTFromKey ¶
GetJWTFromKey logs in and returns the JWT based on the provided key
func GetSQLText ¶ added in v1.1.14
GetSQLText processes the source SQL file / text
func IsJSONorYAML ¶ added in v1.2.14
IsJSONorYAML detects a JSON or YAML payload
func LoadProject ¶
func LoadProject(path string)
func SetStreamDefaults ¶ added in v0.86.36
func SetStreamDefaults(name string, stream *ReplicationStreamConfig, replicationCfg ReplicationConfig)
func StateSet ¶ added in v1.3.5
func StateSet(t *TaskExecution)
Types ¶
type Config ¶
type Config struct { Source Source `json:"source,omitempty" yaml:"source,omitempty"` Target Target `json:"target" yaml:"target"` Mode Mode `json:"mode,omitempty" yaml:"mode,omitempty"` Transforms any `json:"transforms,omitempty" yaml:"transforms,omitempty"` Options ConfigOptions `json:"options,omitempty" yaml:"options,omitempty"` Env map[string]string `json:"env,omitempty" yaml:"env,omitempty"` StreamName string `json:"stream_name,omitempty" yaml:"stream_name,omitempty"` ReplicationStream *ReplicationStreamConfig `json:"replication_stream,omitempty" yaml:"replication_stream,omitempty"` SrcConn connection.Connection `json:"-" yaml:"-"` TgtConn connection.Connection `json:"-" yaml:"-"` Prepared bool `json:"-" yaml:"-"` IncrementalVal any `json:"incremental_val" yaml:"incremental_val"` IncrementalValStr string `json:"incremental_val_str" yaml:"incremental_val_str"` IncrementalGTE bool `json:"incremental_gte,omitempty" yaml:"incremental_gte,omitempty"` MetadataLoadedAt *bool `json:"-" yaml:"-"` MetadataStreamURL bool `json:"-" yaml:"-"` MetadataRowNum bool `json:"-" yaml:"-"` MetadataRowID bool `json:"-" yaml:"-"` MetadataExecID bool `json:"-" yaml:"-"` // contains filtered or unexported fields }
Config is the new config struct
func (*Config) AsReplication ¶ added in v1.1.14
func (cfg *Config) AsReplication() (rc ReplicationConfig)
func (*Config) ColumnsPrepared ¶ added in v1.2.16
ColumnsPrepared returns the prepared columns
func (*Config) DetermineType ¶ added in v0.84.9
func (*Config) FormatTargetObjectName ¶ added in v0.85.54
func (*Config) GetFormatMap ¶ added in v1.0.31
GetFormatMap returns a map of variables used to format a string
func (*Config) HasIncrementalVal ¶ added in v1.2.21
HasIncrementalVal returns true if there is a non-null incremental value
func (*Config) HasWildcard ¶ added in v1.1.14
func (*Config) IgnoreExisting ¶ added in v1.2.10
IgnoreExisting returns true if target_options.ignore_existing is true
func (*Config) IsFileStreamWithStateAndParts ¶ added in v1.3.6
func (*Config) ReplicationMode ¶ added in v0.87.34
ReplicationMode returns true for replication mode
func (*Config) SrcConnMD5 ¶ added in v1.2.15
func (*Config) TgtConnMD5 ¶ added in v1.2.15
func (*Config) TransformsPrepared ¶ added in v1.2.16
TransformsPrepared returns the transforms columns
type ConfigOptions ¶
type ConfigOptions struct { Debug bool `json:"debug,omitempty" yaml:"debug,omitempty"` StdIn bool `json:"-"` // whether stdin is passed StdOut bool `json:"stdout,omitempty" yaml:"stdout,omitempty"` // whether to output to stdout Dataset bool `json:"dataset,omitempty" yaml:"dataset,omitempty"` // whether to output to dataset }
ConfigOptions are configuration options
type ConnState ¶ added in v1.3.5
type ConnState struct { Name string `json:"name,omitempty"` Type dbio.Type `json:"type,omitempty"` Kind dbio.Kind `json:"kind,omitempty"` Bucket string `json:"bucket,omitempty"` Container string `json:"container,omitempty"` Database string `json:"database,omitempty"` Instance string `json:"instance,omitempty"` Schema string `json:"schema,omitempty"` }
type DateTimeState ¶ added in v1.3.5
type DateTimeState struct { Timestamp time.Time `json:"timestamp,omitempty"` Unix int64 `json:"unix,omitempty"` FileName string `json:"file_name,omitempty"` Rfc3339 string `json:"rfc3339,omitempty"` Date string `json:"date,omitempty"` Datetime string `json:"datetime,omitempty"` YYYY string `json:"YYYY,omitempty"` YY string `json:"YY,omitempty"` MMM string `json:"MMM,omitempty"` MM string `json:"MM,omitempty"` DD string `json:"DD,omitempty"` DDD string `json:"DDD,omitempty"` HH string `json:"HH,omitempty"` }
func (*DateTimeState) Update ¶ added in v1.3.5
func (dts *DateTimeState) Update()
type ExecStatus ¶
type ExecStatus string
ExecStatus is the status of an execution
const ( // ExecStatusCreated = created ExecStatusCreated ExecStatus = "created" // ExecStatusQueued = queued ExecStatusQueued ExecStatus = "queued" // ExecStatusStarted = started ExecStatusStarted ExecStatus = "started" // ExecStatusRunning = running ExecStatusRunning ExecStatus = "running" // ExecStatusSuccess = success ExecStatusSuccess ExecStatus = "success" // ExecStatusTerminated = terminated ExecStatusTerminated ExecStatus = "terminated" // ExecStatusInterrupted = interrupted ExecStatusInterrupted ExecStatus = "interrupted" // ExecStatusTimedOut = timed-out (when no heartbeat sent for 30 sec) ExecStatusTimedOut ExecStatus = "timed-out" // ExecStatusError = error ExecStatusError ExecStatus = "error" // ExecStatusSkipped = skipped ExecStatusSkipped ExecStatus = "skipped" // ExecStatusStalled = stalled (when still heartbeating, but rows are unchanged for a while) ExecStatusStalled ExecStatus = "stalled" // ExecStatusCancelled = cancelled ExecStatusCancelled ExecStatus = "cancelled" // ExecStatusWarning = warning ExecStatusWarning ExecStatus = "warning" )
func (ExecStatus) IsFailure ¶
func (s ExecStatus) IsFailure() bool
IsFailure returns true if an execution has failed
func (ExecStatus) IsFinished ¶
func (s ExecStatus) IsFinished() bool
IsFinished returns true if an execution is finished
func (ExecStatus) IsRunning ¶
func (s ExecStatus) IsRunning() bool
IsRunning returns true if an execution is running
func (ExecStatus) IsSuccess ¶
func (s ExecStatus) IsSuccess() bool
IsSuccess returns true if an execution is successful
func (ExecStatus) IsWarning ¶ added in v1.2.23
func (s ExecStatus) IsWarning() bool
IsWarning returns true if an execution is warning
type ExecutionState ¶ added in v1.3.6
type ExecutionState struct { ID string `json:"id,omitempty"` FilePath string `json:"string,omitempty"` TotalBytes uint64 `json:"total_bytes,omitempty"` TotalRows uint64 `json:"total_rows,omitempty"` Status StatusMap `json:"status,omitempty"` StartTime *time.Time `json:"start_time,omitempty"` EndTime *time.Time `json:"end_time,omitempty"` Duration int64 `json:"duration,omitempty"` Error *string `json:"error,omitempty"` }
type ExecutionStatus ¶
type ExecutionStatus struct { JobID int `json:"job_id,omitempty"` ExecID int64 `json:"exec_id,omitempty"` Status ExecStatus `json:"status,omitempty"` Text string `json:"text,omitempty"` Rows uint64 `json:"rows,omitempty"` Bytes uint64 `json:"bytes,omitempty"` Percent int `json:"percent,omitempty"` Stalled bool `json:"stalled,omitempty"` Duration *int `json:"duration,omitempty"` AvgDuration int `json:"avg_duration,omitempty"` }
ExecutionStatus is an execution status object
type JobType ¶
type JobType string
JobType is an enum type for jobs
const ConnDiscover JobType = "conn-discover"
ConnDiscover is for a connection discover
const ConnExec JobType = "conn-exec"
ConnExec is for a connection exec
const ConnTest JobType = "conn-test"
ConnTest is for a connection test
const DbSQL JobType = "db-sql"
DbSQL is for a db sql query
const DbToDb JobType = "db-db"
DbToDb is from db to db
const DbToFile JobType = "db-file"
DbToFile is from db to file
const FileToDB JobType = "file-db"
FileToDB is from file to db
const FileToFile JobType = "file-file"
FileToFile is from file to file
type Mode ¶
type Mode string
Mode is a load mode
const ( // TruncateMode is to truncate TruncateMode Mode = "truncate" // FullRefreshMode is to drop FullRefreshMode Mode = "full-refresh" // IncrementalMode is to incremental IncrementalMode Mode = "incremental" // SnapshotMode is to snapshot SnapshotMode Mode = "snapshot" // BackfillMode is to backfill BackfillMode Mode = "backfill" )
type NotificationConfig ¶
type NotificationConfig struct { Name string `json:"name"` Emails []string `json:"emails"` Slack bool `json:"slack"` MsTeams bool `json:"msteams"` WebhookURLs []string `json:"webhook_urls"` // urls OnSuccess bool `json:"on_success"` OnFailure bool `json:"on_failure"` OnLinger bool `json:"on_linger"` OnEmpty bool `json:"on_empty"` }
type ObjectState ¶ added in v1.3.5
type OnFailType ¶ added in v1.4.1
type OnFailType string
type ParseOptions ¶ added in v1.3.5
type ParseOptions struct {
// contains filtered or unexported fields
}
type Pipeline ¶ added in v1.4.1
type Pipeline struct { Steps Hooks `json:"steps" yaml:"steps"` Env map[string]any `json:"env,omitempty" yaml:"env,omitempty"` // contains filtered or unexported fields }
func LoadPipelineConfig ¶ added in v1.4.1
func LoadPipelineConfigFromFile ¶ added in v1.4.1
func (*Pipeline) RuntimeState ¶ added in v1.4.1
func (pl *Pipeline) RuntimeState() (_ *PipelineState, err error)
RuntimeState returns the state for use
type PipelineState ¶ added in v1.4.1
type PipelineState struct { State map[string]map[string]any `json:"state,omitempty"` Env map[string]any `json:"env,omitempty"` Timestamp DateTimeState `json:"timestamp,omitempty"` Runs map[string]*RunState `json:"runs,omitempty"` Run *RunState `json:"run,omitempty"` }
func (*PipelineState) Marshall ¶ added in v1.4.1
func (ps *PipelineState) Marshall() string
func (*PipelineState) SetStateData ¶ added in v1.4.1
func (ps *PipelineState) SetStateData(id string, data map[string]any)
func (*PipelineState) SetStateKeyValue ¶ added in v1.4.1
func (ps *PipelineState) SetStateKeyValue(id, key string, value any)
func (*PipelineState) TaskExecution ¶ added in v1.4.1
func (ps *PipelineState) TaskExecution() *TaskExecution
type PipelineStep ¶ added in v1.4.1
type PipelineStep struct { }
type PipelineSteps ¶ added in v1.4.1
type PipelineSteps []PipelineStep
type ProgressBar ¶
type ProgressBar struct {
// contains filtered or unexported fields
}
func (*ProgressBar) Finish ¶
func (pb *ProgressBar) Finish()
func (*ProgressBar) SetStatus ¶
func (pb *ProgressBar) SetStatus(status string)
SetStatus sets the progress bar status
func (*ProgressBar) Start ¶
func (pb *ProgressBar) Start()
type Project ¶
type Project struct { Config ProjectConfig TaskConfigs map[string]Config }
type ProjectConfig ¶
type ProjectConfig struct { Project string `json:"project" yaml:"project"` TaskPaths []string `json:"task-paths" yaml:"task-paths"` Defaults map[string]interface{} `json:"defaults" yaml:"defaults"` NotificationTags map[string]NotificationConfig `json:"notification_tags" yaml:"notification_tags"` }
type ReplicationConfig ¶ added in v0.86.36
type ReplicationConfig struct { Source string `json:"source,omitempty" yaml:"source,omitempty"` Target string `json:"target,omitempty" yaml:"target,omitempty"` Hooks HookMap `json:"hooks,omitempty" yaml:"hooks,omitempty"` Defaults ReplicationStreamConfig `json:"defaults,omitempty" yaml:"defaults,omitempty"` Streams map[string]*ReplicationStreamConfig `json:"streams,omitempty" yaml:"streams,omitempty"` Env map[string]any `json:"env,omitempty" yaml:"env,omitempty"` // Tasks are compiled tasks Tasks []*Config `json:"tasks"` Compiled bool `json:"compiled"` FailErr string // error string to fail all (e.g. when the first tasks fails to connect) // contains filtered or unexported fields }
func LoadReplicationConfig ¶ added in v0.87.17
func LoadReplicationConfig(content string) (config ReplicationConfig, err error)
func LoadReplicationConfigFromFile ¶ added in v1.2.10
func LoadReplicationConfigFromFile(cfgPath string) (config ReplicationConfig, err error)
func UnmarshalReplication ¶ added in v0.86.39
func UnmarshalReplication(replicYAML string) (config ReplicationConfig, err error)
UnmarshalReplication converts a yaml file to a replication
func (*ReplicationConfig) AddStream ¶ added in v1.1.15
func (rd *ReplicationConfig) AddStream(key string, cfg *ReplicationStreamConfig)
func (*ReplicationConfig) Compile ¶ added in v1.2.2
func (rd *ReplicationConfig) Compile(cfgOverwrite *Config, selectStreams ...string) (err error)
Compile compiles the replication into tasks
func (*ReplicationConfig) DeleteStream ¶ added in v1.1.15
func (rd *ReplicationConfig) DeleteStream(key string)
func (ReplicationConfig) GetStream ¶ added in v1.1.15
func (rd ReplicationConfig) GetStream(name string) (streamName string, cfg *ReplicationStreamConfig, found bool)
GetStream returns the stream if it exists
func (*ReplicationConfig) JSON ¶ added in v1.2.14
func (rd *ReplicationConfig) JSON() string
JSON returns json payload
func (*ReplicationConfig) MD5 ¶ added in v1.1.6
func (rd *ReplicationConfig) MD5() string
MD5 returns a md5 hash of the json payload
func (ReplicationConfig) MatchStreams ¶ added in v1.1.15
func (rd ReplicationConfig) MatchStreams(pattern string) (streams map[string]*ReplicationStreamConfig)
MatchStreams returns the streams that match the given pattern
func (ReplicationConfig) Normalize ¶ added in v1.1.14
func (rd ReplicationConfig) Normalize(n string) string
Normalize normalizes the name
func (*ReplicationConfig) OriginalCfg ¶ added in v1.0.63
func (rd *ReplicationConfig) OriginalCfg() string
OriginalCfg returns original config
func (*ReplicationConfig) ParseReplicationHook ¶ added in v1.3.6
func (rd *ReplicationConfig) ParseReplicationHook(stage HookStage) (hooks Hooks, err error)
func (*ReplicationConfig) ParseStreamHook ¶ added in v1.3.5
func (rd *ReplicationConfig) ParseStreamHook(stage HookStage, rs *ReplicationStreamConfig) (hooks Hooks, err error)
func (*ReplicationConfig) ProcessChunks ¶ added in v1.4.1
func (rd *ReplicationConfig) ProcessChunks() (err error)
func (*ReplicationConfig) ProcessWildcards ¶ added in v0.87.18
func (rd *ReplicationConfig) ProcessWildcards() (err error)
ProcessWildcards processes the streams using wildcards such as `my_schema.*` or `my_schema.my_prefix_*` or `my_schema.*_my_suffix`
func (*ReplicationConfig) ProcessWildcardsDatabase ¶ added in v1.0.68
func (rd *ReplicationConfig) ProcessWildcardsDatabase(c connection.Connection, patterns []string) (wildcards Wildcards, err error)
func (*ReplicationConfig) ProcessWildcardsFile ¶ added in v1.0.68
func (rd *ReplicationConfig) ProcessWildcardsFile(c connection.Connection, patterns []string) (wildcards Wildcards, err error)
func (*ReplicationConfig) RuntimeState ¶ added in v1.3.5
func (rd *ReplicationConfig) RuntimeState() (_ *ReplicationState, err error)
RuntimeState returns the state for use
func (*ReplicationConfig) Scan ¶ added in v0.86.36
func (rd *ReplicationConfig) Scan(value interface{}) error
Scan scans the value into Jsonb; implements the sql.Scanner interface
func (ReplicationConfig) StreamsOrdered ¶ added in v1.0.6
func (rd ReplicationConfig) StreamsOrdered() []string
StreamsOrdered returns the stream names as ordered in the YAML file
type ReplicationState ¶ added in v1.4.1
type ReplicationState struct { State map[string]map[string]any `json:"state,omitempty"` Env map[string]any `json:"env,omitempty"` Timestamp DateTimeState `json:"timestamp,omitempty"` Execution ExecutionState `json:"execution,omitempty"` Source ConnState `json:"source,omitempty"` Target ConnState `json:"target,omitempty"` Stream *StreamState `json:"stream,omitempty"` Object *ObjectState `json:"object,omitempty"` Runs map[string]*RunState `json:"runs,omitempty"` Run *RunState `json:"run,omitempty"` }
ReplicationState is for runtime state
func (*ReplicationState) Marshall ¶ added in v1.4.1
func (rs *ReplicationState) Marshall() string
func (*ReplicationState) SetStateData ¶ added in v1.4.1
func (rs *ReplicationState) SetStateData(id string, data map[string]any)
func (*ReplicationState) SetStateKeyValue ¶ added in v1.4.1
func (rs *ReplicationState) SetStateKeyValue(id, key string, value any)
func (*ReplicationState) TaskExecution ¶ added in v1.4.1
func (rs *ReplicationState) TaskExecution() *TaskExecution
type ReplicationStreamConfig ¶ added in v0.86.36
type ReplicationStreamConfig struct { ID string `json:"id,omitempty" yaml:"id,omitempty"` Description string `json:"description,omitempty" yaml:"description,omitempty"` Mode Mode `json:"mode,omitempty" yaml:"mode,omitempty"` Object string `json:"object,omitempty" yaml:"object,omitempty"` Select []string `json:"select,omitempty" yaml:"select,flow,omitempty"` Where string `json:"where,omitempty" yaml:"where,omitempty"` PrimaryKeyI any `json:"primary_key,omitempty" yaml:"primary_key,flow,omitempty"` UpdateKey string `json:"update_key,omitempty" yaml:"update_key,omitempty"` SQL string `json:"sql,omitempty" yaml:"sql,omitempty"` Tags []string `json:"tags,omitempty" yaml:"tags,omitempty"` SourceOptions *SourceOptions `json:"source_options,omitempty" yaml:"source_options,omitempty"` TargetOptions *TargetOptions `json:"target_options,omitempty" yaml:"target_options,omitempty"` Schedule string `json:"schedule,omitempty" yaml:"schedule,omitempty"` Disabled bool `json:"disabled,omitempty" yaml:"disabled,omitempty"` Single *bool `json:"single,omitempty" yaml:"single,omitempty"` Transforms any `json:"transforms,omitempty" yaml:"transforms,omitempty"` Columns any `json:"columns,omitempty" yaml:"columns,omitempty"` Hooks HookMap `json:"hooks,omitempty" yaml:"hooks,omitempty"` // contains filtered or unexported fields }
func (*ReplicationStreamConfig) ObjectHasStreamVars ¶ added in v1.3.4
func (s *ReplicationStreamConfig) ObjectHasStreamVars() bool
func (*ReplicationStreamConfig) PrimaryKey ¶ added in v0.86.36
func (s *ReplicationStreamConfig) PrimaryKey() []string
type RouteName ¶
type RouteName string
RouteName is the name of a route
const ( RouteStatus RouteName = "/status" RouteNotice RouteName = "/notice" RouteError RouteName = "/error" RouteSignUpUser RouteName = "/sign-up" RouteUser RouteName = "/user" RouteForgotPassword RouteName = "/forgot-password" RouteResetPassword RouteName = "/reset-password" RouteLogin RouteName = "/login" RouteLogout RouteName = "/logout" RouteProxy RouteName = "/p" RouteAppIndex RouteName = "/app" RouteAppLogin RouteName = "/app/login" RouteAppLogout RouteName = "/app/logout" RouteAppAPIKey RouteName = "/app/apikey" RouteAPI RouteName = "/api/v1" RouteMasterStatus RouteName = "/api/v1/master-status" RouteMasterDBReset RouteName = "/api/v1/master-db-reset" RouteUploads RouteName = "/api/v1/uploads" RouteAPIAccounts RouteName = "/api/v1/accounts" RouteAPIProjects RouteName = "/api/v1/projects" RouteAPIKey RouteName = "/api/v1/apikey" RouteAPIUsers RouteName = "/api/v1/users" RouteAPIJobs RouteName = "/api/v1/jobs" RouteAPILogs RouteName = "/api/v1/logs" RouteAPIExecutions RouteName = "/api/v1/executions" RouteAPIConnections RouteName = "/api/v1/connections" RouteAPIConnectionTest RouteName = "/api/v1/connection-test" RouteAPIResetPassword RouteName = "/api/v1/reset-password" RouteAPIDataRequest RouteName = "/api/v1/data-request" RouteAPIWorkers RouteName = "/api/v1/workers" RouteAPISettings RouteName = "/api/v1/settings" RouteAlertLog RouteName = "/alert/log" RouteWs RouteName = "/ws" RouteWsClient RouteName = "/ws/client" RouteWsWorker RouteName = "/ws/worker" )
type RunState ¶ added in v1.3.5
type RunState struct { ID string `json:"id,omitempty"` Stream *StreamState `json:"stream,omitempty"` Object *ObjectState `json:"object,omitempty"` TotalBytes uint64 `json:"total_bytes,omitempty"` TotalRows uint64 `json:"total_rows,omitempty"` Status ExecStatus `json:"status,omitempty"` StartTime *time.Time `json:"start_time,omitempty"` EndTime *time.Time `json:"end_time,omitempty"` Duration int64 `json:"duration,omitempty"` Error *string `json:"error,omitempty"` Config ReplicationStreamConfig `json:"config,omitempty"` Task *TaskExecution `json:"-"` }
type RuntimeState ¶ added in v1.3.5
type Source ¶
type Source struct { Conn string `json:"conn,omitempty" yaml:"conn,omitempty"` Type dbio.Type `json:"type,omitempty" yaml:"type,omitempty"` Stream string `json:"stream,omitempty" yaml:"stream,omitempty"` Select []string `json:"select,omitempty" yaml:"select,omitempty"` // Select or exclude columns. Exclude with prefix "-". Where string `json:"where,omitempty" yaml:"where,omitempty"` Query string `json:"query,omitempty" yaml:"query,omitempty"` PrimaryKeyI any `json:"primary_key,omitempty" yaml:"primary_key,omitempty"` UpdateKey string `json:"update_key,omitempty" yaml:"update_key,omitempty"` Options *SourceOptions `json:"options,omitempty" yaml:"options,omitempty"` Data map[string]interface{} `json:"-" yaml:"-"` }
Source is a source of data
func (*Source) HasPrimaryKey ¶ added in v1.0.50
func (*Source) HasUpdateKey ¶ added in v1.0.50
func (*Source) PrimaryKey ¶ added in v0.84.0
type SourceOptions ¶
type SourceOptions struct { EmptyAsNull *bool `json:"empty_as_null,omitempty" yaml:"empty_as_null,omitempty"` Header *bool `json:"header,omitempty" yaml:"header,omitempty"` Flatten *bool `json:"flatten,omitempty" yaml:"flatten,omitempty"` FieldsPerRec *int `json:"fields_per_rec,omitempty" yaml:"fields_per_rec,omitempty"` Compression *iop.CompressorType `json:"compression,omitempty" yaml:"compression,omitempty"` Format *dbio.FileType `json:"format,omitempty" yaml:"format,omitempty"` NullIf *string `json:"null_if,omitempty" yaml:"null_if,omitempty"` DatetimeFormat string `json:"datetime_format,omitempty" yaml:"datetime_format,omitempty"` SkipBlankLines *bool `json:"skip_blank_lines,omitempty" yaml:"skip_blank_lines,omitempty"` Delimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"` Escape string `json:"escape,omitempty" yaml:"escape,omitempty"` Quote string `json:"quote,omitempty" yaml:"quote,omitempty"` MaxDecimals *int `json:"max_decimals,omitempty" yaml:"max_decimals,omitempty"` JmesPath *string `json:"jmespath,omitempty" yaml:"jmespath,omitempty"` Sheet *string `json:"sheet,omitempty" yaml:"sheet,omitempty"` Range *string `json:"range,omitempty" yaml:"range,omitempty"` Limit *int `json:"limit,omitempty" yaml:"limit,omitempty"` Offset *int `json:"offset,omitempty" yaml:"offset,omitempty"` FileSelect *[]string `json:"file_select,omitempty" yaml:"file_select,omitempty"` // include/exclude files ChunkSize any `json:"chunk_size,omitempty" yaml:"chunk_size,omitempty"` // columns & transforms were moved out of source_options // https://github.com/slingdata-io/sling-cli/issues/348 Columns any `json:"columns,omitempty" yaml:"columns,omitempty"` // legacy Transforms any `json:"transforms,omitempty" yaml:"transforms,omitempty"` // legacy }
SourceOptions are connection and stream processing options
func (*SourceOptions) RangeStartEnd ¶ added in v1.4.1
func (so *SourceOptions) RangeStartEnd() (start, end string)
func (*SourceOptions) SetDefaults ¶ added in v1.0.31
func (o *SourceOptions) SetDefaults(sourceOptions SourceOptions)
type StreamState ¶ added in v1.3.5
// StreamState carries string attributes of a stream: file path parts,
// a name, and schema/table identifiers. Every field is serialized to JSON
// with omitempty.
type StreamState struct {
	// File-related attributes.
	FileFolder string `json:"file_folder,omitempty"`
	FileName string `json:"file_name,omitempty"`
	FileExt string `json:"file_ext,omitempty"`
	FilePath string `json:"file_path,omitempty"`

	Name string `json:"name,omitempty"`

	// Schema/table identifiers. The *Lower / *Upper fields are presumably
	// case-folded variants of Schema and Table; verify against the code
	// that populates them.
	Schema string `json:"schema,omitempty"`
	SchemaLower string `json:"schema_lower,omitempty"`
	SchemaUpper string `json:"schema_upper,omitempty"`
	Table string `json:"table,omitempty"`
	TableLower string `json:"table_lower,omitempty"`
	TableUpper string `json:"table_upper,omitempty"`
	FullName string `json:"full_name,omitempty"`
}
type Target ¶
type Target struct { Conn string `json:"conn,omitempty" yaml:"conn,omitempty"` Type dbio.Type `json:"type,omitempty" yaml:"type,omitempty"` Object string `json:"object,omitempty" yaml:"object,omitempty"` Columns any `json:"columns,omitempty" yaml:"columns,omitempty"` Options *TargetOptions `json:"options,omitempty" yaml:"options,omitempty"` Data map[string]interface{} `json:"-" yaml:"-"` TmpTableCreated bool `json:"-" yaml:"-"` // contains filtered or unexported fields }
Target is a target of data
func (*Target) ObjectFileFormat ¶ added in v1.2.25
type TargetOptions ¶
// TargetOptions are target connection and stream processing options.
//
// Every field is serialized to both JSON and YAML under the same key and
// is omitted when empty.
type TargetOptions struct {
	Header *bool `json:"header,omitempty" yaml:"header,omitempty"`
	Compression *iop.CompressorType `json:"compression,omitempty" yaml:"compression,omitempty"`
	Concurrency int `json:"concurrency,omitempty" yaml:"concurrency,omitempty"`
	BatchLimit *int64 `json:"batch_limit,omitempty" yaml:"batch_limit,omitempty"`
	DatetimeFormat string `json:"datetime_format,omitempty" yaml:"datetime_format,omitempty"`
	Delimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
	FileMaxRows *int64 `json:"file_max_rows,omitempty" yaml:"file_max_rows,omitempty"`
	FileMaxBytes *int64 `json:"file_max_bytes,omitempty" yaml:"file_max_bytes,omitempty"`
	// NOTE(review): Format is a value here, whereas SourceOptions.Format
	// is a pointer; confirm the difference is intentional.
	Format dbio.FileType `json:"format,omitempty" yaml:"format,omitempty"`
	MaxDecimals *int `json:"max_decimals,omitempty" yaml:"max_decimals,omitempty"`
	UseBulk *bool `json:"use_bulk,omitempty" yaml:"use_bulk,omitempty"`
	IgnoreExisting *bool `json:"ignore_existing,omitempty" yaml:"ignore_existing,omitempty"`
	DeleteMissing *string `json:"delete_missing,omitempty" yaml:"delete_missing,omitempty"`
	AddNewColumns *bool `json:"add_new_columns,omitempty" yaml:"add_new_columns,omitempty"`
	AdjustColumnType *bool `json:"adjust_column_type,omitempty" yaml:"adjust_column_type,omitempty"`
	ColumnCasing *iop.ColumnCasing `json:"column_casing,omitempty" yaml:"column_casing,omitempty"`
	TableKeys database.TableKeys `json:"table_keys,omitempty" yaml:"table_keys,omitempty"`
	TableTmp string `json:"table_tmp,omitempty" yaml:"table_tmp,omitempty"`
	TableDDL *string `json:"table_ddl,omitempty" yaml:"table_ddl,omitempty"`
	PreSQL *string `json:"pre_sql,omitempty" yaml:"pre_sql,omitempty"`
	PostSQL *string `json:"post_sql,omitempty" yaml:"post_sql,omitempty"`
}
TargetOptions are target connection and stream processing options
func (*TargetOptions) SetDefaults ¶ added in v1.0.31
func (o *TargetOptions) SetDefaults(targetOptions TargetOptions)
type TaskExecution ¶
type TaskExecution struct { ExecID string `json:"exec_id"` Config *Config `json:"config"` Type JobType `json:"type"` Status ExecStatus `json:"status"` Err error `json:"error"` StartTime *time.Time `json:"start_time"` EndTime *time.Time `json:"end_time"` Bytes uint64 `json:"bytes"` Context *g.Context `json:"-"` Progress string `json:"progress"` Output strings.Builder `json:"-"` OutputLines chan *g.LogLine Replication *ReplicationConfig `json:"replication"` ProgressHist []string `json:"progress_hist"` PBar *ProgressBar `json:"-"` ProcStatsStart g.ProcStats `json:"-"` // process stats at beginning // contains filtered or unexported fields }
TaskExecution is a sling ELT task run, synonymous with an execution
func NewTask ¶
func NewTask(execID string, cfg *Config) (t *TaskExecution)
NewTask creates a Sling task with given configuration
func (*TaskExecution) AddCleanupTaskFirst ¶ added in v1.1.8
func (t *TaskExecution) AddCleanupTaskFirst(f func())
func (*TaskExecution) AddCleanupTaskLast ¶ added in v1.1.8
func (t *TaskExecution) AddCleanupTaskLast(f func())
func (*TaskExecution) AppendOutput ¶ added in v1.1.6
func (t *TaskExecution) AppendOutput(ll *g.LogLine)
func (*TaskExecution) Cleanup ¶ added in v0.84.3
func (t *TaskExecution) Cleanup()
func (*TaskExecution) Data ¶ added in v1.2.19
func (t *TaskExecution) Data() *iop.Dataset
Data returns the dataset object
func (*TaskExecution) Df ¶ added in v1.2.7
func (t *TaskExecution) Df() *iop.Dataflow
Df returns the dataflow object
func (*TaskExecution) Execute ¶
func (t *TaskExecution) Execute() error
Execute runs a Sling task. This may be a file/db to file/db transfer
func (*TaskExecution) ExecuteHooks ¶ added in v1.2.23
func (t *TaskExecution) ExecuteHooks(stage HookStage) (err error)
func (*TaskExecution) GetBytes ¶
func (t *TaskExecution) GetBytes() (inBytes, outBytes uint64)
GetBytes returns the current total of bytes processed
func (*TaskExecution) GetBytesString ¶
func (t *TaskExecution) GetBytesString() (s string)
func (*TaskExecution) GetCount ¶
func (t *TaskExecution) GetCount() (count uint64)
GetCount returns the current count of rows processed
func (*TaskExecution) GetRate ¶
func (t *TaskExecution) GetRate(secWindow int) (rowRate, byteRate int64)
GetRate returns the speed of flow (rows/sec and bytes/sec). secWindow is how many seconds back to measure (0 means since the beginning).
func (*TaskExecution) GetSourceTable ¶ added in v1.3.4
func (t *TaskExecution) GetSourceTable() (sTable database.Table, err error)
func (*TaskExecution) GetStateMap ¶ added in v1.2.23
func (t *TaskExecution) GetStateMap() map[string]any
func (*TaskExecution) GetTargetTable ¶ added in v1.3.4
func (t *TaskExecution) GetTargetTable(tempTableSuffix ...string) (tTable database.Table, err error)
func (*TaskExecution) GetTotalBytes ¶
func (t *TaskExecution) GetTotalBytes() (rcBytes, txBytes uint64)
GetTotalBytes gets the inbound/outbound bytes of the task
func (*TaskExecution) IsStalled ¶
func (t *TaskExecution) IsStalled(window float64) bool
IsStalled determines if the task has stalled (no row increment)
func (*TaskExecution) ReadFromDB ¶
func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df *iop.Dataflow, err error)
ReadFromDB reads from a source database
func (*TaskExecution) ReadFromFile ¶
func (t *TaskExecution) ReadFromFile(cfg *Config) (df *iop.Dataflow, err error)
ReadFromFile reads from a source file
func (*TaskExecution) SetProgress ¶
func (t *TaskExecution) SetProgress(progressText string, args ...interface{})
SetProgress sets the progress
func (*TaskExecution) WriteToDb ¶
func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn database.Connection) (cnt uint64, err error)
WriteToDb writes to a target DB: create temp table, load into temp table, then insert / incremental / replace into target table.
func (*TaskExecution) WriteToFile ¶
WriteToFile writes to a target file