Documentation ¶
Index ¶
- Constants
- Variables
- func FormatDate(timeStr string) time.Time
- func ParseSQLValueByType(typ DataType, str string) (interface{}, error)
- type CanalMysql
- func (mysql *CanalMysql) GetCacheMap() *ValueMap
- func (mysql *CanalMysql) GetCategory() string
- func (mysql *CanalMysql) GetExistsKeys() []int8
- func (mysql *CanalMysql) GetOp() string
- func (mysql *CanalMysql) GetValues() []interface{}
- func (mysql *CanalMysql) ParseToMap(table *SQLTable) (ValueMap, error)
- func (mysql *CanalMysql) SetCacheMap(m *ValueMap)
- func (mysql *CanalMysql) SetExistsKeys(val []int8)
- func (mysql *CanalMysql) SetOp(s string)
- func (mysql *CanalMysql) SetValues(val []interface{})
- func (mysql *CanalMysql) UnmarshalFromByte(b []byte, pool *MapPool) error
- func (mysql *CanalMysql) UnmarshalFromStr(str string, pool *MapPool) error
- func (mysql *CanalMysql) Unpack() []DataInterface
- type ConnectorMongo
- func (mongo *ConnectorMongo) GetCacheMap() *ValueMap
- func (mongo *ConnectorMongo) GetCategory() string
- func (mongo *ConnectorMongo) GetExistsKeys() []int8
- func (mongo *ConnectorMongo) GetOp() string
- func (mongo *ConnectorMongo) GetValues() []interface{}
- func (mongo *ConnectorMongo) ParseToMap(table *SQLTable) (ValueMap, error)
- func (mongo *ConnectorMongo) SetCacheMap(m *ValueMap)
- func (mongo *ConnectorMongo) SetExistsKeys(val []int8)
- func (mongo *ConnectorMongo) SetOp(s string)
- func (mongo *ConnectorMongo) SetValues(val []interface{})
- func (mongo *ConnectorMongo) UnmarshalFromByte(b []byte, mp *MapPool) error
- func (mongo *ConnectorMongo) UnmarshalFromStr(str string, mp *MapPool) error
- func (mongo *ConnectorMongo) Unpack() []DataInterface
- type Data
- type DataInterface
- type DataType
- type DebeziumMongo
- func (mongo *DebeziumMongo) GetCacheMap() *ValueMap
- func (mongo *DebeziumMongo) GetCategory() string
- func (mongo *DebeziumMongo) GetExistsKeys() []int8
- func (mongo *DebeziumMongo) GetOp() string
- func (mongo *DebeziumMongo) GetValues() []interface{}
- func (mongo *DebeziumMongo) ParseToMap(table *SQLTable) (ValueMap, error)
- func (mongo *DebeziumMongo) SetCacheMap(m *ValueMap)
- func (mongo *DebeziumMongo) SetExistsKeys(val []int8)
- func (mongo *DebeziumMongo) SetOp(s string)
- func (mongo *DebeziumMongo) SetValues(val []interface{})
- func (mongo *DebeziumMongo) UnmarshalFromByte(b []byte, mp *MapPool) error
- func (mongo *DebeziumMongo) UnmarshalFromStr(str string, mp *MapPool) error
- func (mongo *DebeziumMongo) Unpack() []DataInterface
- type DebeziumMySQL
- func (mysql *DebeziumMySQL) GetCacheMap() *ValueMap
- func (mysql *DebeziumMySQL) GetCategory() string
- func (mysql *DebeziumMySQL) GetExistsKeys() []int8
- func (mysql *DebeziumMySQL) GetOp() string
- func (mysql *DebeziumMySQL) GetValues() []interface{}
- func (mysql *DebeziumMySQL) ParseToMap(table *SQLTable) (ValueMap, error)
- func (mysql *DebeziumMySQL) SetCacheMap(m *ValueMap)
- func (mysql *DebeziumMySQL) SetExistsKeys(val []int8)
- func (mysql *DebeziumMySQL) SetOp(s string)
- func (mysql *DebeziumMySQL) SetValues(val []interface{})
- func (mysql *DebeziumMySQL) UnmarshalFromByte(b []byte, mp *MapPool) error
- func (mysql *DebeziumMySQL) UnmarshalFromStr(str string, mp *MapPool) error
- func (mysql *DebeziumMySQL) Unpack() []DataInterface
- type MapPool
- type SQLTable
- type ValueMap
Constants ¶
const ( DBZUMMYSQL = "debezium-mysql" DBZUMMONGO = "debezium-mongodb" CANALMYSQL = "canal-mysql" CONNMONGO = "connector-mongodb" )
const ( DataTypeString = iota DataTypeInt DataTypeFloat DataTypeTime )
Supported data types.
const ( SQLDateCanalTimeLayout = "2006-01-02 15:04:05" MySQLTimeLayout = "2006-01-02T15:04:05Z" DateTimeLayout = "2006-01-02" )
时间布局常量(Go 参考时间格式): SQLDateCanalTimeLayout、MySQLTimeLayout、DateTimeLayout
Variables ¶
var ( // ErrAction 错误的类型信息 ErrAction = errors.New("not except action") // ErrEmptyPayload ErrEmptyPayload = errors.New("payload is empty") )
var NullValMap = map[DataType]interface{}{ DataTypeString: "", DataTypeInt: 0, DataTypeFloat: 0.0, DataTypeTime: time.Unix(0, 0), }
NullValMap 记录该类型的默认值
Functions ¶
func FormatDate ¶
func ParseSQLValueByType ¶
ParseSQLValueByType 按给定的 DataType 将字符串解析为对应类型的值
Types ¶
type CanalMysql ¶
type CanalMysql struct { Before []ValueMap `json:"old"` Cache []originJson.RawMessage `json:"data"` After []ValueMap Op string `json:"type"` // contains filtered or unexported fields }
CanalMysql 新的类型
func (*CanalMysql) GetCacheMap ¶
func (mysql *CanalMysql) GetCacheMap() *ValueMap
GetCacheMap 获取解析好的map数据
func (*CanalMysql) GetCategory ¶
func (mysql *CanalMysql) GetCategory() string
GetCategory 获取类型,是mysql还是mongo
func (*CanalMysql) GetExistsKeys ¶
func (mysql *CanalMysql) GetExistsKeys() []int8
GetExistsKeys 返回用于标记该字段是否存在,主要用于Mongo更新, 0 不存在,1 存在
func (*CanalMysql) GetValues ¶
func (mysql *CanalMysql) GetValues() []interface{}
GetValues 用于获取 准备好插入db的[]interface{}
func (*CanalMysql) ParseToMap ¶
func (mysql *CanalMysql) ParseToMap(table *SQLTable) (ValueMap, error)
ParseToMap 将json解析成map
func (*CanalMysql) SetCacheMap ¶
func (mysql *CanalMysql) SetCacheMap(m *ValueMap)
SetCacheMap 将json解析成map并放进来
func (*CanalMysql) SetExistsKeys ¶
func (mysql *CanalMysql) SetExistsKeys(val []int8)
SetExistsKeys 设置用于标记该字段是否存在的标记,主要用于Mongo更新
func (*CanalMysql) SetValues ¶
func (mysql *CanalMysql) SetValues(val []interface{})
SetValues 存放将插入db的 []interface{}
func (*CanalMysql) UnmarshalFromByte ¶
func (mysql *CanalMysql) UnmarshalFromByte(b []byte, pool *MapPool) error
UnmarshalFromByte 从字节切片反序列化
func (*CanalMysql) UnmarshalFromStr ¶
func (mysql *CanalMysql) UnmarshalFromStr(str string, pool *MapPool) error
UnmarshalFromStr 从字符串反序列化
func (*CanalMysql) Unpack ¶
func (mysql *CanalMysql) Unpack() []DataInterface
Unpack 展开,防止 canal 用 batch 方式提交
type ConnectorMongo ¶
type ConnectorMongo struct { Payload ValueMap `json:"fullDocument"` DocumentKey ValueMap `json:"documentKey"` UpdateDescription struct { UpdatedFields ValueMap `json:"updatedFields"` RemovedFields []string `json:"removedFields"` } `json:"updateDescription"` OperationType string `json:"operationType"` FullDocument ValueMap // contains filtered or unexported fields }
ConnectorMongo mongo type of connector
func (*ConnectorMongo) GetCacheMap ¶
func (mongo *ConnectorMongo) GetCacheMap() *ValueMap
GetCacheMap 获取解析好的map数据
func (*ConnectorMongo) GetCategory ¶
func (mongo *ConnectorMongo) GetCategory() string
GetCategory 获取类型,是mysql还是mongo
func (*ConnectorMongo) GetExistsKeys ¶
func (mongo *ConnectorMongo) GetExistsKeys() []int8
GetExistsKeys 返回用于标记该字段是否存在,主要用于Mongo更新, 0 不存在,1 存在
func (*ConnectorMongo) GetValues ¶
func (mongo *ConnectorMongo) GetValues() []interface{}
GetValues 用于获取 准备好插入db的[]interface{}
func (*ConnectorMongo) ParseToMap ¶
func (mongo *ConnectorMongo) ParseToMap(table *SQLTable) (ValueMap, error)
ParseToMap 将json解析成map
func (*ConnectorMongo) SetCacheMap ¶
func (mongo *ConnectorMongo) SetCacheMap(m *ValueMap)
SetCacheMap 将json解析成map并放进来
func (*ConnectorMongo) SetExistsKeys ¶
func (mongo *ConnectorMongo) SetExistsKeys(val []int8)
SetExistsKeys 设置用于标记该字段是否存在的标记,主要用于Mongo更新
func (*ConnectorMongo) SetOp ¶
func (mongo *ConnectorMongo) SetOp(s string)
SetOp 写入opLog/binLog 的类型
func (*ConnectorMongo) SetValues ¶
func (mongo *ConnectorMongo) SetValues(val []interface{})
SetValues 存放将插入db的 []interface{}
func (*ConnectorMongo) UnmarshalFromByte ¶
func (mongo *ConnectorMongo) UnmarshalFromByte(b []byte, mp *MapPool) error
UnmarshalFromByte 从字节切片反序列化
func (*ConnectorMongo) UnmarshalFromStr ¶
func (mongo *ConnectorMongo) UnmarshalFromStr(str string, mp *MapPool) error
UnmarshalFromStr 从字符串反序列化
type Data ¶
type Data struct { Keys []string Values []interface{} // 不用map是因为数据量少的情况下,slice性能更高 CheckPoint, Operation string }
Data 关系型数据的一条记录
type DataInterface ¶
type DataInterface interface { GetCategory() string GetOp() string SetOp(s string) SetValues(val []interface{}) GetValues() []interface{} ParseToMap(table *SQLTable) (ValueMap, error) SetExistsKeys([]int8) GetExistsKeys() []int8 SetCacheMap(m *ValueMap) GetCacheMap() *ValueMap UnmarshalFromStr(str string, mappool *MapPool) error UnmarshalFromByte(b []byte, mappool *MapPool) error Unpack() []DataInterface }
DataInterface mysql 和 mongo 的封装
type DataType ¶
type DataType int
DataType 用于Go内部转换的数据类型
func ParseTypeByCkType ¶
ParseTypeByCkType 解析类型到ch的类型
func ParseTypeByMysqlType ¶
ParseTypeByMysqlType 将MySQL的数据类型转换为Go语言内部转换用的DataType
type DebeziumMongo ¶
type DebeziumMongo struct { Payload *debeziumMongoPayload `json:"payload"` // contains filtered or unexported fields }
DebeziumMongo 类型
func (*DebeziumMongo) GetCacheMap ¶
func (mongo *DebeziumMongo) GetCacheMap() *ValueMap
GetCacheMap 获取解析好的map数据
func (*DebeziumMongo) GetCategory ¶
func (mongo *DebeziumMongo) GetCategory() string
GetCategory 获取类型,是mysql还是mongo
func (*DebeziumMongo) GetExistsKeys ¶
func (mongo *DebeziumMongo) GetExistsKeys() []int8
GetExistsKeys 返回用于标记该字段是否存在,主要用于Mongo更新, 0 不存在,1 存在
func (*DebeziumMongo) GetOp ¶
func (mongo *DebeziumMongo) GetOp() string
GetOp 获取 opLog/binLog 的操作类型
func (*DebeziumMongo) GetValues ¶
func (mongo *DebeziumMongo) GetValues() []interface{}
GetValues 用于获取 准备好插入db的[]interface{}
func (*DebeziumMongo) ParseToMap ¶
func (mongo *DebeziumMongo) ParseToMap(table *SQLTable) (ValueMap, error)
ParseToMap 将json解析成map
func (*DebeziumMongo) SetCacheMap ¶
func (mongo *DebeziumMongo) SetCacheMap(m *ValueMap)
SetCacheMap 将json解析成map并放进来
func (*DebeziumMongo) SetExistsKeys ¶
func (mongo *DebeziumMongo) SetExistsKeys(val []int8)
SetExistsKeys 设置用于标记该字段是否存在的标记,主要用于Mongo更新
func (*DebeziumMongo) SetValues ¶
func (mongo *DebeziumMongo) SetValues(val []interface{})
SetValues 存放将插入db的 []interface{}
func (*DebeziumMongo) UnmarshalFromByte ¶
func (mongo *DebeziumMongo) UnmarshalFromByte(b []byte, mp *MapPool) error
UnmarshalFromByte 从字节切片反序列化
func (*DebeziumMongo) UnmarshalFromStr ¶
func (mongo *DebeziumMongo) UnmarshalFromStr(str string, mp *MapPool) error
UnmarshalFromStr 从字符串反序列化
type DebeziumMySQL ¶
type DebeziumMySQL struct { Payload *debeziumMysqlPayload `json:"payload"` // contains filtered or unexported fields }
DebeziumMySQL 类型
func (*DebeziumMySQL) GetCacheMap ¶
func (mysql *DebeziumMySQL) GetCacheMap() *ValueMap
GetCacheMap 获取解析好的map数据
func (*DebeziumMySQL) GetCategory ¶
func (mysql *DebeziumMySQL) GetCategory() string
GetCategory 获取类型,是mysql还是mongo
func (*DebeziumMySQL) GetExistsKeys ¶
func (mysql *DebeziumMySQL) GetExistsKeys() []int8
GetExistsKeys 返回用于标记该字段是否存在,主要用于Mongo更新, 0 不存在,1 存在
func (*DebeziumMySQL) GetOp ¶
func (mysql *DebeziumMySQL) GetOp() string
GetOp 获取 opLog/binLog 的操作类型
func (*DebeziumMySQL) GetValues ¶
func (mysql *DebeziumMySQL) GetValues() []interface{}
GetValues 用于获取 准备好插入db的[]interface{}
func (*DebeziumMySQL) ParseToMap ¶
func (mysql *DebeziumMySQL) ParseToMap(table *SQLTable) (ValueMap, error)
ParseToMap 将json解析成map
func (*DebeziumMySQL) SetCacheMap ¶
func (mysql *DebeziumMySQL) SetCacheMap(m *ValueMap)
SetCacheMap 将json解析成map并放进来
func (*DebeziumMySQL) SetExistsKeys ¶
func (mysql *DebeziumMySQL) SetExistsKeys(val []int8)
SetExistsKeys 设置用于标记该字段是否存在的标记,主要用于Mongo更新
func (*DebeziumMySQL) SetValues ¶
func (mysql *DebeziumMySQL) SetValues(val []interface{})
SetValues 存放将插入db的 []interface{}
func (*DebeziumMySQL) UnmarshalFromByte ¶
func (mysql *DebeziumMySQL) UnmarshalFromByte(b []byte, mp *MapPool) error
UnmarshalFromByte 从字节切片反序列化
func (*DebeziumMySQL) UnmarshalFromStr ¶
func (mysql *DebeziumMySQL) UnmarshalFromStr(str string, mp *MapPool) error
UnmarshalFromStr 从字符串反序列化
type SQLTable ¶
type SQLTable struct { DbName string Table string Types map[string]DataType // 列名和类型的映射 PrimaryKey string Columns []string // 列信息(有序) ColumnsDefaultValue []interface{} // 用作nil填充 PrimaryKeyIndex int InsertSQL string QuerySQL string QueryNode *sql.DB }
SQLTable 记录 clickhouse table 各种信息