dbms

package
v0.1.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2023 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func StartRead

func StartRead(ctx context.Context, reader BatchReader, sender plugin.RecordSender) (err error)

StartRead 开始读

Types

type BaseBatchReader

type BaseBatchReader struct {
	// contains filtered or unexported fields
}

BaseBatchReader 基础批量读入器

func NewBaseBatchReader

func NewBaseBatchReader(task *Task, mode string, opts *sql.TxOptions) *BaseBatchReader

NewBaseBatchReader 通过任务task,查询模式mode和事务选项opts获取基础批量读入器

func (*BaseBatchReader) JobID

func (b *BaseBatchReader) JobID() int64

JobID 工作编号

func (*BaseBatchReader) Parameter

func (b *BaseBatchReader) Parameter() database.Parameter

Parameter 查询参数

func (*BaseBatchReader) Read

func (b *BaseBatchReader) Read(ctx context.Context, param database.Parameter, handler database.FetchHandler) (err error)

通过上下文ctx,查询阐述和数据库句柄handler查询

func (*BaseBatchReader) TaskGroupID

func (b *BaseBatchReader) TaskGroupID() int64

TaskGroupID 任务组编号

func (*BaseBatchReader) TaskID

func (b *BaseBatchReader) TaskID() int64

TaskID 任务编号

type BaseColumn

type BaseColumn struct {
	Name string
}

BaseColumn 基础列信息

func (*BaseColumn) GetName

func (b *BaseColumn) GetName() string

GetName 获取列名

type BaseConfig

type BaseConfig struct {
	Username   string      `json:"username"`   //用户名
	Password   string      `json:"password"`   //密码
	Column     []string    `json:"column"`     //列信息
	Connection ConnConfig  `json:"connection"` //连接信息
	Where      string      `json:"where"`      //查询条件
	Split      SplitConfig `json:"split"`      //切分键
	QuerySQL   []string    `json:"querySql"`   //查询sql
}

BaseConfig 基础关系型数据读入器配置

func NewBaseConfig

func NewBaseConfig(conf *config.JSON) (c *BaseConfig, err error)

NewBaseConfig 通过json配置conf获取基础关系型数据读入器配置

func (*BaseConfig) GetBaseTable

func (b *BaseConfig) GetBaseTable() *database.BaseTable

GetBaseTable 获取表信息

func (*BaseConfig) GetColumns

func (b *BaseConfig) GetColumns() (columns []Column)

GetColumns 获取列信息

func (*BaseConfig) GetPassword

func (b *BaseConfig) GetPassword() string

GetPassword 获取密码

func (*BaseConfig) GetQuerySQL added in v0.1.4

func (b *BaseConfig) GetQuerySQL() []string

GetQuerySQL 获取查询sql

func (*BaseConfig) GetSplitConfig

func (b *BaseConfig) GetSplitConfig() SplitConfig

GetSplitConfig 获取切分配置

func (*BaseConfig) GetURL

func (b *BaseConfig) GetURL() string

GetURL 获取关系型数据库连接url

func (*BaseConfig) GetUsername

func (b *BaseConfig) GetUsername() string

GetUsername 获取用户名

func (*BaseConfig) GetWhere

func (b *BaseConfig) GetWhere() string

GetWhere 获取查询条件

type BaseDbHandler

type BaseDbHandler struct {
	// contains filtered or unexported fields
}

BaseDbHandler 基础数据库句柄

func NewBaseDbHandler

func NewBaseDbHandler(newQuerier func(name string, conf *config.JSON) (Querier, error), opts *sql.TxOptions) *BaseDbHandler

NewBaseDbHandler 通过获取查询器函数newQuerier和事务选项opts获取基础数据库句柄

func (*BaseDbHandler) Config

func (d *BaseDbHandler) Config(conf *config.JSON) (Config, error)

Config 通过json配置conf获取关系型数据库输入配置

func (*BaseDbHandler) MaxParam

func (d *BaseDbHandler) MaxParam(config Config, table database.Table) database.Parameter

MaxParam 通过关系型数据库输入配置config和表询器Table获取切分最大值参数

func (*BaseDbHandler) MinParam

func (d *BaseDbHandler) MinParam(config Config, table database.Table) database.Parameter

MinParam 通过关系型数据库输入配置config和查询器querier获取切分表参数

func (*BaseDbHandler) Querier

func (d *BaseDbHandler) Querier(name string, conf *config.JSON) (Querier, error)

Querier 通过数据库名name和json配置conf获取查询器

func (*BaseDbHandler) SplitParam

func (d *BaseDbHandler) SplitParam(config Config, querier Querier) database.Parameter

SplitParam 通过关系型数据库输入配置config和表Table获取切分最小值参数

func (*BaseDbHandler) TableParam

func (d *BaseDbHandler) TableParam(config Config, querier Querier) database.Parameter

TableParam 通过关系型数据库输入配置config和查询器querier获取表参数

type BatchReader

type BatchReader interface {
	JobID() int64       //工作编号
	TaskGroupID() int64 //任务组编号
	TaskID() int64      //任务编号
	Read(ctx context.Context, param database.Parameter,
		handler database.FetchHandler) (err error) //通过上下文ctx,查询阐述和数据库句柄handler查询·
	Parameter() database.Parameter //查询参数
}

BatchReader 批量读入器

type Column

type Column interface {
	GetName() string //获取表名
}

Column 列信息

type Config

type Config interface {
	GetUsername() string               //获取用户名
	GetPassword() string               //获取密码
	GetURL() string                    //获取连接url
	GetColumns() []Column              //获取列信息
	GetBaseTable() *database.BaseTable //获取表信息
	GetWhere() string                  //获取查询条件
	GetSplitConfig() SplitConfig       //获取切分配置
	GetQuerySQL() []string             //获取查询sql
}

Config 关系型数据读入器配置

type ConnConfig

type ConnConfig struct {
	URL   string      `json:"url"`   //连接数据库
	Table TableConfig `json:"table"` //表配置
}

ConnConfig 连接配置

type DbHandler

type DbHandler interface {
	Querier(name string, conf *config.JSON) (Querier, error)         //通过数据库名name和json配置conf获取查询器
	Config(conf *config.JSON) (Config, error)                        //通过json配置conf获取关系型数据库输入配置
	TableParam(config Config, querier Querier) database.Parameter    //通过关系型数据库输入配置config和查询器querier获取表参数
	SplitParam(config Config, querier Querier) database.Parameter    //通过关系型数据库输入配置config和查询器querier获取切分表参数
	MinParam(config Config, table database.Table) database.Parameter //通过关系型数据库输入配置config和表Table获取切分最小值参数
	MaxParam(config Config, table database.Table) database.Parameter //通过关系型数据库输入配置config和表询器Table获取切分最大值参数
}

DbHandler 数据库句柄

type Job

type Job struct {
	*plugin.BaseJob

	Querier Querier
	Config  Config
	// contains filtered or unexported fields
}

Job 工作

func NewJob

func NewJob(handler DbHandler) *Job

NewJob 通过数据库句柄handler获取工作

func (*Job) Destroy

func (j *Job) Destroy(ctx context.Context) (err error)

Destroy 销毁

func (*Job) Init

func (j *Job) Init(ctx context.Context) (err error)

Init 初始化

func (*Job) Split

func (j *Job) Split(ctx context.Context, number int) (configs []*config.JSON, err error)

Split 切分

type MaxParam

type MaxParam struct {
	*database.BaseParam

	Config Config
}

MaxParam 最大值参数

func NewMaxParam

func NewMaxParam(config Config, table database.Table, opts *sql.TxOptions) *MaxParam

NewMaxParam 通过关系型数据库输入配置config,对应数据库表table和事务选项opts获取查询参数

func (*MaxParam) Agrs

func (m *MaxParam) Agrs(_ []element.Record) ([]interface{}, error)

Agrs 获取查询参数

func (*MaxParam) Query

func (m *MaxParam) Query(_ []element.Record) (string, error)

Query 获取查询语句

type MinParam

type MinParam struct {
	*database.BaseParam

	Config Config
}

MinParam 最小值参数

func NewMinParam

func NewMinParam(config Config, table database.Table, opts *sql.TxOptions) *MinParam

NewMinParam 通过关系型数据库输入配置config,对应数据库表table和事务选项opts获取最小值参数

func (*MinParam) Agrs

func (m *MinParam) Agrs(_ []element.Record) ([]interface{}, error)

Agrs 获取查询参数

func (*MinParam) Query

func (m *MinParam) Query(_ []element.Record) (string, error)

Query 获取查询语句

type Querier

type Querier interface {
	//通过基础表信息获取具体表
	Table(*database.BaseTable) database.Table
	//检测连通性
	PingContext(ctx context.Context) error
	//通过query查询语句进行查询
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	//通过参数param获取具体表
	FetchTableWithParam(ctx context.Context, param database.Parameter) (database.Table, error)
	//通过参数param,处理句柄handler获取记录
	FetchRecord(ctx context.Context, param database.Parameter, handler database.FetchHandler) (err error)
	//通过参数param,处理句柄handler使用事务获取记录
	FetchRecordWithTx(ctx context.Context, param database.Parameter, handler database.FetchHandler) (err error)
	//关闭资源
	Close() error
}

Querier 查询器

type QueryParam

type QueryParam struct {
	*database.BaseParam

	Config Config
}

QueryParam 查询参数

func NewQueryParam

func NewQueryParam(config Config, table database.Table, opts *sql.TxOptions) *QueryParam

NewQueryParam 通过关系型数据库输入配置config,对应数据库表table和事务选项opts获取查询参数

func (*QueryParam) Agrs

func (q *QueryParam) Agrs(_ []element.Record) (a []interface{}, err error)

Agrs 获取查询参数

func (*QueryParam) Query

func (q *QueryParam) Query(_ []element.Record) (string, error)

Query 获取查询语句

type SplitConfig

type SplitConfig struct {
	Key string `json:"key"` //切分键
	//day(日),min(分钟),s(秒),ms(毫秒),us(微秒),ns(纳秒)
	TimeAccuracy string     `json:"timeAccuracy"` //切分时间精度(默认为day)
	Range        SplitRange `json:"range"`        //切分范围
}

SplitConfig 切分配置

type SplitParam

type SplitParam struct {
	*database.BaseParam

	Config Config
}

SplitParam 切分参数

func NewSplitParam

func NewSplitParam(config Config, table TableParamTable, opts *sql.TxOptions) *SplitParam

NewSplitParam 获取表参数配置config,通过表参数获取对应数据库的表table和事务选项opts获取切分表参数

func (*SplitParam) Agrs

func (s *SplitParam) Agrs(_ []element.Record) ([]interface{}, error)

Agrs 获取查询参数

func (*SplitParam) Query

func (s *SplitParam) Query(_ []element.Record) (string, error)

Query 获取查询语句

type SplitRange

type SplitRange struct {
	Type   string `json:"type"`   //类型 bigint, string, time
	Layout string `json:"layout"` //时间格式
	Left   string `json:"left"`   //开始点
	Right  string `json:"right"`  //结束点
	// contains filtered or unexported fields
}

SplitRange 切分范围配置

type TableConfig

type TableConfig struct {
	Db     string `json:"db"`     //库
	Schema string `json:"schema"` //模式
	Name   string `json:"name"`   //表名
}

TableConfig 表配置

type TableParam

type TableParam struct {
	*database.BaseParam

	Config TableParamConfig
}

TableParam 表参数

func NewTableParam

func NewTableParam(config TableParamConfig, table TableParamTable, opts *sql.TxOptions) *TableParam

NewTableParam 获取表参数配置config,通过表参数获取对应数据库的表table和事务选项opts获取表参数

func (*TableParam) Agrs

func (t *TableParam) Agrs(_ []element.Record) ([]interface{}, error)

Agrs 获取查询参数

func (*TableParam) Query

func (t *TableParam) Query(_ []element.Record) (string, error)

Query 获取查询语句

type TableParamConfig

type TableParamConfig interface {
	GetColumns() []Column              //获取列信息
	GetBaseTable() *database.BaseTable //获取表信息
}

TableParamConfig 表参数配置

type TableParamTable

type TableParamTable interface {
	Table(*database.BaseTable) database.Table //通过表参数获取对应数据库的表
}

TableParamTable 通过表参数获取对应数据库的表

type Task

type Task struct {
	*plugin.BaseTask

	Querier Querier
	Config  Config
	Table   database.Table
	// contains filtered or unexported fields
}

Task 任务

func NewTask

func NewTask(handler DbHandler) *Task

NewTask 通过数据库句柄handler获取任务

func (*Task) Destroy

func (t *Task) Destroy(ctx context.Context) (err error)

Destroy 销毁

func (*Task) Init

func (t *Task) Init(ctx context.Context) (err error)

Init 初始化

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL