client

package
v2.1.8+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2019 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultEtcdTimeout is the default timeout config for etcd.
	DefaultEtcdTimeout = 5 * time.Second

	// DefaultRetryTime is the default retry time for each pump.
	DefaultRetryTime = 10

	// DefaultBinlogWriteTimeout is the default max time binlog can use to write to pump.
	DefaultBinlogWriteTimeout = 15 * time.Second

	// CheckInterval is the default interval for check unavaliable pumps.
	CheckInterval = 30 * time.Second
)
View Source
const (
	// Range means range strategy.
	Range = "range"

	// Hash means hash strategy.
	Hash = "hash"

	// Score means choose pump by it's score.
	Score = "score"

	// LocalUnix means will only use the local pump by unix socket.
	LocalUnix = "local unix"
)

Variables

View Source
var (
	// Logger is ..., obsolete now.
	Logger = log.New()

	// ErrNoAvaliablePump means no avaliable pump to write binlog.
	ErrNoAvaliablePump = errors.New("no avaliable pump to write binlog")

	// CommitBinlogTimeout is the max retry duration time for write commit/rollback binlog.
	CommitBinlogTimeout = 10 * time.Minute

	// RetryInterval is the interval of retrying to write binlog.
	RetryInterval = 100 * time.Millisecond
)

Functions

func InitLogger

func InitLogger(cfg *logutil.LogConfig) error

InitLogger initializes logger.

Types

type HashSelector

type HashSelector struct {
	// PumpMap saves the map of pump's node id with pump.
	PumpMap map[string]*PumpStatus

	// the pumps to be selected.
	Pumps []*PumpStatus
}

HashSelector select a pump by hash.

func (*HashSelector) Feedback

func (h *HashSelector) Feedback(startTS int64, binlogType pb.BinlogType, pump *PumpStatus)

Feedback implement PumpSelector.Feedback

func (*HashSelector) Select

func (h *HashSelector) Select(binlog *pb.Binlog, retryTime int) *PumpStatus

Select implement PumpSelector.Select.

func (*HashSelector) SetPumps

func (h *HashSelector) SetPumps(pumps []*PumpStatus)

SetPumps implement PumpSelector.SetPumps.

type LocalUnixSelector

type LocalUnixSelector struct {
	// the pump to be selected.
	Pump *PumpStatus
}

LocalUnixSelector will always select the local pump, used for compatible with kafka version tidb-binlog.

func (*LocalUnixSelector) Feedback

func (u *LocalUnixSelector) Feedback(startTS int64, binlogType pb.BinlogType, pump *PumpStatus)

Feedback implement PumpSelector.Feedback

func (*LocalUnixSelector) Select

func (u *LocalUnixSelector) Select(binlog *pb.Binlog, retryTime int) *PumpStatus

Select implement PumpSelector.Select.

func (*LocalUnixSelector) SetPumps

func (u *LocalUnixSelector) SetPumps(pumps []*PumpStatus)

SetPumps implement PumpSelector.SetPumps.

type PumpInfos

type PumpInfos struct {
	// Pumps saves the map of pump's nodeID and pump status.
	Pumps map[string]*PumpStatus

	// AvliablePumps saves the whole avaliable pumps' status.
	AvaliablePumps map[string]*PumpStatus

	// UnAvaliablePumps saves the unAvaliable pumps.
	// And only pump with Online state in this map need check is it avaliable.
	UnAvaliablePumps map[string]*PumpStatus
}

PumpInfos saves pumps' infomations in pumps client.

func NewPumpInfos

func NewPumpInfos() *PumpInfos

NewPumpInfos returns a PumpInfos.

type PumpSelector

type PumpSelector interface {
	// SetPumps set pumps to be selected.
	SetPumps([]*PumpStatus)

	// Select returns a situable pump. Tips: should call this function only one time for commit/rollback binlog.
	Select(binlog *pb.Binlog, retryTime int) *PumpStatus

	// Feedback set the corresponding relations between startTS and pump.
	Feedback(startTS int64, binlogType pb.BinlogType, pump *PumpStatus)
}

PumpSelector selects pump for sending binlog.

func NewHashSelector

func NewHashSelector() PumpSelector

NewHashSelector returns a new HashSelector.

func NewLocalUnixSelector

func NewLocalUnixSelector() PumpSelector

NewLocalUnixSelector returns a new LocalUnixSelector.

func NewRangeSelector

func NewRangeSelector() PumpSelector

NewRangeSelector returns a new ScoreSelector.

func NewScoreSelector

func NewScoreSelector() PumpSelector

NewScoreSelector returns a new ScoreSelector.

func NewSelector

func NewSelector(algorithm string) PumpSelector

NewSelector returns a PumpSelector according to the algorithm.

type PumpStatus

type PumpStatus struct {
	/*
		Pump has these state:
		Online:
			only when pump's state is online that pumps client can write binlog to.
		Pausing:
			this pump is pausing, and can't provide write binlog service. And this state will turn into Paused when pump is quit.
		Paused:
			this pump is paused, and can't provide write binlog service.
		Closing:
			this pump is closing, and can't provide write binlog service. And this state will turn into Offline when pump is quit.
		Offline:
			this pump is offline, and can't provide write binlog service forever.
	*/
	sync.RWMutex

	node.Status

	// the pump is avaliable or not, obsolete now
	IsAvaliable bool

	// the client of this pump
	Client pb.PumpClient

	ErrNum int64
	// contains filtered or unexported fields
}

PumpStatus saves pump's status.

func NewPumpStatus

func NewPumpStatus(status *node.Status, security *tls.Config) *PumpStatus

NewPumpStatus returns a new PumpStatus according to node's status.

func (*PumpStatus) IsUsable

func (p *PumpStatus) IsUsable() bool

IsUsable returns true if pump is usable.

func (*PumpStatus) Reset

func (p *PumpStatus) Reset()

Reset resets the pump's grpc conn and err num.

func (*PumpStatus) ResetGrpcClient

func (p *PumpStatus) ResetGrpcClient()

ResetGrpcClient closes the pump's grpc connection.

func (*PumpStatus) WriteBinlog

func (p *PumpStatus) WriteBinlog(req *pb.WriteBinlogReq, timeout time.Duration) (*pb.WriteBinlogResp, error)

WriteBinlog write binlog by grpc client.

type PumpsClient

type PumpsClient struct {
	sync.RWMutex

	// ClusterID is the cluster ID of this tidb cluster.
	ClusterID uint64

	// the registry of etcd.
	EtcdRegistry *node.EtcdRegistry

	// Pumps saves the pumps' information.
	Pumps *PumpInfos

	// Selector will select a suitable pump.
	Selector PumpSelector

	// the max retry time if write binlog failed, obsolete now.
	RetryTime int

	// BinlogWriteTimeout is the max time binlog can use to write to pump.
	BinlogWriteTimeout time.Duration

	// Security is the security config
	Security *tls.Config
	// contains filtered or unexported fields
}

PumpsClient is the client of pumps.

func NewLocalPumpsClient

func NewLocalPumpsClient(etcdURLs, binlogSocket string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error)

NewLocalPumpsClient returns a PumpsClient, this PumpsClient will write binlog by socket file. For compatible with kafka version pump.

func NewPumpsClient

func NewPumpsClient(etcdURLs, strategy string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error)

NewPumpsClient returns a PumpsClient. TODO: get strategy from etcd, and can update strategy in real-time. Use Range as default now.

func (*PumpsClient) Close

func (c *PumpsClient) Close()

Close closes the PumpsClient.

func (*PumpsClient) SetSelectStrategy

func (c *PumpsClient) SetSelectStrategy(strategy string) error

SetSelectStrategy sets the selector's strategy, strategy should be 'range' or 'hash' now.

func (*PumpsClient) WriteBinlog

func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error

WriteBinlog writes binlog to a situable pump. Tips: will never return error for commit/rollback binlog.

type RangeSelector

type RangeSelector struct {
	// Offset saves the offset in Pumps.
	Offset int

	// PumpMap saves the map of pump's node id with pump.
	PumpMap map[string]*PumpStatus

	// the pumps to be selected.
	Pumps []*PumpStatus
}

RangeSelector select a pump by range.

func (*RangeSelector) Feedback

func (r *RangeSelector) Feedback(startTS int64, binlogType pb.BinlogType, pump *PumpStatus)

Feedback implement PumpSelector.Select

func (*RangeSelector) Select

func (r *RangeSelector) Select(binlog *pb.Binlog, retryTime int) *PumpStatus

Select implement PumpSelector.Select.

func (*RangeSelector) SetPumps

func (r *RangeSelector) SetPumps(pumps []*PumpStatus)

SetPumps implement PumpSelector.SetPumps.

type ScoreSelector

type ScoreSelector struct{}

ScoreSelector select a pump by pump's score.

func (*ScoreSelector) Feedback

func (s *ScoreSelector) Feedback(startTS int64, binlogType pb.BinlogType, pump *PumpStatus)

Feedback implement PumpSelector.Feedback

func (*ScoreSelector) Select

func (s *ScoreSelector) Select(binlog *pb.Binlog, retryTime int) *PumpStatus

Select implement PumpSelector.Select.

func (*ScoreSelector) SetPumps

func (s *ScoreSelector) SetPumps(pumps []*PumpStatus)

SetPumps implement PumpSelector.SetPumps.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL