server

package
v1.1.0-beta.18 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 16, 2019 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Copyright [2018] [jc3wish]

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright [2018] [jc3wish]

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright [2018] [jc3wish]

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright [2018] [jc3wish]

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright [2018] [jc3wish]

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright [2018] [jc3wish]

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

This section is empty.

Variables

View Source
// DbList is a package-level map from a string key to a *db.
// NOTE(review): presumably keyed by the Name passed to AddNewDB / GetDBObj — confirm.
var DbList map[string]*db
View Source
// DbLock is a package-level mutex.
// NOTE(review): presumably guards access to DbList — confirm against the implementation.
var DbLock sync.Mutex
View Source
// TmpPositioin is a package-level slice of *TmpPositioinStruct.
// The spelling "Positioin" is part of the exported name and is kept as-is.
var TmpPositioin []*TmpPositioinStruct

Functions

func AddNewDB

func AddNewDB(Name string, ConnectUri string, binlogFileName string, binlogPostion uint32, serverId uint32, maxFileName string, maxPosition uint32, AddTime int64) *db

func AddTable

func AddTable(db string, schema string, tableName string, channelId int) error

func AddTableToServer

func AddTableToServer(db string, schemaName string, tableName string, ToServerInfo ToServer) error

func CompareBinlogPositionAndReturnGreater

func CompareBinlogPositionAndReturnGreater(BinlogFileNum1 int, BinlogPosition1 uint32, BinlogFileNum2 int, BinlogPosition2 uint32) (int, uint32)

func CompareBinlogPositionAndReturnLess

func CompareBinlogPositionAndReturnLess(BinlogFileNum1 int, BinlogPosition1 uint32, BinlogFileNum2 int, BinlogPosition2 uint32) (int, uint32)

func DelChannel

func DelChannel(name string, channelID int) bool

func DelDB

func DelDB(Name string) bool

func DelTable

func DelTable(db string, schema string, tableName string) error

func GetDBObj

func GetDBObj(Name string) *db

func GetListDb

func GetListDb() map[string]DbListStruct

func GetSchemaAndTableBySplit

func GetSchemaAndTableBySplit(schemaAndTableName string) (schemaName, tableName string)

func GetSchemaAndTableJoin

func GetSchemaAndTableJoin(schema, tableName string) string

func InitStorage

func InitStorage()

func InitStrageChan

func InitStrageChan(ch chan int8)

func NewConsumeChannel

func NewConsumeChannel(c *Channel) *consume_channel_obj

func NewDb

func NewDb(Name string, ConnectUri string, binlogFileName string, binlogPostion uint32, serverId uint32, maxFileName string, maxPosition uint32, AddTime int64) *db

func Recovery

func Recovery(content *json.RawMessage)

func SaveDBConfigInfo

func SaveDBConfigInfo()

func SaveDBInfoToFileData

func SaveDBInfoToFileData() interface{}

func StopAllChannel

func StopAllChannel()

func UpdateDB

func UpdateDB(Name string, ConnectUri string, binlogFileName string, binlogPostion uint32, serverId uint32, maxFileName string, maxPosition uint32, UpdateTime int64, updateToServer int8) error

Types

type Channel

// Channel is a named consume channel with a configurable maximum thread count.
// It embeds sync.Mutex, so a Channel should be handled through a pointer
// (all of its listed methods use a *Channel receiver).
type Channel struct {
	sync.Mutex
	Name string

	MaxThreadNum     int    // maximum number of threads for the consume channel
	CurrentThreadNum int    // presumably the number of threads currently running — confirm
	Status           string // one of: stop, starting, running, wait
	// contains filtered or unexported fields
}

func GetChannel

func GetChannel(name string, channelID int) *Channel

func NewChannel

func NewChannel(MaxThreadNum int, Name string, db *db) *Channel

func (*Channel) Close

func (Channel *Channel) Close()

func (*Channel) GetChannel

func (Channel *Channel) GetChannel() chan mysql.EventReslut

func (*Channel) GetChannelMaxThreadNum

func (This *Channel) GetChannelMaxThreadNum() int

func (*Channel) GetCountChan

func (Channel *Channel) GetCountChan() chan *count.FlowCount

func (*Channel) SetChannelMaxThreadNum

func (This *Channel) SetChannelMaxThreadNum(n int)

func (*Channel) SetFlowCountChan

func (Channel *Channel) SetFlowCountChan(flowChan chan *count.FlowCount)

func (*Channel) Start

func (Channel *Channel) Start() chan mysql.EventReslut

func (*Channel) Stop

func (Channel *Channel) Stop()

type DbListStruct

// DbListStruct is a flat summary of one database entry. It is the value type
// of the map returned by GetListDb and the type GetDbInfo returns a pointer to.
type DbListStruct struct {
	Name                  string
	ConnectUri            string
	ConnStatus            string // one of: close, stop, starting, running
	ConnErr               string // presumably the text of the last connection error — confirm
	ChannelCount          int
	LastChannelID         int
	TableCount            int
	BinlogDumpFileName    string
	BinlogDumpPosition    uint32
	BinlogDumpTimestamp   uint32
	MaxBinlogDumpFileName string
	MaxBinlogDumpPosition uint32
	ReplicateDoDb         map[string]uint8 // presumably keyed by schema name — confirm
	ServerId              uint32
	AddTime               int64 // presumably a Unix timestamp — confirm units against the caller
}

func GetDbInfo added in v1.2.2

func GetDbInfo(dbname string) *DbListStruct

type Table

// Table holds a table name, the key of its channel, and the list of ToServer
// entries attached to it. It embeds sync.Mutex, so it should be handled
// through a pointer rather than copied.
type Table struct {
	sync.Mutex
	Name           string
	ChannelKey     int
	LastToServerID int // presumably the most recently assigned ToServerID — confirm
	ToServerList   []*ToServer
}

type TmpPositioinStruct

// TmpPositioinStruct pairs an embedded sync.RWMutex with a map from string
// keys to positionStruct values. The package variable TmpPositioin is a slice
// of pointers to this type.
type TmpPositioinStruct struct {
	sync.RWMutex
	Data map[string]positionStruct
}

type ToServer

// ToServer holds the configuration and state of one entry in a Table's
// ToServerList: the plugin name and parameters, the binlog position, the
// current status, and pending error information.
//
// Fields tagged `json:"-"` are skipped by encoding/json.
//
// NOTE(review): ToServer embeds sync.RWMutex, yet AddTableToServer takes a
// ToServer by value, which copies the lock; confirm this is intentional.
type ToServer struct {
	sync.RWMutex
	ToServerID         int
	PluginName         string
	MustBeSuccess      bool
	FilterQuery        bool
	FilterUpdate       bool
	FieldList          []string
	ToServerKey        string
	BinlogFileNum      int
	BinlogPosition     uint32
	PluginParam        map[string]interface{}
	Status             string
	ToServerChan       *ToServerChan `json:"-"`
	Error              string
	ErrorWaitDeal      int
	ErrorWaitData      interface{}
	PluginConn         driver.ConnFun `json:"-"`
	PluginConnKey      string         `json:"-"`
	PluginParamObj     interface{}    `json:"-"`
	LastBinlogFileNum  int            // last position submitted by the channel to ToServerChan
	LastBinlogPosition uint32         // if BinlogFileNum == LastBinlogFileNum && BinlogPosition == LastBinlogPosition, this position is considered to have no problem
	LastBinlogKey      []byte         `json:"-"` // key under which the data is saved to "level" (presumably LevelDB — confirm)
	QueueMsgCount      uint32         // number of messages accumulated in the queue
}

func (*ToServer) AddWaitError

func (This *ToServer) AddWaitError(WaitErr error, WaitData interface{}) bool

func (*ToServer) ConsumeToServer

func (This *ToServer) ConsumeToServer(db *db, SchemaName string, TableName string)

func (*ToServer) DealWaitError

func (This *ToServer) DealWaitError() bool

func (*ToServer) DelWaitError

func (This *ToServer) DelWaitError() bool

func (*ToServer) GetWaitErrorDeal

func (This *ToServer) GetWaitErrorDeal() int

func (*ToServer) UpdateBinlogPosition

func (This *ToServer) UpdateBinlogPosition(BinlogFileNum int, BinlogPosition uint32) bool

type ToServerChan

// ToServerChan wraps a single channel of *pluginDriver.PluginDataType values.
// ToServer holds a pointer to one in its ToServerChan field.
type ToServerChan struct {
	To chan *pluginDriver.PluginDataType
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL