dbms

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 28, 2023 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ExecModeNormal = "Normal" //无事务执行
	ExecModeStmt   = "Stmt"   //prepare/exec无事务执行
	ExecModeTx     = "Tx"     //事务执行
	ExecModeStmtTx = "StmtTx" //prepare/exec事务执行
)

执行模式

Variables

This section is empty.

Functions

func StartWrite

func StartWrite(ctx context.Context, w BatchWriter,
	receiver plugin.RecordReceiver) (err error)

StartWrite 通过批量写入器writer和记录接受器receiver将记录写入数据库

Types

type BaseBatchWriter

type BaseBatchWriter struct {
	Task *Task
	// contains filtered or unexported fields
}

BaseBatchWriter 批量写入器

func NewBaseBatchWriter

func NewBaseBatchWriter(task *Task, execMode string, opts *sql.TxOptions) *BaseBatchWriter

NewBaseBatchWriter 获取任务task,执行模式execMode,事务选项opts创建批量写入器

func (*BaseBatchWriter) BatchSize

func (b *BaseBatchWriter) BatchSize() int

BatchSize 单批次插入数据

func (*BaseBatchWriter) BatchTimeout

func (b *BaseBatchWriter) BatchTimeout() time.Duration

BatchTimeout 单批次插入超时时间

func (*BaseBatchWriter) BatchWrite

func (b *BaseBatchWriter) BatchWrite(ctx context.Context, records []element.Record) (err error)

BatchWrite 批次写入

func (*BaseBatchWriter) JobID

func (b *BaseBatchWriter) JobID() int64

JobID 工作编号

func (*BaseBatchWriter) TaskGroupID

func (b *BaseBatchWriter) TaskGroupID() int64

TaskGroupID 任务组编号

func (*BaseBatchWriter) TaskID

func (b *BaseBatchWriter) TaskID() int64

TaskID 任务组任务编号

type BaseConfig

type BaseConfig struct {
	Username     string                `json:"username"`     //用户名
	Password     string                `json:"password"`     //密码
	Column       []string              `json:"column"`       //列信息
	Connection   dbmsreader.ConnConfig `json:"connection"`   //连接信息
	WriteMode    string                `json:"writeMode"`    //写入模式,如插入insert
	BatchSize    int                   `json:"batchSize"`    //单次批量写入数
	BatchTimeout time2.Duration        `json:"batchTimeout"` //单次批量写入超时时间
	PreSQL       []string              `json:"preSQL"`       //准备的SQL语句
	PostSQL      []string              `json:"postSQL"`      //结束的SQL语句
	// contains filtered or unexported fields
}

BaseConfig 用于实现基本的关系数据库配置,如无特殊情况采用该配置,帮助快速实现writer

func NewBaseConfig

func NewBaseConfig(conf *config.JSON) (c *BaseConfig, err error)

NewBaseConfig 从conf解析出关系数据库配置

func (*BaseConfig) GetBaseTable

func (b *BaseConfig) GetBaseTable() *database.BaseTable

GetBaseTable 获取表信息

func (*BaseConfig) GetBatchSize

func (b *BaseConfig) GetBatchSize() int

GetBatchSize 单次批量写入数

func (*BaseConfig) GetBatchTimeout

func (b *BaseConfig) GetBatchTimeout() time.Duration

GetBatchTimeout 单次批量超时时间

func (*BaseConfig) GetColumns

func (b *BaseConfig) GetColumns() (columns []dbmsreader.Column)

GetColumns 获取列信息

func (*BaseConfig) GetPassword

func (b *BaseConfig) GetPassword() string

GetPassword 获取密码

func (*BaseConfig) GetPostSQL

func (b *BaseConfig) GetPostSQL() []string

GetPostSQL 获取结束的SQL语句

func (*BaseConfig) GetPreSQL

func (b *BaseConfig) GetPreSQL() []string

GetPreSQL 获取准备的SQL语句

func (*BaseConfig) GetRetryStrategy

func (b *BaseConfig) GetRetryStrategy(j schedule.RetryJudger) (schedule.RetryStrategy,
	error)

GetRetryStrategy 获取重试策略

func (*BaseConfig) GetURL

func (b *BaseConfig) GetURL() string

GetURL 获取连接url

func (*BaseConfig) GetUsername

func (b *BaseConfig) GetUsername() string

GetUsername 获取用户名

func (*BaseConfig) GetWriteMode

func (b *BaseConfig) GetWriteMode() string

GetWriteMode 获取写入模式

func (*BaseConfig) IgnoreOneByOneError

func (b *BaseConfig) IgnoreOneByOneError() bool

IgnoreOneByOneError 忽略一个个重试的错误

type BaseDbHandler

type BaseDbHandler struct {
	// contains filtered or unexported fields
}

BaseDbHandler 基础数据库执行句柄封装

func NewBaseDbHandler

func NewBaseDbHandler(newExecer func(name string, conf *config.JSON) (Execer, error), opts *sql.TxOptions) *BaseDbHandler

NewBaseDbHandler 通过获取执行器函数newExecer和数据库事务执行选项opts创建数据库执行句柄封装

func (*BaseDbHandler) Config

func (d *BaseDbHandler) Config(conf *config.JSON) (Config, error)

Config 通过配置获取关系型数据库配置

func (*BaseDbHandler) Execer

func (d *BaseDbHandler) Execer(name string, conf *config.JSON) (Execer, error)

Execer 通过数据库名name和配置获取执行器

func (*BaseDbHandler) TableParam

func (d *BaseDbHandler) TableParam(config Config, execer Execer) database.Parameter

TableParam 通过关系型数据库配置和执行器获取表参数

type BatchWriter

type BatchWriter interface {
	JobID() int64                                                   //工作编号
	TaskGroupID() int64                                             //任务组编号
	TaskID() int64                                                  //任务编号
	BatchSize() int                                                 //单次批量写入数
	BatchTimeout() time.Duration                                    //单次批量写入超时时间
	BatchWrite(ctx context.Context, records []element.Record) error //批量写入
}

BatchWriter 批量写入器

type Config

type Config interface {
	GetUsername() string                                                     //获取用户名
	GetPassword() string                                                     //获取密码
	GetURL() string                                                          //获取连接url
	GetColumns() []dbmsreader.Column                                         //获取列信息
	GetBaseTable() *database.BaseTable                                       //获取表信息
	GetWriteMode() string                                                    //获取写入模式
	GetBatchSize() int                                                       //单次批量写入数
	GetBatchTimeout() time.Duration                                          //单次批量写入超时时间
	GetRetryStrategy(j schedule.RetryJudger) (schedule.RetryStrategy, error) //获取重试策略
	IgnoreOneByOneError() bool                                               //忽略一个个重试的错误
	GetPreSQL() []string                                                     //获取准备的SQL语句
	GetPostSQL() []string                                                    //获取结束的SQL语句
}

Config 关系数据库写入器配置

type DbHandler

type DbHandler interface {
	Execer(name string, conf *config.JSON) (Execer, error)      //通过数据库名name和配置获取执行器
	Config(conf *config.JSON) (Config, error)                   //通过配置获取关系型数据库配置
	TableParam(config Config, execer Execer) database.Parameter //通过关系型数据库配置和执行器获取表参数
}

DbHandler 数据库执行句柄封装

type Execer

type Execer interface {
	//通过基础表信息获取具体表
	Table(*database.BaseTable) database.Table
	//检测连通性
	PingContext(ctx context.Context) error
	//通过query查询语句进行查询
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	//通过query查询语句进行查询
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	//通过参数param获取具体表
	FetchTableWithParam(ctx context.Context, param database.Parameter) (database.Table, error)
	//批量执行
	BatchExec(ctx context.Context, opts *database.ParameterOptions) (err error)
	//prepare/exec批量执行
	BatchExecStmt(ctx context.Context, opts *database.ParameterOptions) (err error)
	//事务批量执行
	BatchExecWithTx(ctx context.Context, opts *database.ParameterOptions) (err error)
	//事务prepare/exec批量执行
	BatchExecStmtWithTx(ctx context.Context, opts *database.ParameterOptions) (err error)
	//关闭
	Close() error
}

Execer 执行器

type Job

type Job struct {
	*plugin.BaseJob

	Handler DbHandler //数据库句柄
	Execer  Execer    //执行器
	// contains filtered or unexported fields
}

Job 工作

func NewJob

func NewJob(handler DbHandler) *Job

NewJob 通过数据库句柄获取工作

func (*Job) Destroy

func (j *Job) Destroy(ctx context.Context) (err error)

Destroy 销毁

func (*Job) Init

func (j *Job) Init(ctx context.Context) (err error)

Init 初始化

func (*Job) Post

func (j *Job) Post(ctx context.Context) (err error)

Post 后置

func (*Job) Prepare

func (j *Job) Prepare(ctx context.Context) (err error)

Prepare 准备

func (*Job) Split

func (j *Job) Split(ctx context.Context, number int) (confs []*config.JSON, err error)

Split 切分任务

type Task

type Task struct {
	*writer.BaseTask

	Handler DbHandler
	Execer  Execer
	Config  Config
	Table   database.Table
}

Task 任务

func NewTask

func NewTask(handler DbHandler) *Task

NewTask 通过数据库句柄handler创建任务

func (*Task) Destroy

func (t *Task) Destroy(ctx context.Context) (err error)

Destroy 销毁

func (*Task) Init

func (t *Task) Init(ctx context.Context) (err error)

Init 初始化

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL