Documentation ¶
Index ¶
- func StartRead(ctx context.Context, reader BatchReader, sender plugin.RecordSender) (err error)
- type BaseBatchReader
- func (b *BaseBatchReader) JobID() int64
- func (b *BaseBatchReader) Parameter() database.Parameter
- func (b *BaseBatchReader) Read(ctx context.Context, param database.Parameter, handler database.FetchHandler) (err error)
- func (b *BaseBatchReader) TaskGroupID() int64
- func (b *BaseBatchReader) TaskID() int64
- type BaseColumn
- type BaseConfig
- func (b *BaseConfig) GetBaseTable() *database.BaseTable
- func (b *BaseConfig) GetColumns() (columns []Column)
- func (b *BaseConfig) GetPassword() string
- func (b *BaseConfig) GetQuerySQL() []string
- func (b *BaseConfig) GetSplitConfig() SplitConfig
- func (b *BaseConfig) GetURL() string
- func (b *BaseConfig) GetUsername() string
- func (b *BaseConfig) GetWhere() string
- type BaseDbHandler
- func (d *BaseDbHandler) Config(conf *config.JSON) (Config, error)
- func (d *BaseDbHandler) MaxParam(config Config, table database.Table) database.Parameter
- func (d *BaseDbHandler) MinParam(config Config, table database.Table) database.Parameter
- func (d *BaseDbHandler) Querier(name string, conf *config.JSON) (Querier, error)
- func (d *BaseDbHandler) SplitParam(config Config, querier Querier) database.Parameter
- func (d *BaseDbHandler) TableParam(config Config, querier Querier) database.Parameter
- type BatchReader
- type Column
- type Config
- type ConnConfig
- type DbHandler
- type Job
- type MaxParam
- type MinParam
- type Querier
- type QueryParam
- type SplitConfig
- type SplitParam
- type SplitRange
- type TableConfig
- type TableParam
- type TableParamConfig
- type TableParamTable
- type Task
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func StartRead ¶
func StartRead(ctx context.Context, reader BatchReader, sender plugin.RecordSender) (err error)
StartRead 开始读
Types ¶
type BaseBatchReader ¶
type BaseBatchReader struct {
// contains filtered or unexported fields
}
BaseBatchReader 基础批量读入器
func NewBaseBatchReader ¶
func NewBaseBatchReader(task *Task, mode string, opts *sql.TxOptions) *BaseBatchReader
NewBaseBatchReader 通过任务task,查询模式mode和事务选项opts获取基础批量读入器
func (*BaseBatchReader) Parameter ¶
func (b *BaseBatchReader) Parameter() database.Parameter
Parameter 查询参数
func (*BaseBatchReader) Read ¶
func (b *BaseBatchReader) Read(ctx context.Context, param database.Parameter, handler database.FetchHandler) (err error)
通过上下文ctx,查询阐述和数据库句柄handler查询
func (*BaseBatchReader) TaskGroupID ¶
func (b *BaseBatchReader) TaskGroupID() int64
TaskGroupID 任务组编号
type BaseConfig ¶
type BaseConfig struct { Username string `json:"username"` //用户名 Password string `json:"password"` //密码 Column []string `json:"column"` //列信息 Connection ConnConfig `json:"connection"` //连接信息 Where string `json:"where"` //查询条件 Split SplitConfig `json:"split"` //切分键 QuerySQL []string `json:"querySql"` //查询sql }
BaseConfig 基础关系型数据读入器配置
func NewBaseConfig ¶
func NewBaseConfig(conf *config.JSON) (c *BaseConfig, err error)
NewBaseConfig 通过json配置conf获取基础关系型数据读入器配置
func (*BaseConfig) GetBaseTable ¶
func (b *BaseConfig) GetBaseTable() *database.BaseTable
GetBaseTable 获取表信息
func (*BaseConfig) GetColumns ¶
func (b *BaseConfig) GetColumns() (columns []Column)
GetColumns 获取列信息
func (*BaseConfig) GetQuerySQL ¶ added in v0.1.4
func (b *BaseConfig) GetQuerySQL() []string
GetQuerySQL 获取查询sql
func (*BaseConfig) GetSplitConfig ¶
func (b *BaseConfig) GetSplitConfig() SplitConfig
GetSplitConfig 获取切分配置
type BaseDbHandler ¶
type BaseDbHandler struct {
// contains filtered or unexported fields
}
BaseDbHandler 基础数据库句柄
func NewBaseDbHandler ¶
func NewBaseDbHandler(newQuerier func(name string, conf *config.JSON) (Querier, error), opts *sql.TxOptions) *BaseDbHandler
NewBaseDbHandler 通过获取查询器函数newQuerier和事务选项opts获取基础数据库句柄
func (*BaseDbHandler) Config ¶
func (d *BaseDbHandler) Config(conf *config.JSON) (Config, error)
Config 通过json配置conf获取关系型数据库输入配置
func (*BaseDbHandler) SplitParam ¶
func (d *BaseDbHandler) SplitParam(config Config, querier Querier) database.Parameter
SplitParam 通过关系型数据库输入配置config和表Table获取切分最小值参数
func (*BaseDbHandler) TableParam ¶
func (d *BaseDbHandler) TableParam(config Config, querier Querier) database.Parameter
TableParam 通过关系型数据库输入配置config和查询器querier获取表参数
type BatchReader ¶
type BatchReader interface { JobID() int64 //工作编号 TaskGroupID() int64 //任务组编号 TaskID() int64 //任务编号 Read(ctx context.Context, param database.Parameter, handler database.FetchHandler) (err error) //通过上下文ctx,查询阐述和数据库句柄handler查询· Parameter() database.Parameter //查询参数 }
BatchReader 批量读入器
type Config ¶
type Config interface { GetUsername() string //获取用户名 GetPassword() string //获取密码 GetURL() string //获取连接url GetColumns() []Column //获取列信息 GetBaseTable() *database.BaseTable //获取表信息 GetWhere() string //获取查询条件 GetSplitConfig() SplitConfig //获取切分配置 GetQuerySQL() []string //获取查询sql }
Config 关系型数据读入器配置
type ConnConfig ¶
type ConnConfig struct { URL string `json:"url"` //连接数据库 Table TableConfig `json:"table"` //表配置 }
ConnConfig 连接配置
type DbHandler ¶
type DbHandler interface { Querier(name string, conf *config.JSON) (Querier, error) //通过数据库名name和json配置conf获取查询器 Config(conf *config.JSON) (Config, error) //通过json配置conf获取关系型数据库输入配置 TableParam(config Config, querier Querier) database.Parameter //通过关系型数据库输入配置config和查询器querier获取表参数 SplitParam(config Config, querier Querier) database.Parameter //通过关系型数据库输入配置config和查询器querier获取切分表参数 MinParam(config Config, table database.Table) database.Parameter //通过关系型数据库输入配置config和表Table获取切分最小值参数 MaxParam(config Config, table database.Table) database.Parameter //通过关系型数据库输入配置config和表询器Table获取切分最大值参数 }
DbHandler 数据库句柄
type Job ¶
type Job struct { *plugin.BaseJob Querier Querier Config Config // contains filtered or unexported fields }
Job 工作
type MaxParam ¶
MaxParam 最大值参数
func NewMaxParam ¶
NewMaxParam 通过关系型数据库输入配置config,对应数据库表table和事务选项opts获取查询参数
type MinParam ¶
MinParam 最小值参数
func NewMinParam ¶
NewMinParam 通过关系型数据库输入配置config,对应数据库表table和事务选项opts获取最小值参数
type Querier ¶
type Querier interface { //通过基础表信息获取具体表 Table(*database.BaseTable) database.Table //检测连通性 PingContext(ctx context.Context) error //通过query查询语句进行查询 QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) //通过参数param获取具体表 FetchTableWithParam(ctx context.Context, param database.Parameter) (database.Table, error) //通过参数param,处理句柄handler获取记录 FetchRecord(ctx context.Context, param database.Parameter, handler database.FetchHandler) (err error) //通过参数param,处理句柄handler使用事务获取记录 FetchRecordWithTx(ctx context.Context, param database.Parameter, handler database.FetchHandler) (err error) //关闭资源 Close() error }
Querier 查询器
type QueryParam ¶
QueryParam 查询参数
func NewQueryParam ¶
NewQueryParam 通过关系型数据库输入配置config,对应数据库表table和事务选项opts获取查询参数
type SplitConfig ¶
type SplitConfig struct { Key string `json:"key"` //切分键 //day(日),min(分钟),s(秒),ms(毫秒),us(微秒),ns(纳秒) TimeAccuracy string `json:"timeAccuracy"` //切分时间精度(默认为day) Range SplitRange `json:"range"` //切分范围 }
SplitConfig 切分配置
type SplitParam ¶
SplitParam 切分参数
func NewSplitParam ¶
func NewSplitParam(config Config, table TableParamTable, opts *sql.TxOptions) *SplitParam
NewSplitParam 获取表参数配置config,通过表参数获取对应数据库的表table和事务选项opts获取切分表参数
type SplitRange ¶
type SplitRange struct { Type string `json:"type"` //类型 bigint, string, time Layout string `json:"layout"` //时间格式 Left string `json:"left"` //开始点 Right string `json:"right"` //结束点 // contains filtered or unexported fields }
SplitRange 切分范围配置
type TableConfig ¶
type TableConfig struct { Db string `json:"db"` //库 Schema string `json:"schema"` //模式 Name string `json:"name"` //表名 }
TableConfig 表配置
type TableParam ¶
type TableParam struct { *database.BaseParam Config TableParamConfig }
TableParam 表参数
func NewTableParam ¶
func NewTableParam(config TableParamConfig, table TableParamTable, opts *sql.TxOptions) *TableParam
NewTableParam 获取表参数配置config,通过表参数获取对应数据库的表table和事务选项opts获取表参数
type TableParamConfig ¶
type TableParamConfig interface { GetColumns() []Column //获取列信息 GetBaseTable() *database.BaseTable //获取表信息 }
TableParamConfig 表参数配置
type TableParamTable ¶
TableParamTable 通过表参数获取对应数据库的表