Documentation ¶
Overview ¶
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Variables
- func AddNewDB(Name string, ConnectUri string, binlogFileName string, binlogPostion uint32, ...) *db
- func AddTable(db, schema, tableName, IgnoreTable string, channelId int) error
- func AddTableToServer(db, schemaName, tableName string, ToServerInfo ToServer) error
- func Close()
- func CompareBinlogPositionAndReturnGreater(BinlogFileNum1 int, BinlogPosition1 uint32, BinlogFileNum2 int, ...) (int, uint32)
- func CompareBinlogPositionAndReturnLess(BinlogFileNum1 int, BinlogPosition1 uint32, BinlogFileNum2 int, ...) (int, uint32)
- func DelChannel(name string, channelID int) bool
- func DelDB(Name string) bool
- func DelTable(db, schema, tableName string) error
- func DoRecoveryByBackupData(fileContent string)
- func DoRecoverySnapshotData()
- func DoSaveSnapshotData()
- func GetDBObj(Name string) *db
- func GetFileQueue(dbName, SchemaName, tableName, ToServerID string) string
- func GetListDb() map[string]DbListStruct
- func GetSchemaAndTableBySplit(schemaAndTableName string) (schemaName, tableName string)
- func GetSchemaAndTableJoin(schema, tableName string) string
- func GetSnapshotData() ([]byte, error)
- func GetSnapshotData2() ([]byte, error)
- func InitStorage()
- func NewConsumeChannel(c *Channel) *consume_channel_obj
- func NewDb(Name string, ConnectUri string, binlogFileName string, binlogPostion uint32, ...) *db
- func NewDbByNull() *db
- func Recovery(content *json.RawMessage, isStop bool)
- func SaveDBConfigInfo()
- func SaveDBInfoToFileData() interface{}
- func StopAllChannel()
- func UpdateDB(Name string, ConnectUri string, binlogFileName string, binlogPostion uint32, ...) error
- func UpdateTable(db, schema, tableName, IgnoreTable string) error
- type Channel
- func (Channel *Channel) Close()
- func (Channel *Channel) GetChannel() chan mysql.EventReslut
- func (This *Channel) GetChannelMaxThreadNum() int
- func (Channel *Channel) GetCountChan() chan *count.FlowCount
- func (This *Channel) SetChannelMaxThreadNum(n int)
- func (Channel *Channel) SetFlowCountChan(flowChan chan *count.FlowCount)
- func (Channel *Channel) Start() chan mysql.EventReslut
- func (Channel *Channel) Stop()
- type DbListStruct
- type StatusFlag
- type Table
- type TmpPositioinStruct
- type ToServer
- func (This *ToServer) AddWaitError(WaitErr error, WaitData *pluginDriver.PluginDataType) bool
- func (This *ToServer) AppendToFileQueue(data *pluginDriver.PluginDataType) error
- func (This *ToServer) ConsumeToServer(db *db, SchemaName string, TableName string)
- func (This *ToServer) DealWaitError() bool
- func (This *ToServer) DelWaitError() bool
- func (This *ToServer) FileQueueStart() error
- func (This *ToServer) GetFileQueueInfo() (info filequeue.QueueInfo, err error)
- func (This *ToServer) GetWaitErrorDeal() int
- func (This *ToServer) InitFileQueue(dbName, SchemaName, tableName string) *ToServer
- func (This *ToServer) PopFileQueue() (*pluginDriver.PluginDataType, error)
- func (This *ToServer) ReadLastFromFileQueue() (*pluginDriver.PluginDataType, error)
- func (This *ToServer) SkipBinlog(MyConsumerId int, SkipErrData *pluginDriver.PluginDataType) (err error)
- func (This *ToServer) Start()
- func (This *ToServer) Stop()
- func (This *ToServer) UpdateBinlogPosition(BinlogFileNum int, BinlogPosition uint32) bool
- type ToServerChan
- type ToServerStatus
Constants ¶
This section is empty.
Variables ¶
var AllSchemaAndTablekey string = GetSchemaAndTableJoin("*", "*")
var DbList map[string]*db
var DbLock sync.Mutex
var TmpPositioin []*TmpPositioinStruct
Functions ¶
func AddTableToServer ¶
func DelChannel ¶
func DoRecoveryByBackupData ¶ added in v1.2.2
func DoRecoveryByBackupData(fileContent string)
func DoRecoverySnapshotData ¶ added in v1.2.2
func DoRecoverySnapshotData()
func DoSaveSnapshotData ¶ added in v1.2.2
func DoSaveSnapshotData()
func GetFileQueue ¶ added in v1.2.2
func GetListDb ¶
func GetListDb() map[string]DbListStruct
func GetSchemaAndTableJoin ¶
func GetSnapshotData ¶ added in v1.2.2
func InitStorage ¶
func InitStorage()
func NewConsumeChannel ¶
func NewConsumeChannel(c *Channel) *consume_channel_obj
func NewDbByNull ¶
func NewDbByNull() *db
func Recovery ¶
func Recovery(content *json.RawMessage, isStop bool)
func SaveDBConfigInfo ¶
func SaveDBConfigInfo()
func SaveDBInfoToFileData ¶
func SaveDBInfoToFileData() interface{}
func StopAllChannel ¶
func StopAllChannel()
func UpdateTable ¶
Types ¶
type Channel ¶
type Channel struct { sync.RWMutex Name string MaxThreadNum int // maximum number of threads for the consume channel CurrentThreadNum int Status StatusFlag //stop ,starting,running,wait // contains filtered or unexported fields }
func GetChannel ¶
func NewChannel ¶
func (*Channel) GetChannel ¶
func (Channel *Channel) GetChannel() chan mysql.EventReslut
func (*Channel) GetChannelMaxThreadNum ¶
func (*Channel) GetCountChan ¶
func (*Channel) SetChannelMaxThreadNum ¶
func (*Channel) SetFlowCountChan ¶
func (*Channel) Start ¶
func (Channel *Channel) Start() chan mysql.EventReslut
type DbListStruct ¶
type DbListStruct struct { Name string ConnectUri string ConnStatus StatusFlag //close,stop,starting,running ConnErr string ChannelCount int LastChannelID int TableCount int BinlogDumpFileName string BinlogDumpPosition uint32 BinlogDumpTimestamp uint32 MaxBinlogDumpFileName string MaxBinlogDumpPosition uint32 ReplicateDoDb map[string]uint8 ServerId uint32 AddTime int64 }
func GetDbInfo ¶ added in v1.2.2
func GetDbInfo(dbname string) *DbListStruct
type StatusFlag ¶
type StatusFlag string
const ( DEFAULT StatusFlag = "" STARTING StatusFlag = "starting" RUNNING StatusFlag = "running" STOPPING StatusFlag = "stopping" STOPPED StatusFlag = "stopped" CLOSING StatusFlag = "closing" CLOSED StatusFlag = "closed" KILLING StatusFlag = "killing" KILLED StatusFlag = "killed" DELING StatusFlag = "deling" DELED StatusFlag = "deled" )
type TmpPositioinStruct ¶
type ToServer ¶
type ToServer struct { sync.RWMutex Key *string `json:"-"` // key of the parent level ToServerID int PluginName string MustBeSuccess bool FilterQuery bool FilterUpdate bool FieldList []string ToServerKey string BinlogFileNum int BinlogPosition uint32 PluginParam map[string]interface{} Status StatusFlag ToServerChan *ToServerChan `json:"-"` Error string ErrorWaitDeal int ErrorWaitData *pluginDriver.PluginDataType LastBinlogFileNum int // last position submitted by the channel to ToServerChan LastBinlogPosition uint32 // if BinlogFileNum == LastBinlogFileNum && BinlogPosition == LastBinlogPosition, this position is fine LastBinlogKey []byte `json:"-"` // key used to save the data to level QueueMsgCount uint32 // number of messages backlogged in the queue FileQueueStatus bool // whether the file queue is enabled Notes string ThreadCount int16 // number of consumer threads FileQueueUsableCount uint32 // with the file queue enabled, the number of times the queue was full after a write to ToServerChan within the FileQueueUsableCountTimeDiff window FileQueueUsableCountStartTime int64 // time at which counting of FileQueueUsableCount started // contains filtered or unexported fields }
func (*ToServer) AddWaitError ¶
func (This *ToServer) AddWaitError(WaitErr error, WaitData *pluginDriver.PluginDataType) bool
func (*ToServer) AppendToFileQueue ¶ added in v1.2.2
func (This *ToServer) AppendToFileQueue(data *pluginDriver.PluginDataType) error
Flushes the data to the on-disk queue.
func (*ToServer) ConsumeToServer ¶
func (*ToServer) DealWaitError ¶
func (*ToServer) DelWaitError ¶
func (*ToServer) FileQueueStart ¶ added in v1.2.2
Enables the file queue.
func (*ToServer) GetFileQueueInfo ¶ added in v1.2.2
Returns basic information about the file queue.
func (*ToServer) GetWaitErrorDeal ¶
func (*ToServer) InitFileQueue ¶ added in v1.2.2
Initializes the file queue.
func (*ToServer) PopFileQueue ¶ added in v1.2.2
func (This *ToServer) PopFileQueue() (*pluginDriver.PluginDataType, error)
Takes the first entry out of the on-disk queue.
func (*ToServer) ReadLastFromFileQueue ¶ added in v1.2.2
func (This *ToServer) ReadLastFromFileQueue() (*pluginDriver.PluginDataType, error)
Takes the last entry out of the on-disk queue.
func (*ToServer) SkipBinlog ¶
func (This *ToServer) SkipBinlog(MyConsumerId int, SkipErrData *pluginDriver.PluginDataType) (err error)
Skips the binlog position.
type ToServerChan ¶
type ToServerChan struct {
To chan *pluginDriver.PluginDataType
}
type ToServerStatus ¶
type ToServerStatus string