datamove

package
v1.2.5 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// Data type codes. They are declared as untyped integer constants with the
// values 0 through 3.
// NOTE(review): presumably these are the valid values of DataType / Data.DataType — confirm.
const (
	DataTypeEmpty        = 0
	DataTypeCols         = 1 // column data
	DataTypeSql          = 2 // SQL statement
	DataTypeSqlAndParams = 3 // SQL statement with placeholders
)

Variables

View Source
var (
	// ErrorStopped is a package-level error value whose message is `task stopped`.
	// NOTE(review): presumably returned when a running task is stopped; compare with errors.Is — confirm.
	ErrorStopped = errors.New(`task stopped`)
)

Functions

func DateMove

func DateMove(progress *Progress, from DataSource, to DataSource) (err error)

func New

func New(options *Options) (t *task.Task, err error)

func ValidateDataType

func ValidateDataType(dataType DataType) (err error)

Types

type Column

// Column embeds a *dialect.ColumnModel and adds a string Value that is
// serialized under the JSON key "value".
// NOTE(review): the meaning of Value (literal vs. script expression) is not visible here — confirm.
type Column struct {
	*dialect.ColumnModel
	Value string `json:"value"`
}

type Data

// Data is the value exchanged with a DataSource: DataSource.Read takes a
// chan *Data and DataSource.Write takes a *Data. It carries a total, a
// DataType, and three payload slices.
// NOTE(review): presumably DataType indicates which of ColsList, SqlList or
// SqlAndParams is populated — confirm.
type Data struct {
	Total        int64           `json:"total"`
	DataType     DataType        `json:"dataType"`
	ColsList     [][]interface{} `json:"colsList"`
	SqlList      []string        `json:"sqlList"`
	SqlAndParams []*SqlAndParam  `json:"sqlAndParams"`
}

type DataSource

// DataSource is the interface taken by DateMove for both its `from` and `to`
// arguments. It declares a Stop method plus a read side
// (ReadStart / Read / ReadEnd) and a write side (WriteStart / Write / WriteEnd);
// every method receives the *Progress of the current run.
// NOTE(review): the call order (Start, then Read/Write, then End) is implied by
// the names only — confirm against DateMove.
type DataSource interface {
	// Stop takes the run's progress and returns nothing.
	Stop(progress *Progress)
	ReadStart(progress *Progress) (err error)
	// Read receives a channel of *Data.
	// NOTE(review): presumably the implementation sends what it reads into dataChan — confirm.
	Read(progress *Progress, dataChan chan *Data) (err error)
	ReadEnd(progress *Progress) (err error)
	WriteStart(progress *Progress) (err error)
	// Write receives a single *Data value.
	Write(progress *Progress, data *Data) (err error)
	WriteEnd(progress *Progress) (err error)
}

type DataSourceBase

// DataSourceBase holds a column list and a script context. It is embedded by
// pointer in the DataSourceData, DataSourceDb, DataSourceEs, DataSourceExcel,
// DataSourceKafka, DataSourceRedis, DataSourceScript, DataSourceSql and
// DataSourceTxt types.
type DataSourceBase struct {
	ColumnList []*Column

	// ScriptContext is a string-keyed map of arbitrary values.
	// NOTE(review): presumably the variables visible to GetValueByScript — confirm.
	ScriptContext map[string]interface{}
	// contains filtered or unexported fields
}

func NewDataSourceBase added in v1.2.3

func NewDataSourceBase(columnList []*Column) *DataSourceBase

func (*DataSourceBase) DataToValues

func (this_ *DataSourceBase) DataToValues(progress *Progress, data map[string]interface{}) (res []interface{}, err error)

func (*DataSourceBase) GetColumnList added in v1.2.3

func (this_ *DataSourceBase) GetColumnList() []*Column

func (*DataSourceBase) GetColumnNames

func (this_ *DataSourceBase) GetColumnNames() []string

func (*DataSourceBase) GetDialectColumnList

func (this_ *DataSourceBase) GetDialectColumnList() []*dialect.ColumnModel

func (*DataSourceBase) GetStringValueByScript added in v1.2.2

func (this_ *DataSourceBase) GetStringValueByScript(script string) (res string, err error)

func (*DataSourceBase) GetValueByScript added in v1.2.2

func (this_ *DataSourceBase) GetValueByScript(script string) (interface{}, error)

func (*DataSourceBase) SetScriptContext added in v1.2.2

func (this_ *DataSourceBase) SetScriptContext(key string, value interface{})

func (*DataSourceBase) SetScriptContextData added in v1.2.3

func (this_ *DataSourceBase) SetScriptContextData(data map[string]interface{})

func (*DataSourceBase) ValuesToData

func (this_ *DataSourceBase) ValuesToData(progress *Progress, cols []interface{}) (data map[string]interface{}, err error)

func (*DataSourceBase) ValuesToValues

func (this_ *DataSourceBase) ValuesToValues(progress *Progress, cols []interface{}) (res []interface{}, err error)

type DataSourceConfig

// DataSourceConfig is the configuration type used for Options.From and
// Options.To. It embeds *dialect.ParamModel and collects, in one struct,
// settings for file, database, Elasticsearch, Redis and Kafka endpoints.
// The DbConfig, EsConfig, RedisConfig and KafkaConfig fields are tagged
// `json:"-"` and are therefore excluded from JSON encoding and decoding.
// NOTE(review): presumably Type selects which kind the IsDb / IsEs / IsTxt / ...
// methods report — confirm.
type DataSourceConfig struct {
	*dialect.ParamModel
	Type             string `json:"type"`
	SqlFileMergeType string `json:"sqlFileMergeType"` // SQL file merge type, e.g. one: a single file, owner: one file per database, table: one file per table
	ShouldTrimSpace  bool   `json:"shouldTrimSpace"`  // whether whitespace characters should be trimmed
	ColSeparator     string `json:"colSeparator"`     // column separator, default `,`
	ReplaceCol       string `json:"replaceCol"`       //
	ReplaceLine      string `json:"replaceLine"`      //
	TxtFileType      string `json:"txtFileType"`      //

	// Database configuration
	DbConfig    *db.Config `json:"-"`
	DialectType string     `json:"dialectType"`

	Username string `json:"username"`
	Password string `json:"password"`

	EsConfig *elasticsearch.Config `json:"-"`

	RedisConfig *redis.Config `json:"-"`

	KafkaConfig *kafka.Config `json:"-"`

	OwnerName        string `json:"ownerName"`
	TableName        string `json:"tableName"`
	BySql            bool   `json:"bySql"` // export according to a SQL statement
	SelectSql        string `json:"selectSql"`
	ShouldSelectPage bool   `json:"shouldSelectPage"`

	IndexName     string `json:"indexName"`
	IndexIdName   string `json:"indexIdName"`
	IndexIdScript string `json:"indexIdScript"`

	TopicName      string `json:"topicName"`
	TopicGroupName string `json:"topicGroupName"`
	TopicKey       string `json:"topicKey"`
	TopicValue     string `json:"topicValue"`

	DataList []map[string]interface{} `json:"dataList"`

	Total int64 `json:"total"`

	ColumnList []*Column `json:"columnList"`

	AllOwner       bool       `json:"allOwner"`
	Owners         []*DbOwner `json:"owners"`
	SkipOwnerNames []string   `json:"skipOwnerNames"`

	FilePath    string `json:"filePath"`
	ShouldOwner bool   `json:"shouldOwner"` // the database (owner) needs to be created
	ShouldTable bool   `json:"shouldTable"` // the table needs to be created

	FileNameSplice string `json:"fileNameSplice"` // file name joining character, e.g. `/`: database as directory and table as file name (default)
	FileName       string `json:"fileName"`
	RowNumber      int64  `json:"rowNumber"`
	// contains filtered or unexported fields
}

func (*DataSourceConfig) GetDialect added in v1.2.5

func (this_ *DataSourceConfig) GetDialect() dialect.Dialect

func (*DataSourceConfig) GetDialectParam added in v1.2.3

func (this_ *DataSourceConfig) GetDialectParam() *dialect.ParamModel

func (*DataSourceConfig) GetFileName added in v1.2.3

func (this_ *DataSourceConfig) GetFileName() string

func (*DataSourceConfig) GetTxtFileType added in v1.2.2

func (this_ *DataSourceConfig) GetTxtFileType() string

func (*DataSourceConfig) IsData

func (this_ *DataSourceConfig) IsData() bool

func (*DataSourceConfig) IsDb

func (this_ *DataSourceConfig) IsDb() bool

func (*DataSourceConfig) IsEs

func (this_ *DataSourceConfig) IsEs() bool

func (*DataSourceConfig) IsExcel

func (this_ *DataSourceConfig) IsExcel() bool

func (*DataSourceConfig) IsKafka

func (this_ *DataSourceConfig) IsKafka() bool

func (*DataSourceConfig) IsRedis

func (this_ *DataSourceConfig) IsRedis() bool

func (*DataSourceConfig) IsScript added in v1.2.2

func (this_ *DataSourceConfig) IsScript() bool

func (*DataSourceConfig) IsSql

func (this_ *DataSourceConfig) IsSql() bool

func (*DataSourceConfig) IsTxt

func (this_ *DataSourceConfig) IsTxt() bool

type DataSourceData

// DataSourceData embeds *DataSourceBase and holds a list of string-keyed
// records in DataList. Its method set (Stop, ReadStart, Read, ReadEnd,
// WriteStart, Write, WriteEnd) matches the DataSource interface.
// NOTE(review): presumably an in-memory source/target backed by DataList — confirm.
type DataSourceData struct {
	*DataSourceBase
	DataList []map[string]interface{}
}

func NewDataSourceData

func NewDataSourceData() *DataSourceData

func (*DataSourceData) GetDataList added in v1.2.3

func (this_ *DataSourceData) GetDataList() []map[string]interface{}

func (*DataSourceData) Read

func (this_ *DataSourceData) Read(progress *Progress, dataChan chan *Data) (err error)

func (*DataSourceData) ReadEnd

func (this_ *DataSourceData) ReadEnd(progress *Progress) (err error)

func (*DataSourceData) ReadStart

func (this_ *DataSourceData) ReadStart(progress *Progress) (err error)

func (*DataSourceData) Stop

func (this_ *DataSourceData) Stop(progress *Progress)

func (*DataSourceData) Write

func (this_ *DataSourceData) Write(progress *Progress, data *Data) (err error)

func (*DataSourceData) WriteEnd

func (this_ *DataSourceData) WriteEnd(progress *Progress) (err error)

func (*DataSourceData) WriteStart

func (this_ *DataSourceData) WriteStart(progress *Progress) (err error)

type DataSourceDb

// DataSourceDb embeds *dialect.ParamModel and *DataSourceBase and talks to a
// database through Service (a db.IService). Its method set matches the
// DataSource interface.
type DataSourceDb struct {
	*dialect.ParamModel
	*DataSourceBase

	OwnerName        string `json:"ownerName"`
	TableName        string `json:"tableName"`
	SelectSql        string `json:"selectSql"`
	ShouldSelectPage bool   `json:"shouldSelectPage"`

	// Service carries no json tag.
	// NOTE(review): presumably must be set by the caller before Read/Write — confirm.
	Service db.IService
}

func NewDataSourceDb

func NewDataSourceDb() *DataSourceDb

func (*DataSourceDb) GetParam

func (this_ *DataSourceDb) GetParam() *dialect.ParamModel

func (*DataSourceDb) Read

func (this_ *DataSourceDb) Read(progress *Progress, dataChan chan *Data) (err error)

func (*DataSourceDb) ReadEnd

func (this_ *DataSourceDb) ReadEnd(progress *Progress) (err error)

func (*DataSourceDb) ReadStart

func (this_ *DataSourceDb) ReadStart(progress *Progress) (err error)

func (*DataSourceDb) Stop

func (this_ *DataSourceDb) Stop(progress *Progress)

func (*DataSourceDb) Write

func (this_ *DataSourceDb) Write(progress *Progress, data *Data) (err error)

func (*DataSourceDb) WriteEnd

func (this_ *DataSourceDb) WriteEnd(progress *Progress) (err error)

func (*DataSourceDb) WriteStart

func (this_ *DataSourceDb) WriteStart(progress *Progress) (err error)

type DataSourceEs

// DataSourceEs embeds *DataSourceBase and holds index settings together with
// an elasticsearch.IService. Its method set matches the DataSource interface.
// NOTE(review): presumably IndexIdName / IndexIdScript determine the document
// id used on write — confirm.
type DataSourceEs struct {
	*DataSourceBase
	IndexName     string `json:"indexName"`
	IndexIdName   string `json:"indexIdName"`
	IndexIdScript string `json:"indexIdScript"`
	SelectSql     string `json:"selectSql"`

	Service elasticsearch.IService
}

func NewDataSourceEs

func NewDataSourceEs() *DataSourceEs

func (*DataSourceEs) Read

func (this_ *DataSourceEs) Read(progress *Progress, dataChan chan *Data) (err error)

func (*DataSourceEs) ReadEnd

func (this_ *DataSourceEs) ReadEnd(progress *Progress) (err error)

func (*DataSourceEs) ReadStart

func (this_ *DataSourceEs) ReadStart(progress *Progress) (err error)

func (*DataSourceEs) Stop

func (this_ *DataSourceEs) Stop(progress *Progress)

func (*DataSourceEs) Write

func (this_ *DataSourceEs) Write(progress *Progress, data *Data) (err error)

func (*DataSourceEs) WriteEnd

func (this_ *DataSourceEs) WriteEnd(progress *Progress) (err error)

func (*DataSourceEs) WriteStart

func (this_ *DataSourceEs) WriteStart(progress *Progress) (err error)

type DataSourceExcel

// DataSourceExcel embeds *DataSourceBase and describes a spreadsheet file by
// path and sheet name; its GetReadFile / GetWriteFile methods return an
// *xlsx.File. Its method set matches the DataSource interface.
type DataSourceExcel struct {
	*DataSourceBase
	FilePath string `json:"filePath"`

	ColumnNameMapping map[string]string `json:"columnNameMapping"`
	ShouldTrimSpace   bool              `json:"shouldTrimSpace"` // whether whitespace characters should be trimmed
	SheetName         string            `json:"sheetName"`
	// contains filtered or unexported fields
}

func NewDataSourceExcel

func NewDataSourceExcel() *DataSourceExcel

func (*DataSourceExcel) CloseReadFile

func (this_ *DataSourceExcel) CloseReadFile()

func (*DataSourceExcel) CloseWriteFile

func (this_ *DataSourceExcel) CloseWriteFile()

func (*DataSourceExcel) GetReadFile

func (this_ *DataSourceExcel) GetReadFile() (file *xlsx.File, err error)

func (*DataSourceExcel) GetWriteFile

func (this_ *DataSourceExcel) GetWriteFile() (file *xlsx.File, err error)

func (*DataSourceExcel) Read

func (this_ *DataSourceExcel) Read(progress *Progress, dataChan chan *Data) (err error)

func (*DataSourceExcel) ReadEnd

func (this_ *DataSourceExcel) ReadEnd(progress *Progress) (err error)

func (*DataSourceExcel) ReadLineCount

func (this_ *DataSourceExcel) ReadLineCount() (lineCount int64, err error)

func (*DataSourceExcel) ReadStart

func (this_ *DataSourceExcel) ReadStart(progress *Progress) (err error)

func (*DataSourceExcel) Stop

func (this_ *DataSourceExcel) Stop(progress *Progress)

func (*DataSourceExcel) Write

func (this_ *DataSourceExcel) Write(progress *Progress, data *Data) (err error)

func (*DataSourceExcel) WriteEnd

func (this_ *DataSourceExcel) WriteEnd(progress *Progress) (err error)

func (*DataSourceExcel) WriteStart

func (this_ *DataSourceExcel) WriteStart(progress *Progress) (err error)

type DataSourceFile

// DataSourceFile describes a file on disk by path; its GetReadFile /
// GetWriteFile methods return an *os.File and GetWriteBuf returns a
// *bufio.Writer. It declares no Read or Write method itself and is embedded
// by DataSourceSql and DataSourceTxt, which declare them.
// FilePath carries no json tag, unlike ShouldTrimSpace.
type DataSourceFile struct {
	FilePath        string
	ShouldTrimSpace bool `json:"shouldTrimSpace"` // whether whitespace characters should be trimmed
	// contains filtered or unexported fields
}

func NewDataSourceFile added in v1.2.3

func NewDataSourceFile(filePath string) *DataSourceFile

func (*DataSourceFile) CloseReadFile

func (this_ *DataSourceFile) CloseReadFile()

func (*DataSourceFile) CloseWriteFile

func (this_ *DataSourceFile) CloseWriteFile()

func (*DataSourceFile) GetFilePath added in v1.2.3

func (this_ *DataSourceFile) GetFilePath() string

func (*DataSourceFile) GetReadFile

func (this_ *DataSourceFile) GetReadFile() (file *os.File, err error)

func (*DataSourceFile) GetWriteBuf

func (this_ *DataSourceFile) GetWriteBuf() (bufWriter *bufio.Writer, err error)

func (*DataSourceFile) GetWriteFile

func (this_ *DataSourceFile) GetWriteFile() (file *os.File, err error)

func (*DataSourceFile) ReadEnd

func (this_ *DataSourceFile) ReadEnd(progress *Progress) (err error)

func (*DataSourceFile) ReadLineCount

func (this_ *DataSourceFile) ReadLineCount() (lineCount int64, err error)

func (*DataSourceFile) ReadStart

func (this_ *DataSourceFile) ReadStart(progress *Progress) (err error)

func (*DataSourceFile) Stop

func (this_ *DataSourceFile) Stop(progress *Progress)

func (*DataSourceFile) WriteEnd

func (this_ *DataSourceFile) WriteEnd(progress *Progress) (err error)

func (*DataSourceFile) WriteStart

func (this_ *DataSourceFile) WriteStart(progress *Progress) (err error)

type DataSourceKafka

// DataSourceKafka embeds *DataSourceBase and holds topic settings together
// with a kafka.IService. Its method set matches the DataSource interface.
type DataSourceKafka struct {
	*DataSourceBase
	TopicName      string `json:"topicName"`
	TopicGroupName string `json:"topicGroupName"`
	TopicKey       string `json:"topicKey"`
	TopicValue     string `json:"topicValue"`
	// NOTE(review): the unit of PullWait (seconds vs. milliseconds) is not visible here — confirm.
	PullWait int64 `json:"pullWait"`

	Service kafka.IService
}

func NewDataSourceKafka

func NewDataSourceKafka() *DataSourceKafka

func (*DataSourceKafka) Read

func (this_ *DataSourceKafka) Read(progress *Progress, dataChan chan *Data) (err error)

func (*DataSourceKafka) ReadEnd

func (this_ *DataSourceKafka) ReadEnd(progress *Progress) (err error)

func (*DataSourceKafka) ReadStart

func (this_ *DataSourceKafka) ReadStart(progress *Progress) (err error)

func (*DataSourceKafka) Stop

func (this_ *DataSourceKafka) Stop(progress *Progress)

func (*DataSourceKafka) Write

func (this_ *DataSourceKafka) Write(progress *Progress, data *Data) (err error)

func (*DataSourceKafka) WriteEnd

func (this_ *DataSourceKafka) WriteEnd(progress *Progress) (err error)

func (*DataSourceKafka) WriteStart

func (this_ *DataSourceKafka) WriteStart(progress *Progress) (err error)

type DataSourceRedis

// DataSourceRedis embeds *DataSourceBase and holds a redis.IService. Its
// method set matches the DataSource interface.
type DataSourceRedis struct {
	*DataSourceBase

	Service redis.IService
}

func NewDataSourceRedis

func NewDataSourceRedis() *DataSourceRedis

func (*DataSourceRedis) Read

func (this_ *DataSourceRedis) Read(progress *Progress, dataChan chan *Data) (err error)

func (*DataSourceRedis) ReadEnd

func (this_ *DataSourceRedis) ReadEnd(progress *Progress) (err error)

func (*DataSourceRedis) ReadStart

func (this_ *DataSourceRedis) ReadStart(progress *Progress) (err error)

func (*DataSourceRedis) Stop

func (this_ *DataSourceRedis) Stop(progress *Progress)

func (*DataSourceRedis) Write

func (this_ *DataSourceRedis) Write(progress *Progress, data *Data) (err error)

func (*DataSourceRedis) WriteEnd

func (this_ *DataSourceRedis) WriteEnd(progress *Progress) (err error)

func (*DataSourceRedis) WriteStart

func (this_ *DataSourceRedis) WriteStart(progress *Progress) (err error)

type DataSourceScript added in v1.2.2

// DataSourceScript embeds *DataSourceBase and adds a Total count. Its method
// set matches the DataSource interface.
// NOTE(review): presumably Total is the number of rows to generate from the
// column scripts — confirm.
type DataSourceScript struct {
	*DataSourceBase
	Total int64 `json:"total"`
}

func NewDataSourceScript added in v1.2.2

func NewDataSourceScript() *DataSourceScript

func (*DataSourceScript) Read added in v1.2.2

func (this_ *DataSourceScript) Read(progress *Progress, dataChan chan *Data) (err error)

func (*DataSourceScript) ReadEnd added in v1.2.2

func (this_ *DataSourceScript) ReadEnd(progress *Progress) (err error)

func (*DataSourceScript) ReadStart added in v1.2.2

func (this_ *DataSourceScript) ReadStart(progress *Progress) (err error)

func (*DataSourceScript) Stop added in v1.2.2

func (this_ *DataSourceScript) Stop(progress *Progress)

func (*DataSourceScript) Write added in v1.2.2

func (this_ *DataSourceScript) Write(progress *Progress, data *Data) (err error)

func (*DataSourceScript) WriteEnd added in v1.2.2

func (this_ *DataSourceScript) WriteEnd(progress *Progress) (err error)

func (*DataSourceScript) WriteStart added in v1.2.2

func (this_ *DataSourceScript) WriteStart(progress *Progress) (err error)

type DataSourceSql

// DataSourceSql embeds *DataSourceBase, *DataSourceFile and
// *dialect.ParamModel. It declares its own Read, ReadStart, Write and
// WriteStart methods; Stop, ReadEnd and WriteEnd are declared on the embedded
// DataSourceFile.
type DataSourceSql struct {
	*DataSourceBase
	*DataSourceFile
	*dialect.ParamModel
	// NOTE(review): this field is serialized as "databaseType", whereas
	// DataSourceConfig.DialectType uses "dialectType" — confirm the mismatch is intended.
	DialectType string `json:"databaseType"`
	OwnerName   string `json:"ownerName"`
	TableName   string `json:"tableName"`
	// contains filtered or unexported fields
}

func NewDataSourceSql

func NewDataSourceSql() *DataSourceSql

func (*DataSourceSql) GetDialect

func (this_ *DataSourceSql) GetDialect() dialect.Dialect

func (*DataSourceSql) GetParam

func (this_ *DataSourceSql) GetParam() *dialect.ParamModel

func (*DataSourceSql) Read

func (this_ *DataSourceSql) Read(progress *Progress, dataChan chan *Data) (err error)

func (*DataSourceSql) ReadStart

func (this_ *DataSourceSql) ReadStart(progress *Progress) (err error)

func (*DataSourceSql) Write

func (this_ *DataSourceSql) Write(progress *Progress, data *Data) (err error)

func (*DataSourceSql) WriteStart

func (this_ *DataSourceSql) WriteStart(progress *Progress) (err error)

type DataSourceTxt

// DataSourceTxt embeds *DataSourceBase and *DataSourceFile and adds
// text-format settings. It declares its own Read, ReadStart and Write
// methods; the remaining lifecycle methods are declared on the embedded
// DataSourceFile.
type DataSourceTxt struct {
	*DataSourceBase
	*DataSourceFile

	ColSeparator string `json:"colSeparator"` // column separator, default `,`
	ReplaceCol   string `json:"replaceCol"`   //
	ReplaceLine  string `json:"replaceLine"`  //
	// contains filtered or unexported fields
}

func NewDataSourceTxt

func NewDataSourceTxt() *DataSourceTxt

func (*DataSourceTxt) GetColSeparator added in v1.2.3

func (this_ *DataSourceTxt) GetColSeparator() string

func (*DataSourceTxt) Read

func (this_ *DataSourceTxt) Read(progress *Progress, dataChan chan *Data) (err error)

func (*DataSourceTxt) ReadStart

func (this_ *DataSourceTxt) ReadStart(progress *Progress) (err error)

func (*DataSourceTxt) ReadTitles added in v1.2.3

func (this_ *DataSourceTxt) ReadTitles(progress *Progress) (titles []string, err error)

func (*DataSourceTxt) StringsToValues added in v1.2.2

func (this_ *DataSourceTxt) StringsToValues(progress *Progress, cols []string) (res []interface{}, err error)

func (*DataSourceTxt) ValuesToStrings added in v1.2.2

func (this_ *DataSourceTxt) ValuesToStrings(progress *Progress, cols []interface{}) (res []string, err error)

func (*DataSourceTxt) Write

func (this_ *DataSourceTxt) Write(progress *Progress, data *Data) (err error)

type DataType

// DataType is an int8-backed type. It is the type of Data.DataType and the
// parameter type of ValidateDataType.
type DataType int8

type DbColumn

// DbColumn pairs a From and a To *dialect.ColumnModel with a string Value.
// NOTE(review): presumably From is the source column and To the target
// column of a mapping — confirm.
type DbColumn struct {
	From  *dialect.ColumnModel `json:"from"`
	To    *dialect.ColumnModel `json:"to"`
	Value string               `json:"value"`
}

type DbOwner

// DbOwner pairs a From and a To *dialect.OwnerModel and lists the tables
// belonging to that pairing. It is the element type of DataSourceConfig.Owners.
// NOTE(review): presumably AllTable selects every table except those named in
// SkipTableNames, otherwise Tables is used — confirm.
type DbOwner struct {
	From           *dialect.OwnerModel `json:"from"`
	To             *dialect.OwnerModel `json:"to"`
	SkipTableNames []string            `json:"skipTableNames"`
	AllTable       bool                `json:"allTable"`
	Tables         []*DbTable          `json:"tables"`
	// contains filtered or unexported fields
}

type DbTable

// DbTable pairs a From and a To *dialect.TableModel, lists column pairings,
// and carries index and topic settings. It is the element type of
// DbOwner.Tables.
// NOTE(review): presumably AllColumn selects every column except those named
// in SkipColumnNames, otherwise Columns is used — confirm.
type DbTable struct {
	From            *dialect.TableModel `json:"from"`
	To              *dialect.TableModel `json:"to"`
	Columns         []*DbColumn         `json:"columns"`
	SkipColumnNames []string            `json:"skipColumnNames"`
	AllColumn       bool                `json:"allColumn"`

	IndexIdName   string `json:"indexIdName"`
	IndexIdScript string `json:"indexIdScript"`

	TopicGroupName string `json:"topicGroupName"`
	TopicKey       string `json:"topicKey"`
	TopicValue     string `json:"topicValue"`
	// contains filtered or unexported fields
}

func (*DbTable) GetToDialectTable

func (this_ *DbTable) GetToDialectTable() *dialect.TableModel

type Executor

// Executor embeds *Progress and declares Before, Execute and After methods,
// each taking a *task.ExecutorParam and returning an error.
// NOTE(review): presumably this is the executor handed to the *task.Task
// returned by New — confirm.
type Executor struct {
	*Progress
}

func (*Executor) After

func (this_ *Executor) After(param *task.ExecutorParam) (err error)

func (*Executor) Before

func (this_ *Executor) Before(param *task.ExecutorParam) (err error)

func (*Executor) Execute

func (this_ *Executor) Execute(param *task.ExecutorParam) (err error)

type Options

// Options is the argument to New and NewProgress. It names the task, the
// working directory, and the source and target configurations.
type Options struct {
	Key  string            `json:"key"`  // key of the task
	Dir  string            `json:"dir"`  // directory for files generated while the task runs
	From *DataSourceConfig `json:"from"` // source data configuration
	To   *DataSourceConfig `json:"to"`   // target data configuration

	// NOTE(review): presumably ErrorContinue keeps the task running after an
	// error and BatchNumber is the batch size — confirm.
	ErrorContinue bool  `json:"errorContinue"`
	BatchNumber   int64 `json:"batchNumber"`
}

type Progress

// Progress embeds *Options and holds totals and *ProgressCount counters for
// owners, tables, reads, writes, indexes and topics. It is created by
// NewProgress and passed to every DataSource method.
type Progress struct {
	*Options

	DataTotal int64 `json:"dataTotal"`

	OwnerTotal int64          `json:"ownerTotal"`
	OwnerCount *ProgressCount `json:"ownerCount"`

	TableTotal int64          `json:"tableTotal"`
	TableCount *ProgressCount `json:"tableCount"`

	ReadCount  *ProgressCount `json:"readCount"`
	WriteCount *ProgressCount `json:"writeCount"`

	IndexTotal int64          `json:"indexTotal"`
	IndexCount *ProgressCount `json:"indexCount"`

	TopicTotal int64          `json:"topicTotal"`
	TopicCount *ProgressCount `json:"topicCount"`
	// contains filtered or unexported fields
}

func NewProgress

func NewProgress(options *Options) *Progress

func (*Progress) ShouldStop

func (this_ *Progress) ShouldStop() bool

type ProgressCount

// ProgressCount holds a total, error and success counters, and a list of
// error strings. Its AddError method takes a size and an error; AddSuccess
// takes a size.
// NOTE(review): whether AddError / AddSuccess are safe for concurrent use is
// not visible here — confirm before sharing across goroutines.
type ProgressCount struct {
	Total   int64    `json:"total"`
	Error   int64    `json:"error"`
	Success int64    `json:"success"`
	Errors  []string `json:"errors"`
}

func (*ProgressCount) AddError

func (this_ *ProgressCount) AddError(size int64, err error)

func (*ProgressCount) AddSuccess

func (this_ *ProgressCount) AddSuccess(size int64)

type SqlAndParam

// SqlAndParam pairs a SQL string with a list of parameter values. It is the
// element type of Data.SqlAndParams.
// NOTE(review): presumably Params are bound, in order, to the placeholders
// in Sql — confirm.
type SqlAndParam struct {
	Sql    string        `json:"sql"`
	Params []interface{} `json:"params"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL