repl

package
v2.5.0+incompatible
Warning

This package is not in the latest version of its module.

Published: Dec 15, 2021 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

const (
	ActionSendToFollowers     = "ActionSendToFollowers"
	ActionReceiveFromFollower = "ActionReceiveFromFollower"
	ActionWriteToClient       = "ActionWriteToClient"
	ActionCheckReply          = "ActionCheckReply"

	ActionPreparePkt = "ActionPreparePkt"
)
const (
	ReplRuning    = 2
	ReplExiting   = 1
	ReplHasExited = -3

	FollowerTransportRuning  = 2
	FollowerTransportExiting = 1
	FollowerTransportExited  = -1
)
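
The page does not describe how these state values are used. As a minimal sketch, assuming they guard a one-way shutdown (running, then exiting, then exited), a compare-and-swap on an int32 lets exactly one caller start the shutdown. The `lifecycle` type and its methods are hypothetical, and the constants are copied locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Local copies of the Repl* state constants listed above.
const (
	ReplRuning    = 2
	ReplExiting   = 1
	ReplHasExited = -3
)

// lifecycle is a hypothetical holder for the state value; it is not a type
// from the repl package.
type lifecycle struct {
	state int32
}

func newLifecycle() *lifecycle {
	return &lifecycle{state: ReplRuning}
}

// stop moves running -> exiting. Only the first caller gets true.
func (l *lifecycle) stop() bool {
	return atomic.CompareAndSwapInt32(&l.state, ReplRuning, ReplExiting)
}

// finish moves exiting -> exited. It fails if stop has not been called.
func (l *lifecycle) finish() bool {
	return atomic.CompareAndSwapInt32(&l.state, ReplExiting, ReplHasExited)
}

func main() {
	l := newLifecycle()
	fmt.Println("first stop:", l.stop())
	fmt.Println("second stop:", l.stop())
	fmt.Println("finish:", l.finish())
	fmt.Println("final state:", atomic.LoadInt32(&l.state))
}
```

The compare-and-swap makes a repeated stop harmless: later callers see false and can return without touching shared resources.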
const (
	ConnIsNullErr = "ConnIsNullErr"
)
const (
	ReplProtocolError = 1
)
const (
	RequestChanSize = 10240
)

Variables

var (
	ErrBadNodes       = errors.New("BadNodesErr")
	ErrArgLenMismatch = errors.New("ArgLenMismatchErr")
)
var (
	ErrorUnknownOp = errors.New("unknown opcode")
)
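
Because these are sentinel values created with errors.New, callers can match them with errors.Is even after wrapping. The sketch below redeclares two of the variables locally so it compiles on its own. `parseFollowers` and `classify` are hypothetical helpers, and the conditions that trigger each error are assumptions made for illustration, not documented behavior of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the package variables above.
var (
	ErrBadNodes       = errors.New("BadNodesErr")
	ErrArgLenMismatch = errors.New("ArgLenMismatchErr")
)

// parseFollowers is a hypothetical validator: it checks that the declared
// follower count matches the address list and that no address is empty.
func parseFollowers(count int, addrs []string) ([]string, error) {
	if count != len(addrs) {
		return nil, fmt.Errorf("parse followers: %w", ErrArgLenMismatch)
	}
	for _, a := range addrs {
		if a == "" {
			return nil, fmt.Errorf("parse followers: %w", ErrBadNodes)
		}
	}
	return addrs, nil
}

// classify maps an error to a short label using errors.Is, which sees
// through the wrapping added by fmt.Errorf with %w.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrArgLenMismatch):
		return "arg-len-mismatch"
	case errors.Is(err, ErrBadNodes):
		return "bad-nodes"
	default:
		return "other"
	}
}

func main() {
	_, err := parseFollowers(2, []string{"10.0.0.1:17030"})
	fmt.Println(classify(err), err)
}
```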

Functions

This section is empty.

Types

type FollowerPacket added in v1.4.0

type FollowerPacket struct {
	proto.Packet
	// contains filtered or unexported fields
}

func NewFollowerPacket added in v1.4.0

func NewFollowerPacket() (fp *FollowerPacket)

func (*FollowerPacket) IsErrPacket added in v1.4.0

func (p *FollowerPacket) IsErrPacket() bool

func (*FollowerPacket) PackErrorBody added in v1.4.0

func (p *FollowerPacket) PackErrorBody(action, msg string)

type FollowerTransport added in v1.4.0

type FollowerTransport struct {
	// contains filtered or unexported fields
}

func NewFollowersTransport added in v1.4.0

func NewFollowersTransport(addr string) (ft *FollowerTransport, err error)

func (*FollowerTransport) Destory added in v1.4.0

func (ft *FollowerTransport) Destory()

func (*FollowerTransport) Write added in v1.4.0

func (ft *FollowerTransport) Write(p *FollowerPacket)

type Packet

type Packet struct {
	proto.Packet

	IsReleased int32 // TODO what is released?
	Object     interface{}
	TpObject   *exporter.TimePointCount
	NeedReply  bool
	OrgBuffer  []byte
	// contains filtered or unexported fields
}

func NewExtentRepairReadPacket

func NewExtentRepairReadPacket(partitionID uint64, extentID uint64, offset, size int) (p *Packet)

func NewPacket

func NewPacket() (p *Packet)

func NewPacketToGetAllWatermarks

func NewPacketToGetAllWatermarks(partitionID uint64, extentType uint8) (p *Packet)

func NewPacketToNotifyExtentRepair

func NewPacketToNotifyExtentRepair(partitionID uint64) (p *Packet)

func NewPacketToReadTinyDeleteRecord added in v1.4.0

func NewPacketToReadTinyDeleteRecord(partitionID uint64, offset int64) (p *Packet)

func NewReadTinyDeleteRecordResponsePacket added in v1.4.0

func NewReadTinyDeleteRecordResponsePacket(requestID int64, partitionID uint64) (p *Packet)

func NewStreamReadResponsePacket

func NewStreamReadResponsePacket(requestID int64, partitionID uint64, extentID uint64) (p *Packet)

func NewTinyExtentRepairReadPacket added in v1.4.0

func NewTinyExtentRepairReadPacket(partitionID uint64, extentID uint64, offset, size int) (p *Packet)

func NewTinyExtentStreamReadResponsePacket added in v1.4.0

func NewTinyExtentStreamReadResponsePacket(requestID int64, partitionID uint64, extentID uint64) (p *Packet)

func (*Packet) AfterTp

func (p *Packet) AfterTp() (ok bool)

func (*Packet) BeforeTp

func (p *Packet) BeforeTp(clusterID string) (ok bool)

func (*Packet) IsBatchDeleteExtents

func (p *Packet) IsBatchDeleteExtents() bool

func (*Packet) IsBroadcastMinAppliedID added in v1.4.0

func (p *Packet) IsBroadcastMinAppliedID() bool

func (*Packet) IsCreateExtentOperation added in v1.4.0

func (p *Packet) IsCreateExtentOperation() bool

func (*Packet) IsErrPacket

func (p *Packet) IsErrPacket() bool

func (*Packet) IsForwardPacket added in v1.4.0

func (p *Packet) IsForwardPacket() bool

func (*Packet) IsLeaderPacket added in v1.4.0

func (p *Packet) IsLeaderPacket() (ok bool)

A leader packet is a packet sent to the leader that does not require forwarding.

func (*Packet) IsMarkDeleteExtentOperation added in v1.4.0

func (p *Packet) IsMarkDeleteExtentOperation() bool

func (*Packet) IsMasterCommand

func (p *Packet) IsMasterCommand() bool

func (*Packet) IsRandomWrite added in v1.4.0

func (p *Packet) IsRandomWrite() bool

func (*Packet) IsReadOperation added in v1.4.0

func (p *Packet) IsReadOperation() bool

func (*Packet) IsSyncWrite added in v1.4.0

func (p *Packet) IsSyncWrite() bool

func (*Packet) IsTinyExtentType added in v1.4.0

func (p *Packet) IsTinyExtentType() bool

func (*Packet) IsWriteOperation added in v1.4.0

func (p *Packet) IsWriteOperation() bool

func (*Packet) PackErrorBody

func (p *Packet) PackErrorBody(action, msg string)

func (*Packet) ReadFromConnFromCli

func (p *Packet) ReadFromConnFromCli(c net.Conn, deadlineTime time.Duration) (err error)

func (*Packet) ReadFull

func (p *Packet) ReadFull(c net.Conn, opcode uint8, readSize int) (err error)

type ReplProtocol

type ReplProtocol struct {
	// contains filtered or unexported fields
}

ReplProtocol defines the struct of the replication protocol.

  1. ServerConn reads a packet from the client socket and parses the addresses of the followers.
  2. After preparation, the packet is sent to toBeProcessedCh. If a failure occurs, it is sent to the response channel instead.
  3. OperatorAndForwardPktGoRoutine fetches a packet from toBeProcessedCh and determines whether it needs to be forwarded to the followers.
  4. receiveResponse fetches a reply from responseCh, executes postFunc, and writes a response to the client if necessary.
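
The four steps above can be sketched as a small channel pipeline. This is a simplified, single-goroutine model rather than the package's implementation: `packet`, `runPipeline`, `checkFollowers`, and `noop` are hypothetical names, forwarding to followers is left out, and the real protocol runs its stages concurrently.

```go
package main

import (
	"errors"
	"fmt"
)

// packet is a hypothetical stand-in for repl.Packet with only the fields
// this sketch needs.
type packet struct {
	id        int
	followers []string // follower addresses parsed from the request
	err       error
}

var errNoFollowers = errors.New("no followers")

// checkFollowers is an example prepare hook that rejects packets without
// follower addresses.
func checkFollowers(p *packet) error {
	if len(p.followers) == 0 {
		return errNoFollowers
	}
	return nil
}

// noop is an example operator/post hook that always succeeds.
func noop(*packet) error { return nil }

// runPipeline prepares each packet, queues it for processing, and collects
// replies. It returns packet ids in the order replies were handled.
func runPipeline(in []*packet, prepare, operate, post func(*packet) error) []int {
	toBeProcessed := make(chan *packet, len(in))
	responses := make(chan *packet, len(in))

	// Steps 1-2: prepare; a failed packet skips processing and is queued
	// directly on the response channel.
	for _, p := range in {
		if err := prepare(p); err != nil {
			p.err = err
			responses <- p
			continue
		}
		toBeProcessed <- p
	}
	close(toBeProcessed)

	// Step 3: run the operator on each queued packet.
	for p := range toBeProcessed {
		p.err = operate(p)
		responses <- p
	}
	close(responses)

	// Step 4: run the post hook for every reply.
	var replied []int
	for p := range responses {
		_ = post(p)
		replied = append(replied, p.id)
	}
	return replied
}

func main() {
	in := []*packet{
		{id: 1, followers: []string{"10.0.0.2:17030"}},
		{id: 2}, // fails preparation
		{id: 3, followers: []string{"10.0.0.3:17030"}},
	}
	fmt.Println(runPipeline(in, checkFollowers, noop, noop))
}
```

In this model a packet that fails preparation reaches the reply stage before the packets that were processed, because it never waits in the to-be-processed queue.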

func NewReplProtocol

func NewReplProtocol(inConn net.Conn, prepareFunc func(p *Packet) error,
	operatorFunc func(p *Packet, c net.Conn) error, postFunc func(p *Packet) error) *ReplProtocol

func (*ReplProtocol) OperatorAndForwardPktGoRoutine

func (rp *ReplProtocol) OperatorAndForwardPktGoRoutine()

OperatorAndForwardPktGoRoutine reads packets from the to-be-processed channel and writes responses to the client.

  1. Read a packet from toBeProcessedCh and determine whether it needs to be forwarded. If not, process the packet locally and put it into responseCh.
  2. If the packet needs to be forwarded, first send it to the followers and execute the operator function. Then notify receiveResponse to read the followers' responses.
  3. Read a reply from responseCh, and write to the client.
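
The branch in steps 1 and 2 can be illustrated with a short sketch. It assumes that a packet needs forwarding exactly when it carries follower addresses, which this page does not state. `pkt`, `handle`, and the `send` and `operate` hooks are hypothetical names, and the returned step list exists only to make the ordering visible.

```go
package main

import "fmt"

// pkt is a hypothetical stand-in for repl.Packet.
type pkt struct {
	id        int
	followers []string
}

// handle records the order of actions taken for one packet.
// send and operate are hypothetical hooks supplied by the caller.
func handle(p *pkt, send func(addr string, p *pkt) error, operate func(p *pkt) error) (steps []string, err error) {
	if len(p.followers) == 0 {
		// No forwarding needed: process locally and queue the reply.
		err = operate(p)
		return append(steps, "operate", "reply"), err
	}
	// Forward to every follower before running the local operator.
	for _, addr := range p.followers {
		if err = send(addr, p); err != nil {
			return append(steps, "send-failed:"+addr), err
		}
		steps = append(steps, "send:"+addr)
	}
	err = operate(p)
	// The follower responses are then collected by a separate receiver.
	return append(steps, "operate", "await-followers"), err
}

func main() {
	send := func(addr string, p *pkt) error { return nil }
	operate := func(p *pkt) error { return nil }

	steps, _ := handle(&pkt{id: 1}, send, operate)
	fmt.Println(steps)

	steps, _ = handle(&pkt{id: 2, followers: []string{"f1", "f2"}}, send, operate)
	fmt.Println(steps)
}
```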

func (*ReplProtocol) ReceiveResponseFromFollowersGoRoutine

func (rp *ReplProtocol) ReceiveResponseFromFollowersGoRoutine()

ReceiveResponseFromFollowersGoRoutine receives the responses from all followers.

func (*ReplProtocol) ServerConn

func (rp *ReplProtocol) ServerConn()

ServerConn keeps reading data from the socket, parses the follower addresses, executes the prepare function, and sends the packets to the to-be-processed channel.

func (*ReplProtocol) Stop

func (rp *ReplProtocol) Stop()

Stop stops the replication protocol.
