sling

package
v0.87.21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 29, 2022 License: GPL-3.0 Imports: 33 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
// AllowedProps maps each allowed property name to a short human-readable
// description of it. Both entries visible here ("sheet" and "range") are
// described as applying to Excel source files.
var AllowedProps = map[string]string{
	"sheet": "Provided for Excel source files. Default is first sheet",
	"range": "Optional for Excel source file. Default is largest table range",
}

AllowedProps lists the allowed properties, keyed by name with a description as the value.

View Source
// MetadataLoadedAt is a mutable package-level flag that starts out false.
// NOTE(review): presumably enables a "loaded at" metadata column on written
// rows — confirm against the code that reads it.
var MetadataLoadedAt = false
View Source
// MetadataStreamURL is a mutable package-level flag that starts out false.
// NOTE(review): presumably enables a "stream URL" metadata column on written
// rows — confirm against the code that reads it.
var MetadataStreamURL = false
View Source
// ShowProgress is a mutable package-level flag that starts out true.
// NOTE(review): presumably controls whether the ProgressBar is rendered
// while a task runs — confirm against the code that reads it.
var ShowProgress = true
View Source
// SourceAPIOptionsDefault is the default SourceOptions value for API sources.
// Only four fields are set: EmptyAsNull=true, NullIf="NULL",
// DatetimeFormat="AUTO" and MaxDecimals=9; every other field keeps its zero
// value (nil for the pointer fields). The values are identical to
// SourceDBOptionsDefault.
var SourceAPIOptionsDefault = SourceOptions{
	EmptyAsNull:    g.Bool(true),
	NullIf:         g.String("NULL"),
	DatetimeFormat: "AUTO",
	MaxDecimals:    g.Int(9),
}
View Source
// SourceDBOptionsDefault is the default SourceOptions value for database
// sources. Only four fields are set: EmptyAsNull=true, NullIf="NULL",
// DatetimeFormat="AUTO" and MaxDecimals=9; every other field keeps its zero
// value (nil for the pointer fields). The values are identical to
// SourceAPIOptionsDefault.
var SourceDBOptionsDefault = SourceOptions{
	EmptyAsNull:    g.Bool(true),
	NullIf:         g.String("NULL"),
	DatetimeFormat: "AUTO",
	MaxDecimals:    g.Int(9),
}
View Source
// SourceFileOptionsDefault is the default SourceOptions value for file
// sources. Unlike the API/DB defaults it sets every field of SourceOptions
// explicitly, so none of the pointer fields is left nil.
var SourceFileOptionsDefault = SourceOptions{
	// Boolean toggles: only EmptyAsNull and Header default to true.
	TrimSpace:      g.Bool(false),
	EmptyAsNull:    g.Bool(true),
	Header:         g.Bool(true),
	Flatten:        g.Bool(false),
	// Compression defaults to iop.AutoCompressorType.
	Compression:    iop.CompressorTypePtr(iop.AutoCompressorType),
	NullIf:         g.String("NULL"),
	DatetimeFormat: "AUTO",
	SkipBlankLines: g.Bool(false),
	Delimiter:      ",",
	MaxDecimals:    g.Int(9),
}
View Source
// TargetAPIOptionsDefault is the default TargetOptions value for API targets.
// Because this is a package-level variable initializer, the environment is
// read once at package initialization; later changes to FILE_MAX_ROWS do not
// affect this value.
var TargetAPIOptionsDefault = TargetOptions{
	// FileMaxRows is taken from the FILE_MAX_ROWS environment variable
	// (converted with cast.ToInt64) when it is non-empty, otherwise 0.
	FileMaxRows: lo.Ternary(
		os.Getenv("FILE_MAX_ROWS") != "",
		cast.ToInt64(os.Getenv("FILE_MAX_ROWS")),
		0,
	),
	UseBulk:        g.Bool(true),
	DatetimeFormat: "auto",
	// NOTE(review): -1 presumably means "no decimal limit" — confirm.
	MaxDecimals:    g.Int(-1),
}
View Source
// TargetDBOptionsDefault is the default TargetOptions value for database
// targets. It matches TargetAPIOptionsDefault except that AddNewColumns is
// also set to true. Because this is a package-level variable initializer, the
// environment is read once at package initialization.
var TargetDBOptionsDefault = TargetOptions{
	// FileMaxRows is taken from the FILE_MAX_ROWS environment variable
	// (converted with cast.ToInt64) when it is non-empty, otherwise 0.
	FileMaxRows: lo.Ternary(
		os.Getenv("FILE_MAX_ROWS") != "",
		cast.ToInt64(os.Getenv("FILE_MAX_ROWS")),
		0,
	),
	UseBulk:        g.Bool(true),
	AddNewColumns:  true,
	DatetimeFormat: "auto",
	// NOTE(review): -1 presumably means "no decimal limit" — confirm.
	MaxDecimals:    g.Int(-1),
}
View Source
// TargetFileOptionsDefault is the default TargetOptions value for file
// targets. Three fields can be overridden through environment variables
// (COMPRESSION, CONCURRENCY, FILE_MAX_ROWS). Because this is a package-level
// variable initializer, those variables are read once at package
// initialization; later changes do not affect this value.
var TargetFileOptionsDefault = TargetOptions{
	Header: g.Bool(true),
	// Compression uses the COMPRESSION environment variable when non-empty,
	// converting the raw string straight to iop.CompressorType (no validation
	// is visible here); otherwise it falls back to iop.AutoCompressorType.
	Compression: lo.Ternary(
		os.Getenv("COMPRESSION") != "",
		iop.CompressorTypePtr(iop.CompressorType(os.Getenv("COMPRESSION"))),
		iop.CompressorTypePtr(iop.AutoCompressorType),
	),
	// Concurrency uses the CONCURRENCY environment variable (converted with
	// cast.ToInt) when non-empty; otherwise the number of CPUs reported by
	// runtime.NumCPU.
	Concurrency: lo.Ternary(
		os.Getenv("CONCURRENCY") != "",
		cast.ToInt(os.Getenv("CONCURRENCY")),
		runtime.NumCPU(),
	),
	// FileMaxRows uses the FILE_MAX_ROWS environment variable (converted with
	// cast.ToInt64) when non-empty, otherwise 0.
	FileMaxRows: lo.Ternary(
		os.Getenv("FILE_MAX_ROWS") != "",
		cast.ToInt64(os.Getenv("FILE_MAX_ROWS")),
		0,
	),
	UseBulk:        g.Bool(true),
	DatetimeFormat: "auto",
	Delimiter:      ",",
	// NOTE(review): -1 presumably means "no decimal limit" — confirm.
	MaxDecimals:    g.Int(-1),
}

Functions

func ClientDelete

func ClientDelete(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)

ClientDelete sends a DELETE request

func ClientGet

func ClientGet(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)

ClientGet sends a GET request

func ClientOptions

func ClientOptions(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)

ClientOptions sends an OPTIONS request

func ClientPatch

func ClientPatch(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)

ClientPatch sends a PATCH request

func ClientPost

func ClientPost(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)

ClientPost sends a POST request

func ClientPut

func ClientPut(serverURL string, route RouteName, m map[string]interface{}, headers map[string]string) (respStr string, err error)

ClientPut sends a PUT request

func GetJWTFromKey

func GetJWTFromKey(masterServerURL, key string) (string, error)

GetJWTFromKey logs in and returns the JWT based on the provided key

func LoadProject

func LoadProject(path string)

func SetStreamDefaults added in v0.86.36

func SetStreamDefaults(stream *ReplicationStreamConfig, replicationCfg ReplicationConfig)

func Sling

func Sling(cfg *Config) (err error)

Sling accepts a configuration and runs an Extract-Load task

Types

type Config

// Config is the configuration of a single task: the Source to read from, the
// Target to write to, the load Mode, run Options and extra Env entries.
type Config struct {
	Source  Source            `json:"source,omitempty" yaml:"source,omitempty"`
	// Target is the only user-facing field whose tags carry no omitempty.
	Target  Target            `json:"target" yaml:"target"`
	Mode    Mode              `json:"mode,omitempty" yaml:"mode,omitempty"`
	Options ConfigOptions     `json:"options,omitempty" yaml:"options,omitempty"`
	Env     map[string]string `json:"env,omitempty" yaml:"env,omitempty"`

	// The fields below are serialized under underscore-prefixed keys.
	// NOTE(review): presumably internal state filled in by Prepare rather than
	// supplied by the user — confirm.
	SrcConn        connection.Connection `json:"_src_conn,omitempty" yaml:"_src_conn,omitempty"`
	TgtConn        connection.Connection `json:"_tgt_conn,omitempty" yaml:"_tgt_conn,omitempty"`
	Prepared       bool                  `json:"_prepared,omitempty" yaml:"_prepared,omitempty"`
	// IncrementalVal is excluded from both JSON and YAML (tag "-").
	IncrementalVal string                `json:"-" yaml:"-"`
}

Config is the new config struct

func NewConfig

func NewConfig(cfgStr string) (cfg *Config, err error)

NewConfig returns a config object from a YAML / JSON string

func (*Config) DetermineType added in v0.84.9

func (cfg *Config) DetermineType() (Type JobType, err error)

func (*Config) FormatTargetObjectName added in v0.85.54

func (cfg *Config) FormatTargetObjectName() (err error)

func (*Config) Marshal

func (cfg *Config) Marshal() (cfgBytes []byte, err error)

Marshal marshals into JSON

func (*Config) Prepare

func (cfg *Config) Prepare() (err error)

Prepare prepares the config

func (*Config) Scan

func (cfg *Config) Scan(value interface{}) error

Scan scans the value into Jsonb; it implements the sql.Scanner interface

func (*Config) SetDefault

func (cfg *Config) SetDefault()

SetDefault sets default options

func (*Config) Unmarshal

func (cfg *Config) Unmarshal(cfgStr string) error

Unmarshal parses a configuration file path or config text

func (Config) Value

func (cfg Config) Value() (driver.Value, error)

Value returns the JSON value; it implements the driver.Valuer interface

type ConfigOptions

// ConfigOptions holds run-level switches for a Config.
type ConfigOptions struct {
	Debug  bool `json:"debug,omitempty" yaml:"debug,omitempty"`
	// StdIn is excluded from JSON (tag "-").
	// NOTE(review): unlike its siblings it has no yaml tag, so YAML handling
	// falls back to the library default — confirm whether `yaml:"-"` was intended.
	StdIn  bool `json:"-"`                                        // whether stdin is passed
	StdOut bool `json:"stdout,omitempty" yaml:"stdout,omitempty"` // whether to output to stdout
}

ConfigOptions are configuration options

type ExecStatus

type ExecStatus string

ExecStatus is the status of an execution

// Known ExecStatus values. Each constant's string value is the lowercase
// (hyphenated where needed) form of its name without the ExecStatus prefix.
const (
	// ExecStatusCreated = created
	ExecStatusCreated ExecStatus = "created"
	// ExecStatusQueued = queued
	ExecStatusQueued ExecStatus = "queued"
	// ExecStatusStarted = started
	ExecStatusStarted ExecStatus = "started"
	// ExecStatusRunning = running
	ExecStatusRunning ExecStatus = "running"
	// ExecStatusSuccess = success
	ExecStatusSuccess ExecStatus = "success"
	// ExecStatusTerminated = terminated
	ExecStatusTerminated ExecStatus = "terminated"
	// ExecStatusInterrupted = interrupted
	ExecStatusInterrupted ExecStatus = "interrupted"
	// ExecStatusTimedOut = timed-out (when no heartbeat sent for 30 sec)
	ExecStatusTimedOut ExecStatus = "timed-out"
	// ExecStatusError = error
	ExecStatusError ExecStatus = "error"
	// ExecStatusSkipped = skipped
	ExecStatusSkipped ExecStatus = "skipped"
	// ExecStatusStalled = stalled (when still heartbeating, but rows are unchanged for a while)
	ExecStatusStalled ExecStatus = "stalled"
)

func (ExecStatus) IsFailure

func (s ExecStatus) IsFailure() bool

IsFailure returns true if an execution has failed

func (ExecStatus) IsFinished

func (s ExecStatus) IsFinished() bool

IsFinished returns true if an execution is finished

func (ExecStatus) IsRunning

func (s ExecStatus) IsRunning() bool

IsRunning returns true if an execution is running

func (ExecStatus) IsSuccess

func (s ExecStatus) IsSuccess() bool

IsSuccess returns true if an execution is successful

type ExecutionStatus

// ExecutionStatus is a JSON-serializable snapshot of an execution's state.
// Every field is tagged omitempty, so zero values are left out of the JSON.
type ExecutionStatus struct {
	JobID       int        `json:"job_id,omitempty"`
	ExecID      int64      `json:"exec_id,omitempty"`
	Status      ExecStatus `json:"status,omitempty"`
	Text        string     `json:"text,omitempty"`
	Rows        uint64     `json:"rows,omitempty"`
	Bytes       uint64     `json:"bytes,omitempty"`
	Percent     int        `json:"percent,omitempty"`
	Stalled     bool       `json:"stalled,omitempty"`
	// Duration is a pointer, so with omitempty only a nil value is omitted;
	// a pointer to 0 is still emitted. AvgDuration is a plain int, so 0 is
	// omitted. NOTE(review): units are not visible here — presumably seconds; confirm.
	Duration    *int       `json:"duration,omitempty"`
	AvgDuration int        `json:"avg_duration,omitempty"`
}

ExecutionStatus is an execution status object

type JobType

type JobType string

JobType is an enum type for jobs

const APIToDb JobType = "api-db"

APIToDb is from api to db

const APIToFile JobType = "api-file"

APIToFile is from api to file

const ConnTest JobType = "conn-test"

ConnTest is for a connection test

const DbSQL JobType = "db-sql"

DbSQL is for a db sql query

const DbToDb JobType = "db-db"

DbToDb is from db to db

const DbToFile JobType = "db-file"

DbToFile is from db to file

const FileToDB JobType = "file-db"

FileToDB is from file to db

const FileToFile JobType = "file-file"

FileToFile is from file to file

type Mode

type Mode string

Mode is a load mode

// Known load modes. Each constant's string value is the identifier used in
// configuration ("truncate", "full-refresh", "incremental", "snapshot").
const (
	// TruncateMode is the "truncate" load mode.
	TruncateMode Mode = "truncate"
	// FullRefreshMode is the "full-refresh" load mode (the original comment
	// summarized it as "drop").
	FullRefreshMode Mode = "full-refresh"
	// IncrementalMode is the "incremental" load mode.
	IncrementalMode Mode = "incremental"
	// SnapshotMode is the "snapshot" load mode.
	SnapshotMode Mode = "snapshot"
)

type NotificationConfig

// NotificationConfig is a named notification setup. It only carries JSON
// tags (no yaml tags), and none of them use omitempty.
type NotificationConfig struct {
	Name        string   `json:"name"`
	// Delivery channels.
	Emails      []string `json:"emails"`
	Slack       bool     `json:"slack"`
	MsTeams     bool     `json:"msteams"`
	WebhookURLs []string `json:"webhook_urls"` // urls
	// NOTE(review): the On* flags presumably select which events trigger a
	// notification — confirm against the sender.
	OnSuccess   bool     `json:"on_success"`
	OnFailure   bool     `json:"on_failure"`
	OnLinger    bool     `json:"on_linger"`
	OnEmpty     bool     `json:"on_empty"`
}

type ProgressBar

type ProgressBar struct {
	// contains filtered or unexported fields
}

func NewPBar

func NewPBar(d time.Duration) *ProgressBar

NewPBar creates a new progress bar

func (*ProgressBar) Finish

func (pb *ProgressBar) Finish()

func (*ProgressBar) SetStatus

func (pb *ProgressBar) SetStatus(status string)

SetStatus sets the progress bar status

func (*ProgressBar) Start

func (pb *ProgressBar) Start()

type Project

// Project pairs a ProjectConfig with a set of task Configs keyed by string.
// NOTE(review): the meaning of the TaskConfigs key (task name vs. file path)
// is not visible here — confirm against LoadProject.
type Project struct {
	Config      ProjectConfig
	TaskConfigs map[string]Config
}

type ProjectConfig

// ProjectConfig is the serialized project-level configuration.
// NOTE(review): key naming is mixed — "task-paths" is hyphenated while
// "notification_tags" uses an underscore. Changing either would break
// existing files, so this is flagged rather than altered.
type ProjectConfig struct {
	Project          string                        `json:"project" yaml:"project"`
	TaskPaths        []string                      `json:"task-paths" yaml:"task-paths"`
	Defaults         map[string]interface{}        `json:"defaults" yaml:"defaults"`
	// NotificationTags maps a tag to its NotificationConfig.
	NotificationTags map[string]NotificationConfig `json:"notification_tags" yaml:"notification_tags"`
}

type ReplicationConfig added in v0.86.36

// ReplicationConfig describes a replication: a source, a target, default
// stream settings, and per-stream settings keyed by string.
type ReplicationConfig struct {
	Source   string                              `json:"source,omitempty" yaml:"source,omitempty"`
	Target   string                              `json:"target,omitempty" yaml:"target,omitempty"`
	// Defaults is a value while Streams holds pointers, so a stream entry may
	// be nil. NOTE(review): presumably SetStreamDefaults fills stream entries
	// from Defaults — confirm.
	Defaults ReplicationStreamConfig             `json:"defaults,omitempty" yaml:"defaults,omitempty"`
	Streams  map[string]*ReplicationStreamConfig `json:"streams,omitempty" yaml:"streams,omitempty"`
}

func LoadReplicationConfig added in v0.87.17

func LoadReplicationConfig(cfgPath string) (config ReplicationConfig, err error)

func UnmarshalReplication added in v0.86.39

func UnmarshalReplication(replicYAML string) (config ReplicationConfig, err error)

UnmarshalReplication converts a YAML string into a ReplicationConfig

func (*ReplicationConfig) ProcessWildcards added in v0.87.18

func (rd *ReplicationConfig) ProcessWildcards() (err error)

ProcessWildcards processes the streams that use wildcards, such as `my_schema.*`, `my_schema.my_prefix_*` or `my_schema.*_my_suffix`

func (*ReplicationConfig) Scan added in v0.86.36

func (rd *ReplicationConfig) Scan(value interface{}) error

Scan scans the value into Jsonb; it implements the sql.Scanner interface

func (ReplicationConfig) Value added in v0.86.36

func (rd ReplicationConfig) Value() (driver.Value, error)

Value returns the JSON value; it implements the driver.Valuer interface

type ReplicationStreamConfig added in v0.86.36

// ReplicationStreamConfig holds the settings of one replication stream. It is
// also the type of ReplicationConfig.Defaults. All fields are omitempty.
type ReplicationStreamConfig struct {
	Mode          Mode           `json:"mode,omitempty" yaml:"mode,omitempty"`
	Object        string         `json:"object,omitempty" yaml:"object,omitempty"`
	// Columns and PrimaryKey carry the yaml "flow" flag in addition to omitempty.
	Columns       []string       `json:"columns,omitempty" yaml:"columns,flow,omitempty"`
	PrimaryKey    []string       `json:"primary_key,omitempty" yaml:"primary_key,flow,omitempty"`
	UpdateKey     string         `json:"update_key,omitempty" yaml:"update_key,omitempty"`
	SQL           string         `json:"sql,omitempty" yaml:"sql,omitempty"`
	// Schedule, SourceOptions and TargetOptions are pointers; nil means the
	// field was not provided.
	Schedule      *string        `json:"schedule,omitempty" yaml:"schedule,omitempty"`
	SourceOptions *SourceOptions `json:"source_options,omitempty" yaml:"source_options,omitempty"`
	TargetOptions *TargetOptions `json:"target_options,omitempty" yaml:"target_options,omitempty"`
	Disabled      bool           `json:"disabled,omitempty" yaml:"disabled,omitempty"`
}

type RouteName

type RouteName string

RouteName is the name of a route

// Known route paths, grouped by prefix.
const (
	// Top-level routes.
	RouteStatus         RouteName = "/status"
	RouteNotice         RouteName = "/notice"
	RouteError          RouteName = "/error"
	RouteSignUpUser     RouteName = "/sign-up"
	RouteUser           RouteName = "/user"
	RouteForgotPassword RouteName = "/forgot-password"
	RouteResetPassword  RouteName = "/reset-password"
	RouteLogin          RouteName = "/login"
	RouteLogout         RouteName = "/logout"
	RouteProxy          RouteName = "/p"

	// Routes under /app.
	RouteAppIndex  RouteName = "/app"
	RouteAppLogin  RouteName = "/app/login"
	RouteAppLogout RouteName = "/app/logout"
	RouteAppAPIKey RouteName = "/app/apikey"

	// Routes under /api/v1. RouteAlertLog is declared in this group although
	// its path ("/alert/log") is not under /api/v1.
	RouteAPI               RouteName = "/api/v1"
	RouteMasterStatus      RouteName = "/api/v1/master-status"
	RouteMasterDBReset     RouteName = "/api/v1/master-db-reset"
	RouteUploads           RouteName = "/api/v1/uploads"
	RouteAPIAccounts       RouteName = "/api/v1/accounts"
	RouteAPIProjects       RouteName = "/api/v1/projects"
	RouteAPIKey            RouteName = "/api/v1/apikey"
	RouteAPIUsers          RouteName = "/api/v1/users"
	RouteAPIJobs           RouteName = "/api/v1/jobs"
	RouteAPILogs           RouteName = "/api/v1/logs"
	RouteAPIExecutions     RouteName = "/api/v1/executions"
	RouteAPIConnections    RouteName = "/api/v1/connections"
	RouteAPIConnectionTest RouteName = "/api/v1/connection-test"
	RouteAPIResetPassword  RouteName = "/api/v1/reset-password"
	RouteAPIDataRequest    RouteName = "/api/v1/data-request"
	RouteAPIWorkers        RouteName = "/api/v1/workers"
	RouteAPISettings       RouteName = "/api/v1/settings"
	RouteAlertLog          RouteName = "/alert/log"

	// Routes under /ws.
	RouteWs       RouteName = "/ws"
	RouteWsClient RouteName = "/ws/client"
	RouteWsWorker RouteName = "/ws/worker"
)

type Source

// Source describes where a task reads from.
type Source struct {
	// Conn is the only field without omitempty, so it is always serialized.
	Conn       string                 `json:"conn" yaml:"conn"`
	Stream     string                 `json:"stream,omitempty" yaml:"stream,omitempty"`
	Columns    []string               `json:"columns,omitempty" yaml:"columns,omitempty"`
	PrimaryKey []string               `json:"primary_key,omitempty" yaml:"primary_key,omitempty"`
	UpdateKey  string                 `json:"update_key,omitempty" yaml:"update_key,omitempty"`
	// NOTE(review): Limit is presumably a row limit where 0 means unlimited — confirm.
	Limit      int                    `json:"limit,omitempty" yaml:"limit,omitempty"`
	// Options is a pointer; nil means no source options were provided.
	Options    *SourceOptions         `json:"options,omitempty" yaml:"options,omitempty"`
	Data       map[string]interface{} `json:"data,omitempty" yaml:"data,omitempty"`
	// contains filtered or unexported fields
}

Source is a source of data

type SourceOptions

// SourceOptions holds source-side processing options. All fields are
// omitempty. Most are pointers, so "not set" (nil) can be told apart from an
// explicit false/zero/empty value; DatetimeFormat and Delimiter are plain
// strings, where the empty string is indistinguishable from "not set".
type SourceOptions struct {
	TrimSpace      *bool               `json:"trim_space,omitempty" yaml:"trim_space,omitempty"`
	EmptyAsNull    *bool               `json:"empty_as_null,omitempty" yaml:"empty_as_null,omitempty"`
	Header         *bool               `json:"header,omitempty" yaml:"header,omitempty"`
	Flatten        *bool               `json:"flatten,omitempty" yaml:"flatten,omitempty"`
	Compression    *iop.CompressorType `json:"compression,omitempty" yaml:"compression,omitempty"`
	NullIf         *string             `json:"null_if,omitempty" yaml:"null_if,omitempty"`
	DatetimeFormat string              `json:"datetime_format,omitempty" yaml:"datetime_format,omitempty"`
	SkipBlankLines *bool               `json:"skip_blank_lines,omitempty" yaml:"skip_blank_lines,omitempty"`
	Delimiter      string              `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
	MaxDecimals    *int                `json:"max_decimals,omitempty" yaml:"max_decimals,omitempty"`
}

SourceOptions are connection and stream processing options

type Target

// Target describes where a task writes to.
type Target struct {
	// Conn is the only serialized field without omitempty.
	Conn    string                 `json:"conn" yaml:"conn"`
	Object  string                 `json:"object,omitempty" yaml:"object,omitempty"`
	// Options is a pointer; nil means no target options were provided.
	Options *TargetOptions         `json:"options,omitempty" yaml:"options,omitempty"`
	Data    map[string]interface{} `json:"data,omitempty" yaml:"data,omitempty"`

	// TmpTableCreated is excluded from JSON and YAML (tag "-").
	// NOTE(review): presumably set while writing to a database once the temp
	// table exists — confirm against WriteToDb.
	TmpTableCreated bool `json:"-" yaml:"-"`
	// contains filtered or unexported fields
}

Target is a target of data

type TargetOptions

// TargetOptions holds target-side processing options. All fields are
// omitempty. Header, Compression, MaxDecimals, UseBulk and AdjustColumnType
// are pointers, so nil ("not set") can be told apart from an explicit value;
// the remaining fields are plain values whose zero value is
// indistinguishable from "not set".
type TargetOptions struct {
	Header           *bool               `json:"header,omitempty" yaml:"header,omitempty"`
	Compression      *iop.CompressorType `json:"compression,omitempty" yaml:"compression,omitempty"`
	Concurrency      int                 `json:"concurrency,omitempty" yaml:"concurrency,omitempty"`
	DatetimeFormat   string              `json:"datetime_format,omitempty" yaml:"datetime_format,omitempty"`
	Delimiter        string              `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
	FileMaxRows      int64               `json:"file_max_rows,omitempty" yaml:"file_max_rows,omitempty"`
	MaxDecimals      *int                `json:"max_decimals,omitempty" yaml:"max_decimals,omitempty"`
	UseBulk          *bool               `json:"use_bulk,omitempty" yaml:"use_bulk,omitempty"`
	AddNewColumns    bool                `json:"add_new_columns,omitempty" yaml:"add_new_columns,omitempty"`
	AdjustColumnType *bool               `json:"adjust_column_type,omitempty" yaml:"adjust_column_type,omitempty"`

	// SQL-related settings. NOTE(review): presumably only meaningful for
	// database targets — confirm.
	TableTmp string `json:"table_tmp,omitempty" yaml:"table_tmp,omitempty"`
	TableDDL string `json:"table_ddl,omitempty" yaml:"table_ddl,omitempty"`
	PreSQL   string `json:"pre_sql,omitempty" yaml:"pre_sql,omitempty"`
	PostSQL  string `json:"post_sql,omitempty" yaml:"post_sql,omitempty"`
}

TargetOptions are target connection and stream processing options

type TaskExecution

// TaskExecution is the state of one task run: its identifier, configuration,
// type, status, timing, byte count and progress text. None of the JSON tags
// use omitempty, so every tagged field is always emitted.
type TaskExecution struct {
	ExecID    int64      `json:"exec_id"`
	Config    *Config    `json:"config"`
	Type      JobType    `json:"type"`
	Status    ExecStatus `json:"status"`
	// NOTE(review): encoding/json marshals most error values as {} because
	// they expose no exported fields — confirm how Err is meant to appear in JSON.
	Err       error      `json:"error"`
	// StartTime and EndTime are pointers; nil serializes as null.
	StartTime *time.Time `json:"start_time"`
	EndTime   *time.Time `json:"end_time"`
	Bytes     uint64     `json:"bytes"`
	Context   *g.Context `json:"-"`
	Progress  string     `json:"progress"`

	ProgressHist   []string     `json:"progress_hist"`
	PBar           *ProgressBar `json:"-"`
	ProcStatsStart g.ProcStats  `json:"-"` // process stats at beginning
	// contains filtered or unexported fields
}

TaskExecution is a sling ELT task run, synonymous with an execution

func NewTask

func NewTask(execID int64, cfg *Config) (t *TaskExecution)

NewTask creates a Sling task with given configuration

func (*TaskExecution) AddCleanupTask added in v0.84.3

func (t *TaskExecution) AddCleanupTask(f func())

func (*TaskExecution) Cleanup added in v0.84.3

func (t *TaskExecution) Cleanup()

func (*TaskExecution) Execute

func (t *TaskExecution) Execute() error

Execute runs a Sling task. This may be a file/db to file/db transfer

func (*TaskExecution) GetBytes

func (t *TaskExecution) GetBytes() (inBytes, outBytes uint64)

GetBytes returns the current total of bytes processed

func (*TaskExecution) GetBytesString

func (t *TaskExecution) GetBytesString() (s string)

func (*TaskExecution) GetCount

func (t *TaskExecution) GetCount() (count uint64)

GetCount returns the current count of rows processed

func (*TaskExecution) GetRate

func (t *TaskExecution) GetRate(secWindow int) (rowRate, byteRate int64)

GetRate returns the speed of flow (rows / sec and bytes / sec). secWindow is how many seconds back to measure (0 is since beginning).

func (*TaskExecution) GetTotalBytes

func (t *TaskExecution) GetTotalBytes() (rcBytes, txBytes uint64)

GetTotalBytes gets the inbound/outbound bytes of the task

func (*TaskExecution) IsStalled

func (t *TaskExecution) IsStalled(window float64) bool

IsStalled determines if the task has stalled (no row increment)

func (*TaskExecution) ReadFromAPI

func (t *TaskExecution) ReadFromAPI(cfg *Config, client *airbyte.Airbyte) (df *iop.Dataflow, err error)

ReadFromAPI reads from a source API

func (*TaskExecution) ReadFromDB

func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df *iop.Dataflow, err error)

ReadFromDB reads from a source database

func (*TaskExecution) ReadFromFile

func (t *TaskExecution) ReadFromFile(cfg *Config) (df *iop.Dataflow, err error)

ReadFromFile reads from a source file

func (*TaskExecution) SetProgress

func (t *TaskExecution) SetProgress(progressText string, args ...interface{})

SetProgress sets the progress

func (*TaskExecution) WriteToDb

func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn database.Connection) (cnt uint64, err error)

WriteToDb writes to a target DB: create a temp table, load into the temp table, then insert / incremental / replace into the target table

func (*TaskExecution) WriteToFile

func (t *TaskExecution) WriteToFile(cfg *Config, df *iop.Dataflow) (cnt uint64, err error)

WriteToFile writes to a target file

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL