Documentation ¶
Index ¶
- Constants
- func StartWrite(ctx context.Context, w BatchWriter, receiver plugin.RecordReceiver) (err error)
- type BaseBatchWriter
- func (b *BaseBatchWriter) BatchSize() int
- func (b *BaseBatchWriter) BatchTimeout() time.Duration
- func (b *BaseBatchWriter) BatchWrite(ctx context.Context, records []element.Record) (err error)
- func (b *BaseBatchWriter) JobID() int64
- func (b *BaseBatchWriter) TaskGroupID() int64
- func (b *BaseBatchWriter) TaskID() int64
- type BaseConfig
- func (b *BaseConfig) GetBaseTable() *database.BaseTable
- func (b *BaseConfig) GetBatchSize() int
- func (b *BaseConfig) GetBatchTimeout() time.Duration
- func (b *BaseConfig) GetColumns() (columns []rdbmreader.Column)
- func (b *BaseConfig) GetPassword() string
- func (b *BaseConfig) GetPostSQL() []string
- func (b *BaseConfig) GetPreSQL() []string
- func (b *BaseConfig) GetRetryStrategy(j schedule.RetryJudger) (schedule.RetryStrategy, error)
- func (b *BaseConfig) GetURL() string
- func (b *BaseConfig) GetUsername() string
- func (b *BaseConfig) GetWriteMode() string
- func (b *BaseConfig) IgnoreOneByOneError() bool
- type BaseDbHandler
- type BatchWriter
- type Config
- type DbHandler
- type Execer
- type Job
- func (j *Job) Destroy(ctx context.Context) (err error)
- func (j *Job) Init(ctx context.Context) (err error)
- func (j *Job) Post(ctx context.Context) (err error)
- func (j *Job) Prepare(ctx context.Context) (err error)
- func (j *Job) Split(ctx context.Context, number int) (confs []*config.JSON, err error)
- type Task
Constants ¶
View Source
const ( ExecModeNormal = "Normal" //无事务执行 ExecModeStmt = "Stmt" //prepare/exec无事务执行 ExecModeTx = "Tx" //事务执行 ExecModeStmtTx = "StmtTx" //prepare/exec事务执行 )
执行模式
Variables ¶
This section is empty.
Functions ¶
func StartWrite ¶
func StartWrite(ctx context.Context, w BatchWriter, receiver plugin.RecordReceiver) (err error)
StartWrite 通过批量写入器writer和记录接受器receiver将记录写入数据库
Types ¶
type BaseBatchWriter ¶
type BaseBatchWriter struct { Task *Task // contains filtered or unexported fields }
BaseBatchWriter 批量写入器
func NewBaseBatchWriter ¶
func NewBaseBatchWriter(task *Task, execMode string, opts *sql.TxOptions) *BaseBatchWriter
NewBaseBatchWriter 获取任务task,执行模式execMode,事务选项opts创建批量写入器
func (*BaseBatchWriter) BatchTimeout ¶
func (b *BaseBatchWriter) BatchTimeout() time.Duration
BatchTimeout 单批次插入超时时间
func (*BaseBatchWriter) BatchWrite ¶
BatchWrite 批次写入
func (*BaseBatchWriter) TaskGroupID ¶
func (b *BaseBatchWriter) TaskGroupID() int64
TaskGroupID 任务组编号
type BaseConfig ¶
type BaseConfig struct { Username string `json:"username"` //用户名 Password string `json:"password"` //密码 Column []string `json:"column"` //列信息 Connection rdbmreader.ConnConfig `json:"connection"` //连接信息 WriteMode string `json:"writeMode"` //写入模式,如插入insert BatchSize int `json:"batchSize"` //单次批量写入数 BatchTimeout time2.Duration `json:"batchTimeout"` //单次批量写入超时时间 PreSQL []string `json:"preSQL"` //准备的SQL语句 PostSQL []string `json:"postSQL"` //结束的SQL语句 // contains filtered or unexported fields }
BaseConfig 用于实现基本的关系数据库配置,如无特殊情况采用该配置,帮助快速实现writer
func NewBaseConfig ¶
func NewBaseConfig(conf *config.JSON) (c *BaseConfig, err error)
NewBaseConfig 从conf解析出关系数据库配置
func (*BaseConfig) GetBaseTable ¶
func (b *BaseConfig) GetBaseTable() *database.BaseTable
GetBaseTable 获取表信息
func (*BaseConfig) GetBatchTimeout ¶
func (b *BaseConfig) GetBatchTimeout() time.Duration
GetBatchTimeout 单次批量超时时间
func (*BaseConfig) GetColumns ¶
func (b *BaseConfig) GetColumns() (columns []rdbmreader.Column)
GetColumns 获取列信息
func (*BaseConfig) GetPostSQL ¶ added in v0.1.1
func (b *BaseConfig) GetPostSQL() []string
GetPostSQL 获取结束的SQL语句
func (*BaseConfig) GetPreSQL ¶ added in v0.1.1
func (b *BaseConfig) GetPreSQL() []string
GetPreSQL 获取准备的SQL语句
func (*BaseConfig) GetRetryStrategy ¶
func (b *BaseConfig) GetRetryStrategy(j schedule.RetryJudger) (schedule.RetryStrategy, error)
GetRetryStrategy 获取重试策略
func (*BaseConfig) IgnoreOneByOneError ¶
func (b *BaseConfig) IgnoreOneByOneError() bool
IgnoreOneByOneError 忽略一个个重试的错误
type BaseDbHandler ¶
type BaseDbHandler struct {
// contains filtered or unexported fields
}
BaseDbHandler 基础数据库执行句柄封装
func NewBaseDbHandler ¶
func NewBaseDbHandler(newExecer func(name string, conf *config.JSON) (Execer, error), opts *sql.TxOptions) *BaseDbHandler
NewBaseDbHandler 通过获取执行器函数newExecer和数据库事务执行选项opts创建数据库执行句柄封装
func (*BaseDbHandler) Config ¶
func (d *BaseDbHandler) Config(conf *config.JSON) (Config, error)
Config 通过配置获取关系型数据库配置
func (*BaseDbHandler) TableParam ¶
func (d *BaseDbHandler) TableParam(config Config, execer Execer) database.Parameter
TableParam 通过关系型数据库配置和执行器获取表参数
type BatchWriter ¶
type BatchWriter interface { JobID() int64 //工作编号 TaskGroupID() int64 //任务组编号 TaskID() int64 //任务编号 BatchSize() int //单次批量写入数 BatchTimeout() time.Duration //单次批量写入超时时间 BatchWrite(ctx context.Context, records []element.Record) error //批量写入 }
BatchWriter 批量写入器
type Config ¶
type Config interface { GetUsername() string //获取用户名 GetPassword() string //获取密码 GetURL() string //获取连接url GetColumns() []rdbmreader.Column //获取列信息 GetBaseTable() *database.BaseTable //获取表信息 GetWriteMode() string //获取写入模式 GetBatchSize() int //单次批量写入数 GetBatchTimeout() time.Duration //单次批量写入超时时间 GetRetryStrategy(j schedule.RetryJudger) (schedule.RetryStrategy, error) //获取重试策略 IgnoreOneByOneError() bool //忽略一个个重试的错误 GetPreSQL() []string //获取准备的SQL语句 GetPostSQL() []string //获取结束的SQL语句 }
Config 关系数据库写入器配置
type DbHandler ¶
type DbHandler interface { Execer(name string, conf *config.JSON) (Execer, error) //通过数据库名name和配置获取执行器 Config(conf *config.JSON) (Config, error) //通过配置获取关系型数据库配置 TableParam(config Config, execer Execer) database.Parameter //通过关系型数据库配置和执行器获取表参数 }
DbHandler 数据库执行句柄封装
type Execer ¶
type Execer interface { //通过基础表信息获取具体表 Table(*database.BaseTable) database.Table //检测连通性 PingContext(ctx context.Context) error //通过query查询语句进行查询 QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) //通过query查询语句进行查询 ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) //通过参数param获取具体表 FetchTableWithParam(ctx context.Context, param database.Parameter) (database.Table, error) //批量执行 BatchExec(ctx context.Context, opts *database.ParameterOptions) (err error) //prepare/exec批量执行 BatchExecStmt(ctx context.Context, opts *database.ParameterOptions) (err error) //事务批量执行 BatchExecWithTx(ctx context.Context, opts *database.ParameterOptions) (err error) //事务prepare/exec批量执行 BatchExecStmtWithTx(ctx context.Context, opts *database.ParameterOptions) (err error) //关闭 Close() error }
Execer 执行器
type Job ¶
type Job struct { *plugin.BaseJob Handler DbHandler //数据库句柄 Execer Execer //执行器 // contains filtered or unexported fields }
Job 工作
Click to show internal directories.
Click to hide internal directories.