sling

package
v1.4.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 3, 2025 License: GPL-3.0 Imports: 37 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var AllExecStatus = []struct {
	Value  ExecStatus
	TSName string
}{
	{ExecStatusCreated, "ExecStatusCreated"},
	{ExecStatusQueued, "ExecStatusQueued"},
	{ExecStatusStarted, "ExecStatusStarted"},
	{ExecStatusRunning, "ExecStatusRunning"},
	{ExecStatusSuccess, "ExecStatusSuccess"},
	{ExecStatusTerminated, "ExecStatusTerminated"},
	{ExecStatusInterrupted, "ExecStatusInterrupted"},
	{ExecStatusTimedOut, "ExecStatusTimedOut"},
	{ExecStatusError, "ExecStatusError"},
	{ExecStatusSkipped, "ExecStatusSkipped"},
	{ExecStatusStalled, "ExecStatusStalled"},
}
View Source
var AllJobType = []struct {
	Value  JobType
	TSName string
}{
	{ConnTest, "ConnTest"},
	{ConnDiscover, "ConnDiscover"},
	{ConnExec, "ConnExec"},
	{DbToDb, "DbToDb"},
	{FileToDB, "FileToDB"},
	{DbToFile, "DbToFile"},
	{FileToFile, "FileToFile"},
	{DbSQL, "DbSQL"},
}
View Source
var AllMode = []struct {
	Value  Mode
	TSName string
}{
	{FullRefreshMode, "FullRefreshMode"},
	{IncrementalMode, "IncrementalMode"},
	{TruncateMode, "TruncateMode"},
	{SnapshotMode, "SnapshotMode"},
	{BackfillMode, "BackfillMode"},
}
View Source
var HookRunReplication func(string, *Config, ...string) error
View Source
var ParseHook = func(any, ParseOptions) (Hook, error) { return nil, nil }
View Source
var ShowProgress = true
View Source
var SourceDBOptionsDefault = SourceOptions{
	EmptyAsNull:    g.Bool(false),
	NullIf:         g.String("NULL"),
	DatetimeFormat: "AUTO",
	MaxDecimals:    g.Int(-1),
}
View Source
var SourceFileOptionsDefault = SourceOptions{
	EmptyAsNull:    g.Bool(true),
	Header:         g.Bool(true),
	Flatten:        g.Bool(false),
	Compression:    iop.CompressorTypePtr(iop.AutoCompressorType),
	NullIf:         g.String("NULL"),
	DatetimeFormat: "AUTO",
	SkipBlankLines: g.Bool(false),

	FieldsPerRec: g.Int(-1),
	MaxDecimals:  g.Int(-1),
}
View Source
var StoreSet = func(t *TaskExecution) error { return nil }

StoreSet is set in the store/store.go file for history keeping.

View Source
var TargetDBOptionsDefault = TargetOptions{
	FileMaxRows: lo.Ternary(
		os.Getenv("FILE_MAX_ROWS") != "",
		g.Int64(cast.ToInt64(os.Getenv("FILE_MAX_ROWS"))),
		g.Int64(0),
	),
	UseBulk:          g.Bool(true),
	AddNewColumns:    g.Bool(true),
	AdjustColumnType: g.Bool(false),
	DatetimeFormat:   "auto",
	MaxDecimals:      g.Int(-1),
	ColumnCasing:     g.Ptr(iop.SourceColumnCasing),
}
View Source
var TargetFileOptionsDefault = TargetOptions{
	Header: g.Bool(true),
	Compression: lo.Ternary(
		os.Getenv("COMPRESSION") != "",
		iop.CompressorTypePtr(iop.CompressorType(strings.ToLower(os.Getenv("COMPRESSION")))),
		iop.CompressorTypePtr(iop.AutoCompressorType),
	),
	Concurrency: lo.Ternary(
		os.Getenv("CONCURRENCY") != "",
		cast.ToInt(os.Getenv("CONCURRENCY")),
		7,
	),
	FileMaxRows: lo.Ternary(
		os.Getenv("FILE_MAX_ROWS") != "",
		g.Int64(cast.ToInt64(os.Getenv("FILE_MAX_ROWS"))),
		g.Int64(0),
	),
	FileMaxBytes: lo.Ternary(
		os.Getenv("FILE_MAX_BYTES") != "",
		g.Int64(cast.ToInt64(os.Getenv("FILE_MAX_BYTES"))),
		g.Int64(0),
	),
	Format:         dbio.FileTypeNone,
	UseBulk:        g.Bool(true),
	AddNewColumns:  g.Bool(true),
	DatetimeFormat: "auto",
	Delimiter:      ",",
	MaxDecimals:    g.Int(-1),
	ColumnCasing:   g.Ptr(iop.SourceColumnCasing),
}

Functions

func ClientDelete

func ClientDelete(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)

ClientDelete sends a DELETE request

func ClientGet

func ClientGet(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)

ClientGet sends a GET request

func ClientOptions

func ClientOptions(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)

ClientOptions sends an OPTIONS request

func ClientPatch

func ClientPatch(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)

ClientPatch sends a PATCH request

func ClientPost

func ClientPost(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)

ClientPost sends a POST request

func ClientPut

func ClientPut(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)

ClientPut sends a PUT request

func ErrorHelper added in v1.0.61

func ErrorHelper(err error) (helpString string)

func GetJWTFromKey

func GetJWTFromKey(masterServerURL, key string) (string, error)

GetJWTFromKey logs in and returns the JWT based on the provided key

func GetSQLText added in v1.1.14

func GetSQLText(sqlStringPath string) (string, error)

GetSQLText processes a source SQL file path or SQL text and returns the SQL text

func IsJSONorYAML added in v1.2.14

func IsJSONorYAML(payload string) bool

IsJSONorYAML detects a JSON or YAML payload

func LoadProject

func LoadProject(path string)

func NewExecID added in v1.1.13

func NewExecID() string

func SetStreamDefaults added in v0.86.36

func SetStreamDefaults(name string, stream *ReplicationStreamConfig, replicationCfg ReplicationConfig)

func Sling

func Sling(cfg *Config) (err error)

Sling accepts a configuration and runs an Extract-Load task

func StateSet added in v1.3.5

func StateSet(t *TaskExecution)

Types

type Config

type Config struct {
	Source     Source            `json:"source,omitempty" yaml:"source,omitempty"`
	Target     Target            `json:"target" yaml:"target"`
	Mode       Mode              `json:"mode,omitempty" yaml:"mode,omitempty"`
	Transforms any               `json:"transforms,omitempty" yaml:"transforms,omitempty"`
	Options    ConfigOptions     `json:"options,omitempty" yaml:"options,omitempty"`
	Env        map[string]string `json:"env,omitempty" yaml:"env,omitempty"`

	StreamName        string                   `json:"stream_name,omitempty" yaml:"stream_name,omitempty"`
	ReplicationStream *ReplicationStreamConfig `json:"replication_stream,omitempty" yaml:"replication_stream,omitempty"`

	SrcConn  connection.Connection `json:"-" yaml:"-"`
	TgtConn  connection.Connection `json:"-" yaml:"-"`
	Prepared bool                  `json:"-" yaml:"-"`

	IncrementalVal    any    `json:"incremental_val" yaml:"incremental_val"`
	IncrementalValStr string `json:"incremental_val_str" yaml:"incremental_val_str"`
	IncrementalGTE    bool   `json:"incremental_gte,omitempty" yaml:"incremental_gte,omitempty"`

	MetadataLoadedAt  *bool `json:"-" yaml:"-"`
	MetadataStreamURL bool  `json:"-" yaml:"-"`
	MetadataRowNum    bool  `json:"-" yaml:"-"`
	MetadataRowID     bool  `json:"-" yaml:"-"`
	MetadataExecID    bool  `json:"-" yaml:"-"`
	// contains filtered or unexported fields
}

Config is the new config struct

func NewConfig

func NewConfig(cfgStr string) (cfg *Config, err error)

NewConfig returns a config object from a YAML / JSON string

func (*Config) AsReplication added in v1.1.14

func (cfg *Config) AsReplication() (rc ReplicationConfig)

func (*Config) ColumnsPrepared added in v1.2.16

func (cfg *Config) ColumnsPrepared() (columns iop.Columns)

ColumnsPrepared returns the prepared columns

func (*Config) DetermineType added in v0.84.9

func (cfg *Config) DetermineType() (Type JobType, err error)

func (*Config) FormatTargetObjectName added in v0.85.54

func (cfg *Config) FormatTargetObjectName() (err error)

func (*Config) GetFormatMap added in v1.0.31

func (cfg *Config) GetFormatMap() (m map[string]any, err error)

GetFormatMap returns a map of the variables used to format a string

func (*Config) HasIncrementalVal added in v1.2.21

func (cfg *Config) HasIncrementalVal() bool

HasIncrementalVal returns true if there is a non-null incremental value

func (*Config) HasWildcard added in v1.1.14

func (cfg *Config) HasWildcard() bool

func (*Config) IgnoreExisting added in v1.2.10

func (cfg *Config) IgnoreExisting() bool

IgnoreExisting returns true if target_options.ignore_existing is true

func (*Config) IsFileStreamWithStateAndParts added in v1.3.6

func (cfg *Config) IsFileStreamWithStateAndParts() bool

func (*Config) MD5 added in v1.1.6

func (cfg *Config) MD5() string

func (*Config) Prepare

func (cfg *Config) Prepare() (err error)

Prepare prepares the config

func (*Config) ReplicationMode added in v0.87.34

func (cfg *Config) ReplicationMode() bool

ReplicationMode returns true for replication mode

func (*Config) Scan

func (cfg *Config) Scan(value interface{}) error

Scan scans a value into Jsonb, implementing the sql.Scanner interface

func (*Config) SetDefault

func (cfg *Config) SetDefault()

SetDefault sets default options

func (*Config) SrcConnMD5 added in v1.2.15

func (cfg *Config) SrcConnMD5() string

func (*Config) StreamID added in v1.2.15

func (cfg *Config) StreamID() string

func (*Config) TgtConnMD5 added in v1.2.15

func (cfg *Config) TgtConnMD5() string

func (*Config) TransformsPrepared added in v1.2.16

func (cfg *Config) TransformsPrepared() (colTransforms map[string][]string)

TransformsPrepared returns the prepared transforms, keyed by column

func (*Config) Unmarshal

func (cfg *Config) Unmarshal(cfgStr string) (err error)

Unmarshal parses a configuration file path or config text

func (Config) Value

func (cfg Config) Value() (driver.Value, error)

Value returns the JSON value, implementing the driver.Valuer interface

type ConfigOptions

type ConfigOptions struct {
	Debug   bool `json:"debug,omitempty" yaml:"debug,omitempty"`
	StdIn   bool `json:"-"`                                          // whether stdin is passed
	StdOut  bool `json:"stdout,omitempty" yaml:"stdout,omitempty"`   // whether to output to stdout
	Dataset bool `json:"dataset,omitempty" yaml:"dataset,omitempty"` // whether to output to dataset
}

ConfigOptions are configuration options

type ConnState added in v1.3.5

type ConnState struct {
	Name      string    `json:"name,omitempty"`
	Type      dbio.Type `json:"type,omitempty"`
	Kind      dbio.Kind `json:"kind,omitempty"`
	Bucket    string    `json:"bucket,omitempty"`
	Container string    `json:"container,omitempty"`
	Database  string    `json:"database,omitempty"`
	Instance  string    `json:"instance,omitempty"`
	Schema    string    `json:"schema,omitempty"`
}

type DateTimeState added in v1.3.5

type DateTimeState struct {
	Timestamp time.Time `json:"timestamp,omitempty"`
	Unix      int64     `json:"unix,omitempty"`
	FileName  string    `json:"file_name,omitempty"`
	Rfc3339   string    `json:"rfc3339,omitempty"`
	Date      string    `json:"date,omitempty"`
	Datetime  string    `json:"datetime,omitempty"`
	YYYY      string    `json:"YYYY,omitempty"`
	YY        string    `json:"YY,omitempty"`
	MMM       string    `json:"MMM,omitempty"`
	MM        string    `json:"MM,omitempty"`
	DD        string    `json:"DD,omitempty"`
	DDD       string    `json:"DDD,omitempty"`
	HH        string    `json:"HH,omitempty"`
}

func (*DateTimeState) Update added in v1.3.5

func (dts *DateTimeState) Update()

type ExecStatus

type ExecStatus string

ExecStatus is the status of an execution

const (
	// ExecStatusCreated = created
	ExecStatusCreated ExecStatus = "created"
	// ExecStatusQueued = queued
	ExecStatusQueued ExecStatus = "queued"
	// ExecStatusStarted = started
	ExecStatusStarted ExecStatus = "started"
	// ExecStatusRunning = running
	ExecStatusRunning ExecStatus = "running"
	// ExecStatusSuccess = success
	ExecStatusSuccess ExecStatus = "success"
	// ExecStatusTerminated = terminated
	ExecStatusTerminated ExecStatus = "terminated"
	// ExecStatusInterrupted = interrupted
	ExecStatusInterrupted ExecStatus = "interrupted"
	// ExecStatusTimedOut = timed-out (when no heartbeat sent for 30 sec)
	ExecStatusTimedOut ExecStatus = "timed-out"
	// ExecStatusError = error
	ExecStatusError ExecStatus = "error"
	// ExecStatusSkipped = skipped
	ExecStatusSkipped ExecStatus = "skipped"
	// ExecStatusStalled = stalled (when still heartbeating, but rows are unchanged for a while)
	ExecStatusStalled ExecStatus = "stalled"
	// ExecStatusCancelled = cancelled
	ExecStatusCancelled ExecStatus = "cancelled"
	// ExecStatusWarning = warning
	ExecStatusWarning ExecStatus = "warning"
)

func (ExecStatus) IsFailure

func (s ExecStatus) IsFailure() bool

IsFailure returns true if an execution has failed

func (ExecStatus) IsFinished

func (s ExecStatus) IsFinished() bool

IsFinished returns true if an execution is finished

func (ExecStatus) IsRunning

func (s ExecStatus) IsRunning() bool

IsRunning returns true if an execution is running

func (ExecStatus) IsSuccess

func (s ExecStatus) IsSuccess() bool

IsSuccess returns true if an execution is successful

func (ExecStatus) IsWarning added in v1.2.23

func (s ExecStatus) IsWarning() bool

IsWarning returns true if an execution has a warning status

type ExecutionState added in v1.3.6

type ExecutionState struct {
	ID         string     `json:"id,omitempty"`
	FilePath   string     `json:"string,omitempty"`
	TotalBytes uint64     `json:"total_bytes,omitempty"`
	TotalRows  uint64     `json:"total_rows,omitempty"`
	Status     StatusMap  `json:"status,omitempty"`
	StartTime  *time.Time `json:"start_time,omitempty"`
	EndTime    *time.Time `json:"end_time,omitempty"`
	Duration   int64      `json:"duration,omitempty"`
	Error      *string    `json:"error,omitempty"`
}

type ExecutionStatus

type ExecutionStatus struct {
	JobID       int        `json:"job_id,omitempty"`
	ExecID      int64      `json:"exec_id,omitempty"`
	Status      ExecStatus `json:"status,omitempty"`
	Text        string     `json:"text,omitempty"`
	Rows        uint64     `json:"rows,omitempty"`
	Bytes       uint64     `json:"bytes,omitempty"`
	Percent     int        `json:"percent,omitempty"`
	Stalled     bool       `json:"stalled,omitempty"`
	Duration    *int       `json:"duration,omitempty"`
	AvgDuration int        `json:"avg_duration,omitempty"`
}

ExecutionStatus is an execution status object

type Hook added in v1.2.23

type Hook interface {
	Type() HookType
	ID() string
	Data() map[string]any
	SetExtra(map[string]any)
	Stage() HookStage
	Execute() error
	ExecuteOnDone(error) (OnFailType, error)
}

type HookKind added in v1.4.1

type HookKind string
const (
	HookKindHook HookKind = "hook"
	HookKindStep HookKind = "step"
)

type HookMap added in v1.3.5

type HookMap struct {
	Start []any `json:"start,omitempty" yaml:"start,omitempty"`
	End   []any `json:"end,omitempty" yaml:"end,omitempty"`
	Pre   []any `json:"pre,omitempty" yaml:"pre,omitempty"`
	Post  []any `json:"post,omitempty" yaml:"post,omitempty"`
}

type HookStage added in v1.3.5

type HookStage string
const (
	HookStagePre   HookStage = "pre"
	HookStagePost  HookStage = "post"
	HookStageStart HookStage = "start"
	HookStageEnd   HookStage = "end"
)

type HookType added in v1.2.23

type HookType string

type Hooks added in v1.2.23

type Hooks []Hook

func (Hooks) Execute added in v1.3.5

func (hs Hooks) Execute() (err error)

type JobType

type JobType string

JobType is an enum type for jobs

const ConnDiscover JobType = "conn-discover"

ConnDiscover is for a connection discovery

const ConnExec JobType = "conn-exec"

ConnExec is for a connection exec

const ConnTest JobType = "conn-test"

ConnTest is for a connection test

const DbSQL JobType = "db-sql"

DbSQL is for a db sql query

const DbToDb JobType = "db-db"

DbToDb is from db to db

const DbToFile JobType = "db-file"

DbToFile is from db to file

const FileToDB JobType = "file-db"

FileToDB is from file to db

const FileToFile JobType = "file-file"

FileToFile is from file to file

type Mode

type Mode string

Mode is a load mode

const (
	// TruncateMode is to truncate
	TruncateMode Mode = "truncate"
	// FullRefreshMode is to drop
	FullRefreshMode Mode = "full-refresh"
	// IncrementalMode is to incremental
	IncrementalMode Mode = "incremental"
	// SnapshotMode is to snapshot
	SnapshotMode Mode = "snapshot"
	// BackfillMode is to backfill
	BackfillMode Mode = "backfill"
)

type NotificationConfig

type NotificationConfig struct {
	Name        string   `json:"name"`
	Emails      []string `json:"emails"`
	Slack       bool     `json:"slack"`
	MsTeams     bool     `json:"msteams"`
	WebhookURLs []string `json:"webhook_urls"` // urls
	OnSuccess   bool     `json:"on_success"`
	OnFailure   bool     `json:"on_failure"`
	OnLinger    bool     `json:"on_linger"`
	OnEmpty     bool     `json:"on_empty"`
}

type ObjectState added in v1.3.5

type ObjectState struct {
	Schema   string `json:"schema,omitempty"`
	Table    string `json:"table,omitempty"`
	Name     string `json:"name,omitempty"`
	FullName string `json:"full_name,omitempty"`
}

type OnFailType added in v1.4.1

type OnFailType string

type ParseOptions added in v1.3.5

type ParseOptions struct {
	// contains filtered or unexported fields
}

type Pipeline added in v1.4.1

type Pipeline struct {
	Steps Hooks          `json:"steps" yaml:"steps"`
	Env   map[string]any `json:"env,omitempty" yaml:"env,omitempty"`
	// contains filtered or unexported fields
}

func LoadPipelineConfig added in v1.4.1

func LoadPipelineConfig(content string) (pipeline *Pipeline, err error)

func LoadPipelineConfigFromFile added in v1.4.1

func LoadPipelineConfigFromFile(cfgPath string) (pipeline *Pipeline, err error)

func (*Pipeline) Execute added in v1.4.1

func (pl *Pipeline) Execute() (err error)

func (*Pipeline) RuntimeState added in v1.4.1

func (pl *Pipeline) RuntimeState() (_ *PipelineState, err error)

RuntimeState returns the runtime pipeline state

type PipelineState added in v1.4.1

type PipelineState struct {
	State     map[string]map[string]any `json:"state,omitempty"`
	Env       map[string]any            `json:"env,omitempty"`
	Timestamp DateTimeState             `json:"timestamp,omitempty"`
	Runs      map[string]*RunState      `json:"runs,omitempty"`
	Run       *RunState                 `json:"run,omitempty"`
}

func (*PipelineState) Marshall added in v1.4.1

func (ps *PipelineState) Marshall() string

func (*PipelineState) SetStateData added in v1.4.1

func (ps *PipelineState) SetStateData(id string, data map[string]any)

func (*PipelineState) SetStateKeyValue added in v1.4.1

func (ps *PipelineState) SetStateKeyValue(id, key string, value any)

func (*PipelineState) TaskExecution added in v1.4.1

func (ps *PipelineState) TaskExecution() *TaskExecution

type PipelineStep added in v1.4.1

type PipelineStep struct {
}

type PipelineSteps added in v1.4.1

type PipelineSteps []PipelineStep

type ProgressBar

type ProgressBar struct {
	// contains filtered or unexported fields
}

func NewPBar

func NewPBar(d time.Duration) *ProgressBar

NewPBar creates a new progress bar

func (*ProgressBar) Finish

func (pb *ProgressBar) Finish()

func (*ProgressBar) SetStatus

func (pb *ProgressBar) SetStatus(status string)

SetStatus sets the progress bar status

func (*ProgressBar) Start

func (pb *ProgressBar) Start()

type Project

type Project struct {
	Config      ProjectConfig
	TaskConfigs map[string]Config
}

type ProjectConfig

type ProjectConfig struct {
	Project          string                        `json:"project" yaml:"project"`
	TaskPaths        []string                      `json:"task-paths" yaml:"task-paths"`
	Defaults         map[string]interface{}        `json:"defaults" yaml:"defaults"`
	NotificationTags map[string]NotificationConfig `json:"notification_tags" yaml:"notification_tags"`
}

type ReplicationConfig added in v0.86.36

type ReplicationConfig struct {
	Source   string                              `json:"source,omitempty" yaml:"source,omitempty"`
	Target   string                              `json:"target,omitempty" yaml:"target,omitempty"`
	Hooks    HookMap                             `json:"hooks,omitempty" yaml:"hooks,omitempty"`
	Defaults ReplicationStreamConfig             `json:"defaults,omitempty" yaml:"defaults,omitempty"`
	Streams  map[string]*ReplicationStreamConfig `json:"streams,omitempty" yaml:"streams,omitempty"`
	Env      map[string]any                      `json:"env,omitempty" yaml:"env,omitempty"`

	// Tasks are compiled tasks
	Tasks    []*Config `json:"tasks"`
	Compiled bool      `json:"compiled"`
	FailErr  string    // error string to fail all (e.g. when the first task fails to connect)
	// contains filtered or unexported fields
}

func LoadReplicationConfig added in v0.87.17

func LoadReplicationConfig(content string) (config ReplicationConfig, err error)

func LoadReplicationConfigFromFile added in v1.2.10

func LoadReplicationConfigFromFile(cfgPath string) (config ReplicationConfig, err error)

func UnmarshalReplication added in v0.86.39

func UnmarshalReplication(replicYAML string) (config ReplicationConfig, err error)

UnmarshalReplication converts a YAML string into a replication config

func (*ReplicationConfig) AddStream added in v1.1.15

func (rd *ReplicationConfig) AddStream(key string, cfg *ReplicationStreamConfig)

func (*ReplicationConfig) Compile added in v1.2.2

func (rd *ReplicationConfig) Compile(cfgOverwrite *Config, selectStreams ...string) (err error)

Compile compiles the replication into tasks

func (*ReplicationConfig) DeleteStream added in v1.1.15

func (rd *ReplicationConfig) DeleteStream(key string)

func (ReplicationConfig) GetStream added in v1.1.15

func (rd ReplicationConfig) GetStream(name string) (streamName string, cfg *ReplicationStreamConfig, found bool)

GetStream returns the stream if it exists

func (*ReplicationConfig) JSON added in v1.2.14

func (rd *ReplicationConfig) JSON() string

JSON returns the JSON payload

func (*ReplicationConfig) MD5 added in v1.1.6

func (rd *ReplicationConfig) MD5() string

MD5 returns an MD5 hash of the JSON payload

func (ReplicationConfig) MatchStreams added in v1.1.15

func (rd ReplicationConfig) MatchStreams(pattern string) (streams map[string]*ReplicationStreamConfig)

MatchStreams returns the streams matching the given pattern

func (ReplicationConfig) Normalize added in v1.1.14

func (rd ReplicationConfig) Normalize(n string) string

Normalize normalizes the name

func (*ReplicationConfig) OriginalCfg added in v1.0.63

func (rd *ReplicationConfig) OriginalCfg() string

OriginalCfg returns original config

func (*ReplicationConfig) ParseReplicationHook added in v1.3.6

func (rd *ReplicationConfig) ParseReplicationHook(stage HookStage) (hooks Hooks, err error)

func (*ReplicationConfig) ParseStreamHook added in v1.3.5

func (rd *ReplicationConfig) ParseStreamHook(stage HookStage, rs *ReplicationStreamConfig) (hooks Hooks, err error)

func (*ReplicationConfig) ProcessChunks added in v1.4.1

func (rd *ReplicationConfig) ProcessChunks() (err error)

func (*ReplicationConfig) ProcessWildcards added in v0.87.18

func (rd *ReplicationConfig) ProcessWildcards() (err error)

ProcessWildcards process the streams using wildcards such as `my_schema.*` or `my_schema.my_prefix_*` or `my_schema.*_my_suffix`

func (*ReplicationConfig) ProcessWildcardsDatabase added in v1.0.68

func (rd *ReplicationConfig) ProcessWildcardsDatabase(c connection.Connection, patterns []string) (wildcards Wildcards, err error)

func (*ReplicationConfig) ProcessWildcardsFile added in v1.0.68

func (rd *ReplicationConfig) ProcessWildcardsFile(c connection.Connection, patterns []string) (wildcards Wildcards, err error)

func (*ReplicationConfig) RuntimeState added in v1.3.5

func (rd *ReplicationConfig) RuntimeState() (_ *ReplicationState, err error)

RuntimeState returns the runtime replication state

func (*ReplicationConfig) Scan added in v0.86.36

func (rd *ReplicationConfig) Scan(value interface{}) error

Scan scans a value into Jsonb, implementing the sql.Scanner interface

func (ReplicationConfig) StreamsOrdered added in v1.0.6

func (rd ReplicationConfig) StreamsOrdered() []string

StreamsOrdered returns the stream names as ordered in the YAML file

func (ReplicationConfig) Value added in v0.86.36

func (rd ReplicationConfig) Value() (driver.Value, error)

Value returns the JSON value, implementing the driver.Valuer interface

type ReplicationState added in v1.4.1

type ReplicationState struct {
	State     map[string]map[string]any `json:"state,omitempty"`
	Env       map[string]any            `json:"env,omitempty"`
	Timestamp DateTimeState             `json:"timestamp,omitempty"`
	Execution ExecutionState            `json:"execution,omitempty"`
	Source    ConnState                 `json:"source,omitempty"`
	Target    ConnState                 `json:"target,omitempty"`
	Stream    *StreamState              `json:"stream,omitempty"`
	Object    *ObjectState              `json:"object,omitempty"`
	Runs      map[string]*RunState      `json:"runs,omitempty"`
	Run       *RunState                 `json:"run,omitempty"`
}

ReplicationState is for runtime state

func (*ReplicationState) Marshall added in v1.4.1

func (rs *ReplicationState) Marshall() string

func (*ReplicationState) SetStateData added in v1.4.1

func (rs *ReplicationState) SetStateData(id string, data map[string]any)

func (*ReplicationState) SetStateKeyValue added in v1.4.1

func (rs *ReplicationState) SetStateKeyValue(id, key string, value any)

func (*ReplicationState) TaskExecution added in v1.4.1

func (rs *ReplicationState) TaskExecution() *TaskExecution

type ReplicationStreamConfig added in v0.86.36

type ReplicationStreamConfig struct {
	ID            string         `json:"id,omitempty" yaml:"id,omitempty"`
	Description   string         `json:"description,omitempty" yaml:"description,omitempty"`
	Mode          Mode           `json:"mode,omitempty" yaml:"mode,omitempty"`
	Object        string         `json:"object,omitempty" yaml:"object,omitempty"`
	Select        []string       `json:"select,omitempty" yaml:"select,flow,omitempty"`
	Where         string         `json:"where,omitempty" yaml:"where,omitempty"`
	PrimaryKeyI   any            `json:"primary_key,omitempty" yaml:"primary_key,flow,omitempty"`
	UpdateKey     string         `json:"update_key,omitempty" yaml:"update_key,omitempty"`
	SQL           string         `json:"sql,omitempty" yaml:"sql,omitempty"`
	Tags          []string       `json:"tags,omitempty" yaml:"tags,omitempty"`
	SourceOptions *SourceOptions `json:"source_options,omitempty" yaml:"source_options,omitempty"`
	TargetOptions *TargetOptions `json:"target_options,omitempty" yaml:"target_options,omitempty"`
	Schedule      string         `json:"schedule,omitempty" yaml:"schedule,omitempty"`
	Disabled      bool           `json:"disabled,omitempty" yaml:"disabled,omitempty"`
	Single        *bool          `json:"single,omitempty" yaml:"single,omitempty"`
	Transforms    any            `json:"transforms,omitempty" yaml:"transforms,omitempty"`
	Columns       any            `json:"columns,omitempty" yaml:"columns,omitempty"`
	Hooks         HookMap        `json:"hooks,omitempty" yaml:"hooks,omitempty"`
	// contains filtered or unexported fields
}

func (*ReplicationStreamConfig) ObjectHasStreamVars added in v1.3.4

func (s *ReplicationStreamConfig) ObjectHasStreamVars() bool

func (*ReplicationStreamConfig) PrimaryKey added in v0.86.36

func (s *ReplicationStreamConfig) PrimaryKey() []string

type RouteName

type RouteName string

RouteName is the name of a route

const (
	RouteStatus         RouteName = "/status"
	RouteNotice         RouteName = "/notice"
	RouteError          RouteName = "/error"
	RouteSignUpUser     RouteName = "/sign-up"
	RouteUser           RouteName = "/user"
	RouteForgotPassword RouteName = "/forgot-password"
	RouteResetPassword  RouteName = "/reset-password"
	RouteLogin          RouteName = "/login"
	RouteLogout         RouteName = "/logout"
	RouteProxy          RouteName = "/p"

	RouteAppIndex  RouteName = "/app"
	RouteAppLogin  RouteName = "/app/login"
	RouteAppLogout RouteName = "/app/logout"
	RouteAppAPIKey RouteName = "/app/apikey"

	RouteAPI               RouteName = "/api/v1"
	RouteMasterStatus      RouteName = "/api/v1/master-status"
	RouteMasterDBReset     RouteName = "/api/v1/master-db-reset"
	RouteUploads           RouteName = "/api/v1/uploads"
	RouteAPIAccounts       RouteName = "/api/v1/accounts"
	RouteAPIProjects       RouteName = "/api/v1/projects"
	RouteAPIKey            RouteName = "/api/v1/apikey"
	RouteAPIUsers          RouteName = "/api/v1/users"
	RouteAPIJobs           RouteName = "/api/v1/jobs"
	RouteAPILogs           RouteName = "/api/v1/logs"
	RouteAPIExecutions     RouteName = "/api/v1/executions"
	RouteAPIConnections    RouteName = "/api/v1/connections"
	RouteAPIConnectionTest RouteName = "/api/v1/connection-test"
	RouteAPIResetPassword  RouteName = "/api/v1/reset-password"
	RouteAPIDataRequest    RouteName = "/api/v1/data-request"
	RouteAPIWorkers        RouteName = "/api/v1/workers"
	RouteAPISettings       RouteName = "/api/v1/settings"
	RouteAlertLog          RouteName = "/alert/log"

	RouteWs       RouteName = "/ws"
	RouteWsClient RouteName = "/ws/client"
	RouteWsWorker RouteName = "/ws/worker"
)

type RunState added in v1.3.5

type RunState struct {
	ID         string                  `json:"id,omitempty"`
	Stream     *StreamState            `json:"stream,omitempty"`
	Object     *ObjectState            `json:"object,omitempty"`
	TotalBytes uint64                  `json:"total_bytes,omitempty"`
	TotalRows  uint64                  `json:"total_rows,omitempty"`
	Status     ExecStatus              `json:"status,omitempty"`
	StartTime  *time.Time              `json:"start_time,omitempty"`
	EndTime    *time.Time              `json:"end_time,omitempty"`
	Duration   int64                   `json:"duration,omitempty"`
	Error      *string                 `json:"error,omitempty"`
	Config     ReplicationStreamConfig `json:"config,omitempty"`
	Task       *TaskExecution          `json:"-"`
}

type RuntimeState added in v1.3.5

type RuntimeState interface {
	SetStateData(id string, data map[string]any)
	SetStateKeyValue(id, key string, value any)
	Marshall() string
	TaskExecution() *TaskExecution
}

type Source

type Source struct {
	Conn        string         `json:"conn,omitempty" yaml:"conn,omitempty"`
	Type        dbio.Type      `json:"type,omitempty" yaml:"type,omitempty"`
	Stream      string         `json:"stream,omitempty" yaml:"stream,omitempty"`
	Select      []string       `json:"select,omitempty" yaml:"select,omitempty"` // Select or exclude columns. Exclude with prefix "-".
	Where       string         `json:"where,omitempty" yaml:"where,omitempty"`
	Query       string         `json:"query,omitempty" yaml:"query,omitempty"`
	PrimaryKeyI any            `json:"primary_key,omitempty" yaml:"primary_key,omitempty"`
	UpdateKey   string         `json:"update_key,omitempty" yaml:"update_key,omitempty"`
	Options     *SourceOptions `json:"options,omitempty" yaml:"options,omitempty"`

	Data map[string]interface{} `json:"-" yaml:"-"`
}

Source is a source of data

func (*Source) HasPrimaryKey added in v1.0.50

func (s *Source) HasPrimaryKey() bool

func (*Source) HasUpdateKey added in v1.0.50

func (s *Source) HasUpdateKey() bool

func (*Source) Limit

func (s *Source) Limit() int

func (*Source) MD5 added in v1.1.6

func (s *Source) MD5() string

func (*Source) Offset added in v1.2.14

func (s *Source) Offset() int

func (*Source) PrimaryKey added in v0.84.0

func (s *Source) PrimaryKey() []string

type SourceOptions

// SourceOptions holds connection and stream processing options for a source.
// Most fields are pointers so that "not set" (nil) can be told apart from an
// explicit zero value; SetDefaults presumably fills unset fields from another
// SourceOptions (confirm). Package defaults live in SourceDBOptionsDefault and
// SourceFileOptionsDefault.
type SourceOptions struct {
	// EmptyAsNull: default false for database sources, true for file sources.
	EmptyAsNull    *bool               `json:"empty_as_null,omitempty" yaml:"empty_as_null,omitempty"`
	// Header: default true for file sources.
	Header         *bool               `json:"header,omitempty" yaml:"header,omitempty"`
	// Flatten: default false for file sources.
	Flatten        *bool               `json:"flatten,omitempty" yaml:"flatten,omitempty"`
	FieldsPerRec   *int                `json:"fields_per_rec,omitempty" yaml:"fields_per_rec,omitempty"`
	// Compression: default iop.AutoCompressorType for file sources.
	Compression    *iop.CompressorType `json:"compression,omitempty" yaml:"compression,omitempty"`
	Format         *dbio.FileType      `json:"format,omitempty" yaml:"format,omitempty"`
	// NullIf: default "NULL" for both database and file sources.
	NullIf         *string             `json:"null_if,omitempty" yaml:"null_if,omitempty"`
	// DatetimeFormat: default "AUTO" for both database and file sources.
	DatetimeFormat string              `json:"datetime_format,omitempty" yaml:"datetime_format,omitempty"`
	// SkipBlankLines: default false for file sources.
	SkipBlankLines *bool               `json:"skip_blank_lines,omitempty" yaml:"skip_blank_lines,omitempty"`
	Delimiter      string              `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
	Escape         string              `json:"escape,omitempty" yaml:"escape,omitempty"`
	Quote          string              `json:"quote,omitempty" yaml:"quote,omitempty"`
	// MaxDecimals: default -1 for database sources (meaning of -1 is defined
	// by the consumer — NOTE(review): confirm).
	MaxDecimals    *int                `json:"max_decimals,omitempty" yaml:"max_decimals,omitempty"`
	JmesPath       *string             `json:"jmespath,omitempty" yaml:"jmespath,omitempty"`
	// Sheet and Range select part of a spreadsheet-like source; the
	// RangeStartEnd method presumably splits Range into its two ends.
	Sheet          *string             `json:"sheet,omitempty" yaml:"sheet,omitempty"`
	Range          *string             `json:"range,omitempty" yaml:"range,omitempty"`
	// Limit and Offset back the Source.Limit and Source.Offset accessors
	// (presumably; confirm).
	Limit          *int                `json:"limit,omitempty" yaml:"limit,omitempty"`
	Offset         *int                `json:"offset,omitempty" yaml:"offset,omitempty"`
	FileSelect     *[]string           `json:"file_select,omitempty" yaml:"file_select,omitempty"` // include/exclude files
	// ChunkSize is typed any, so its accepted representations are decided by
	// the consumer (NOTE(review): confirm accepted forms).
	ChunkSize      any                 `json:"chunk_size,omitempty" yaml:"chunk_size,omitempty"`

	// columns & transforms were moved out of source_options
	// https://github.com/slingdata-io/sling-cli/issues/348
	Columns    any `json:"columns,omitempty" yaml:"columns,omitempty"`       // legacy
	Transforms any `json:"transforms,omitempty" yaml:"transforms,omitempty"` // legacy
}

SourceOptions are source connection and stream processing options.

func (*SourceOptions) RangeStartEnd added in v1.4.1

func (so *SourceOptions) RangeStartEnd() (start, end string)

func (*SourceOptions) SetDefaults added in v1.0.31

func (o *SourceOptions) SetDefaults(sourceOptions SourceOptions)

type StatusMap added in v1.3.6

// StatusMap tallies outcomes by status. NOTE(review): presumably these are
// per-stream counts for a replication run, with Count the total — confirm.
type StatusMap struct {
	Count     int `json:"count"`
	Success   int `json:"success"`
	Running   int `json:"running"`
	Skipped   int `json:"skipped"`
	Cancelled int `json:"cancelled"`
	Warning   int `json:"warning"`
	Error     int `json:"error"`
}

type StreamState added in v1.3.5

// StreamState holds name fragments describing the current stream. All fields
// are omitted from JSON when empty.
type StreamState struct {
	// File* fields describe the stream's file location, when it is a file.
	FileFolder  string `json:"file_folder,omitempty"`
	FileName    string `json:"file_name,omitempty"`
	FileExt     string `json:"file_ext,omitempty"`
	FilePath    string `json:"file_path,omitempty"`
	Name        string `json:"name,omitempty"`
	// Schema/Table and their *Lower/*Upper variants presumably hold the name
	// as given plus lower- and upper-cased copies (confirm).
	Schema      string `json:"schema,omitempty"`
	SchemaLower string `json:"schema_lower,omitempty"`
	SchemaUpper string `json:"schema_upper,omitempty"`
	Table       string `json:"table,omitempty"`
	TableLower  string `json:"table_lower,omitempty"`
	TableUpper  string `json:"table_upper,omitempty"`
	FullName    string `json:"full_name,omitempty"`
}

type Target

// Target describes where a task writes data to. Data and TmpTableCreated are
// tagged "-" and are never serialized to JSON or YAML.
type Target struct {
	// Conn names the target connection (presumably a key into the configured
	// connections — confirm).
	Conn    string         `json:"conn,omitempty" yaml:"conn,omitempty"`
	// Type is the dbio connection type of the target.
	Type    dbio.Type      `json:"type,omitempty" yaml:"type,omitempty"`
	// Object names the destination (e.g. a table or file path — confirm).
	Object  string         `json:"object,omitempty" yaml:"object,omitempty"`
	Columns any            `json:"columns,omitempty" yaml:"columns,omitempty"`
	// Options holds target processing options; it may be nil.
	Options *TargetOptions `json:"options,omitempty" yaml:"options,omitempty"`

	Data map[string]interface{} `json:"-" yaml:"-"`

	// TmpTableCreated presumably records whether a temporary table was
	// created for this target, e.g. so it can be cleaned up (confirm).
	TmpTableCreated bool `json:"-" yaml:"-"`
	// contains filtered or unexported fields
}

Target is a target of data

func (*Target) MD5 added in v1.1.6

func (t *Target) MD5() string

func (*Target) ObjectFileFormat added in v1.2.25

func (t *Target) ObjectFileFormat() dbio.FileType

type TargetOptions

// TargetOptions holds connection and stream processing options for a target.
// Pointer fields let "not set" (nil) be told apart from an explicit zero
// value; SetDefaults presumably fills unset fields from another TargetOptions
// (confirm).
type TargetOptions struct {
	// Output-format options (presumably relevant to file targets).
	Header           *bool               `json:"header,omitempty" yaml:"header,omitempty"`
	Compression      *iop.CompressorType `json:"compression,omitempty" yaml:"compression,omitempty"`
	Concurrency      int                 `json:"concurrency,omitempty" yaml:"concurrency,omitempty"`
	BatchLimit       *int64              `json:"batch_limit,omitempty" yaml:"batch_limit,omitempty"`
	DatetimeFormat   string              `json:"datetime_format,omitempty" yaml:"datetime_format,omitempty"`
	Delimiter        string              `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
	// FileMaxRows / FileMaxBytes presumably cap the size of each output file.
	FileMaxRows      *int64              `json:"file_max_rows,omitempty" yaml:"file_max_rows,omitempty"`
	FileMaxBytes     *int64              `json:"file_max_bytes,omitempty" yaml:"file_max_bytes,omitempty"`
	Format           dbio.FileType       `json:"format,omitempty" yaml:"format,omitempty"`
	MaxDecimals      *int                `json:"max_decimals,omitempty" yaml:"max_decimals,omitempty"`
	// Load-behaviour options (presumably relevant to database targets).
	UseBulk          *bool               `json:"use_bulk,omitempty" yaml:"use_bulk,omitempty"`
	IgnoreExisting   *bool               `json:"ignore_existing,omitempty" yaml:"ignore_existing,omitempty"`
	DeleteMissing    *string             `json:"delete_missing,omitempty" yaml:"delete_missing,omitempty"`
	AddNewColumns    *bool               `json:"add_new_columns,omitempty" yaml:"add_new_columns,omitempty"`
	AdjustColumnType *bool               `json:"adjust_column_type,omitempty" yaml:"adjust_column_type,omitempty"`
	ColumnCasing     *iop.ColumnCasing   `json:"column_casing,omitempty" yaml:"column_casing,omitempty"`

	// Table-level settings: keys, temp table name, custom DDL, and SQL to
	// run before/after the load.
	TableKeys database.TableKeys `json:"table_keys,omitempty" yaml:"table_keys,omitempty"`
	TableTmp  string             `json:"table_tmp,omitempty" yaml:"table_tmp,omitempty"`
	TableDDL  *string            `json:"table_ddl,omitempty" yaml:"table_ddl,omitempty"`
	PreSQL    *string            `json:"pre_sql,omitempty" yaml:"pre_sql,omitempty"`
	PostSQL   *string            `json:"post_sql,omitempty" yaml:"post_sql,omitempty"`
}

TargetOptions are target connection and stream processing options

func (*TargetOptions) SetDefaults added in v1.0.31

func (o *TargetOptions) SetDefaults(targetOptions TargetOptions)

type TaskExecution

// TaskExecution is a single run of a Sling task and its runtime bookkeeping.
//
// Because Output is a strings.Builder held by value, a TaskExecution must
// not be copied after output has been written (strings.Builder panics when
// it detects it was copied); pass *TaskExecution.
type TaskExecution struct {
	ExecID    string     `json:"exec_id"`
	Config    *Config    `json:"config"`
	Type      JobType    `json:"type"`
	Status    ExecStatus `json:"status"`
	// NOTE(review): encoding/json encodes an interface by its dynamic value,
	// not by Error(); errors with only unexported fields (e.g. errors.New)
	// encode as {}. Confirm the intended JSON form of "error".
	Err       error      `json:"error"`
	StartTime *time.Time `json:"start_time"`
	EndTime   *time.Time `json:"end_time"`
	Bytes     uint64     `json:"bytes"`
	Context   *g.Context `json:"-"`
	Progress  string     `json:"progress"`

	Output      strings.Builder `json:"-"`
	// NOTE(review): this channel field has no `json:"-"` tag. encoding/json
	// returns an UnsupportedTypeError for channel types, so json.Marshal on a
	// TaskExecution fails; no MarshalJSON method is listed for this type.
	// Consider tagging it `json:"-"` in the source.
	OutputLines chan *g.LogLine

	Replication    *ReplicationConfig `json:"replication"`
	ProgressHist   []string           `json:"progress_hist"`
	PBar           *ProgressBar       `json:"-"`
	ProcStatsStart g.ProcStats        `json:"-"` // process stats at beginning
	// contains filtered or unexported fields
}

TaskExecution is a Sling ELT task run, synonymous with an execution.

func NewTask

func NewTask(execID string, cfg *Config) (t *TaskExecution)

NewTask creates a Sling task with the given configuration.

func (*TaskExecution) AddCleanupTaskFirst added in v1.1.8

func (t *TaskExecution) AddCleanupTaskFirst(f func())

func (*TaskExecution) AddCleanupTaskLast added in v1.1.8

func (t *TaskExecution) AddCleanupTaskLast(f func())

func (*TaskExecution) AppendOutput added in v1.1.6

func (t *TaskExecution) AppendOutput(ll *g.LogLine)

func (*TaskExecution) Cleanup added in v0.84.3

func (t *TaskExecution) Cleanup()

func (*TaskExecution) Data added in v1.2.19

func (t *TaskExecution) Data() *iop.Dataset

Data returns the dataset object.

func (*TaskExecution) Df added in v1.2.7

func (t *TaskExecution) Df() *iop.Dataflow

Df returns the dataflow object.

func (*TaskExecution) Execute

func (t *TaskExecution) Execute() error

Execute runs a Sling task. This may be a file/DB to file/DB transfer.

func (*TaskExecution) ExecuteHooks added in v1.2.23

func (t *TaskExecution) ExecuteHooks(stage HookStage) (err error)

func (*TaskExecution) GetBytes

func (t *TaskExecution) GetBytes() (inBytes, outBytes uint64)

GetBytes returns the current total of bytes processed.

func (*TaskExecution) GetBytesString

func (t *TaskExecution) GetBytesString() (s string)

func (*TaskExecution) GetCount

func (t *TaskExecution) GetCount() (count uint64)

GetCount returns the current count of rows processed.

func (*TaskExecution) GetRate

func (t *TaskExecution) GetRate(secWindow int) (rowRate, byteRate int64)

GetRate returns the flow rate in rows/sec and bytes/sec. secWindow is how many seconds back to measure; 0 measures since the beginning.

func (*TaskExecution) GetSourceTable added in v1.3.4

func (t *TaskExecution) GetSourceTable() (sTable database.Table, err error)

func (*TaskExecution) GetStateMap added in v1.2.23

func (t *TaskExecution) GetStateMap() map[string]any

func (*TaskExecution) GetTargetTable added in v1.3.4

func (t *TaskExecution) GetTargetTable(tempTableSuffix ...string) (tTable database.Table, err error)

func (*TaskExecution) GetTotalBytes

func (t *TaskExecution) GetTotalBytes() (rcBytes, txBytes uint64)

GetTotalBytes returns the inbound/outbound bytes of the task.

func (*TaskExecution) IsStalled

func (t *TaskExecution) IsStalled(window float64) bool

IsStalled determines if the task has stalled (no row increment)

func (*TaskExecution) ReadFromDB

func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df *iop.Dataflow, err error)

ReadFromDB reads from a source database

func (*TaskExecution) ReadFromFile

func (t *TaskExecution) ReadFromFile(cfg *Config) (df *iop.Dataflow, err error)

ReadFromFile reads from a source file

func (*TaskExecution) SetProgress

func (t *TaskExecution) SetProgress(progressText string, args ...interface{})

SetProgress sets the progress

func (*TaskExecution) WriteToDb

func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn database.Connection) (cnt uint64, err error)

WriteToDb writes to a target database: it creates a temp table, loads the data into it, then inserts, incrementally merges, or replaces into the target table.

func (*TaskExecution) WriteToFile

func (t *TaskExecution) WriteToFile(cfg *Config, df *iop.Dataflow) (cnt uint64, err error)

WriteToFile writes to a target file

type Wildcard added in v1.2.19

// Wildcard pairs a stream pattern with what it matched. NOTE(review): field
// roles below are inferred from names and types — confirm.
type Wildcard struct {
	// Pattern is the wildcard expression as written.
	Pattern     string
	// StreamNames lists the stream names the pattern matched.
	StreamNames []string
	// NodeMap holds matched file nodes, keyed by string (presumably the
	// stream name).
	NodeMap     map[string]filesys.FileNode
	// TableMap holds matched database tables, keyed by string (presumably the
	// stream name).
	TableMap    map[string]database.Table
}

type Wildcards added in v1.2.19

// Wildcards is a list of Wildcard entries; its Patterns method returns a
// []string (presumably each entry's Pattern — confirm).
type Wildcards []*Wildcard

func (Wildcards) Patterns added in v1.2.19

func (ws Wildcards) Patterns() []string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL