Documentation
Index ¶
- Constants
- Variables
- type FollowerPacket
- type FollowerTransport
- type MakeExtentRepairReadPacket
- type MakeStreamReadResponsePacket
- type NewPacketFunc
- type Packet
- func NewNormalExtentWithHoleStreamReadResponsePacket(requestID int64, partitionID uint64, extentID uint64) (p *Packet)
- func NewPacket() (p *Packet)
- func NewPacketToGetAllWatermarks(partitionID uint64, extentType uint8) (p *Packet)
- func NewPacketToNotifyExtentRepair(partitionID uint64) (p *Packet)
- func NewPacketToReadTinyDeleteRecord(partitionID uint64, offset int64) (p *Packet)
- func NewReadTinyDeleteRecordResponsePacket(requestID int64, partitionID uint64) (p *Packet)
- func NewTinyExtentStreamReadResponsePacket(requestID int64, partitionID uint64, extentID uint64) (p *Packet)
- func (p *Packet) AfterTp() (ok bool)
- func (p *Packet) BeforeTp(clusterID string) (ok bool)
- func (p *Packet) GetArg() []byte
- func (p *Packet) GetArgLen() uint32
- func (p *Packet) GetCRC() uint32
- func (p *Packet) GetData() []byte
- func (p *Packet) GetExtentID() uint64
- func (p *Packet) GetExtentOffset() int64
- func (p *Packet) GetOpcode() uint8
- func (p *Packet) GetPartitionID() uint64
- func (p *Packet) GetResultCode() uint8
- func (p *Packet) GetSize() uint32
- func (p *Packet) GetStartT() int64
- func (p *Packet) IsBatchDeleteExtents() bool
- func (p *Packet) IsBroadcastMinAppliedID() bool
- func (p *Packet) IsCreateExtentOperation() bool
- func (p *Packet) IsErrPacket() bool
- func (p *Packet) IsForwardPacket() bool
- func (p *Packet) IsLeaderPacket() (ok bool)
- func (p *Packet) IsMarkDeleteExtentOperation() bool
- func (p *Packet) IsMarkSplitExtentOperation() bool
- func (p *Packet) IsMasterCommand() bool
- func (p *Packet) IsNormalWriteOperation() bool
- func (p *Packet) IsRandomWrite() bool
- func (p *Packet) IsReadOperation() bool
- func (p *Packet) IsSnapshotModWriteAppendOperation() bool
- func (p *Packet) IsSyncWrite() bool
- func (p *Packet) IsTinyExtentType() bool
- func (p *Packet) IsUrgentLeaderReq() bool
- func (p *Packet) PackErrorBody(action, msg string)
- func (p *Packet) ReadFull(c net.Conn, opcode uint8, readSize int) (err error)
- func (p *Packet) SetArg(data []byte)
- func (p *Packet) SetArglen(len uint32)
- func (p *Packet) SetCRC(crc uint32)
- func (p *Packet) SetData(data []byte)
- func (p *Packet) SetDegrade()
- func (p *Packet) SetExtentOffset(offset int64)
- func (p *Packet) SetOpCode(op uint8)
- func (p *Packet) SetResultCode(code uint8)
- func (p *Packet) SetSize(size uint32)
- func (p *Packet) SetStartT(StartT int64)
- func (p *Packet) ShallDegrade() bool
- func (p *Packet) UnsetDegrade()
- type PacketInterface
- func NewExtentRepairReadPacket(partitionID uint64, extentID uint64, offset, size int) (p PacketInterface)
- func NewNormalExtentWithHoleRepairReadPacket(partitionID uint64, extentID uint64, offset, size int) (p PacketInterface)
- func NewPacketEx() (p PacketInterface)
- func NewStreamReadResponsePacket(requestID int64, partitionID uint64, extentID uint64) (p PacketInterface)
- func NewTinyExtentRepairReadPacket(partitionID uint64, extentID uint64, offset, size int) (p PacketInterface)
- type ReplProtocol
- func (rp *ReplProtocol) OperatorAndForwardPktGoRoutine()
- func (rp *ReplProtocol) ReceiveResponseFromFollowersGoRoutine()
- func (rp *ReplProtocol) ServerConn()
- func (rp *ReplProtocol) SetSmux(f func(addr string) (net.Conn, error), putSmux func(conn net.Conn, force bool))
- func (rp *ReplProtocol) Stop()
- type SmuxConn
Constants ¶
const (
ActionSendToFollowers = "ActionSendToFollowers"
ActionReceiveFromFollower = "ActionReceiveFromFollower"
ActionWriteToClient = "ActionWriteToClient"
ActionCheckReply = "ActionCheckReply"
ActionPreparePkt = "ActionPreparePkt"
)
const (
ReplRuning = 2
ReplExiting = 1
ReplHasExited = -3
FollowerTransportRuning = 2
FollowerTransportExiting = 1
FollowerTransportExited = -1
)
const (
ConnIsNullErr = "ConnIsNullErr"
)
const (
ReplProtocolError = 1
)
const (
RequestChanSize = 2048
)
Variables ¶
var (
ErrBadNodes = errors.New("BadNodesErr")
ErrArgLenMismatch = errors.New("ArgLenMismatchErr")
)
var ErrorUnknownOp = errors.New("unknown opcode")
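Because these are plain sentinel `error` values, callers can match them with `errors.Is`, even after a caller has wrapped them with extra context. The following is a minimal sketch under stated assumptions: the two sentinels are redeclared locally so the example compiles on its own, and `checkArgLen` is a hypothetical helper that is not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins that mirror the package-level sentinels documented above.
var (
	ErrBadNodes       = errors.New("BadNodesErr")
	ErrArgLenMismatch = errors.New("ArgLenMismatchErr")
)

// checkArgLen is a hypothetical validator (not part of the package). It returns
// ErrArgLenMismatch, wrapped with context, when the declared length differs
// from the actual length of arg.
func checkArgLen(declared uint32, arg []byte) error {
	if int(declared) != len(arg) {
		return fmt.Errorf("declared %d, got %d: %w", declared, len(arg), ErrArgLenMismatch)
	}
	return nil
}

func main() {
	err := checkArgLen(4, []byte("abc"))
	// errors.Is sees through the wrapping added by fmt.Errorf.
	fmt.Println(errors.Is(err, ErrArgLenMismatch))
	fmt.Println(errors.Is(err, ErrBadNodes))
}
```

Comparing with `==` would stop matching as soon as any layer wraps the error, which is why `errors.Is` is the safer choice here.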
Functions ¶
This section is empty.
Types ¶
type FollowerPacket ¶
func NewFollowerPacket ¶
func NewFollowerPacket() (fp *FollowerPacket)
func (*FollowerPacket) IsErrPacket ¶
func (p *FollowerPacket) IsErrPacket() bool
func (*FollowerPacket) PackErrorBody ¶
func (p *FollowerPacket) PackErrorBody(action, msg string)
type FollowerTransport ¶
type FollowerTransport struct {
// contains filtered or unexported fields
}
func NewFollowersTransport ¶
func NewFollowersTransport(addr string, c net.Conn) (ft *FollowerTransport, err error)
func (*FollowerTransport) Destory ¶
func (ft *FollowerTransport) Destory()
func (*FollowerTransport) Write ¶
func (ft *FollowerTransport) Write(p *FollowerPacket)
type MakeExtentRepairReadPacket ¶
type MakeExtentRepairReadPacket func(partitionID uint64, extentID uint64, offset, size int) (p PacketInterface)
type MakeStreamReadResponsePacket ¶
type MakeStreamReadResponsePacket func(requestID int64, partitionID uint64, extentID uint64) (p PacketInterface)
type NewPacketFunc ¶
type NewPacketFunc func() (p PacketInterface)
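The three function types above are constructor signatures: code that holds one of them can allocate packets without naming a concrete packet type. The sketch below illustrates that pattern only. `packet`, `newPacketFunc`, `basicPacket`, and `allocate` are hypothetical, much smaller stand-ins for `PacketInterface` and `NewPacketFunc`, chosen so the example is self-contained.

```go
package main

import "fmt"

// packet is a hypothetical, reduced stand-in for PacketInterface.
type packet interface {
	GetOpcode() uint8
	SetOpCode(op uint8)
}

// newPacketFunc mirrors the shape of NewPacketFunc: a constructor with no arguments.
type newPacketFunc func() packet

// basicPacket is an illustrative concrete packet type.
type basicPacket struct{ opcode uint8 }

func (p *basicPacket) GetOpcode() uint8   { return p.opcode }
func (p *basicPacket) SetOpCode(op uint8) { p.opcode = op }

// allocate is the consumer side: it builds a packet through the injected
// constructor and never refers to the concrete type.
func allocate(newPkt newPacketFunc, op uint8) packet {
	p := newPkt()
	p.SetOpCode(op)
	return p
}

func main() {
	factory := func() packet { return &basicPacket{} }
	p := allocate(factory, 7)
	fmt.Println(p.GetOpcode())
}
```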
type Packet ¶
type Packet struct {
proto.Packet
IsReleased int32 // TODO what is released?
Object interface{}
TpObject *exporter.TimePointCount
NeedReply bool
OrgBuffer []byte
AfterPre bool
// contains filtered or unexported fields
}
func (*Packet) GetExtentID ¶
func (*Packet) GetExtentOffset ¶
func (*Packet) GetPartitionID ¶
func (*Packet) GetResultCode ¶
func (*Packet) IsBatchDeleteExtents ¶
func (*Packet) IsBroadcastMinAppliedID ¶
func (*Packet) IsCreateExtentOperation ¶
func (*Packet) IsErrPacket ¶
func (*Packet) IsForwardPacket ¶
func (*Packet) IsLeaderPacket ¶
A leader packet is a packet sent to the leader that does not require packet forwarding.
func (*Packet) IsMarkDeleteExtentOperation ¶
func (*Packet) IsMarkSplitExtentOperation ¶
func (*Packet) IsMasterCommand ¶
func (*Packet) IsNormalWriteOperation ¶
func (*Packet) IsRandomWrite ¶
func (*Packet) IsReadOperation ¶
func (*Packet) IsSnapshotModWriteAppendOperation ¶
func (*Packet) IsSyncWrite ¶
func (*Packet) IsTinyExtentType ¶
func (*Packet) IsUrgentLeaderReq ¶
IsUrgentLeaderReq reports whether the op needs to be processed by the dp raft leader.
func (*Packet) PackErrorBody ¶
func (*Packet) SetDegrade ¶
func (p *Packet) SetDegrade()
func (*Packet) SetExtentOffset ¶
func (*Packet) SetResultCode ¶
func (*Packet) ShallDegrade ¶
func (*Packet) UnsetDegrade ¶
func (p *Packet) UnsetDegrade()
type PacketInterface ¶
type PacketInterface interface {
IsErrPacket() bool
WriteToConn(c net.Conn) (err error)
ReadFromConnWithVer(c net.Conn, timeoutSec int) (err error)
GetUniqueLogId() (m string)
GetReqID() int64
GetPartitionID() uint64
GetExtentID() uint64
GetSize() uint32
GetCRC() uint32
GetArg() []byte
GetArgLen() uint32
GetData() []byte
GetResultCode() uint8
GetExtentOffset() int64
GetStartT() int64
SetSize(size uint32)
GetOpcode() uint8
SetResultCode(uint8)
SetCRC(crc uint32)
SetExtentOffset(int64)
GetOpMsg() (m string)
ShallDegrade() bool
SetStartT(StartT int64)
SetData(data []byte)
SetOpCode(uint8)
LogMessage(action, remote string, start int64, err error) (m string)
PackErrorBody(action, msg string)
PacketOkReply()
SetArglen(len uint32)
SetArg(data []byte)
}
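Helpers that work on packets rarely need the whole interface. Declaring a narrower interface with only the getters a helper uses keeps it easy to test with an in-memory fake. The sketch below checks a payload checksum through `GetData`, `GetSize`, and `GetCRC`. It assumes the CRC field holds the IEEE CRC-32 of the first `GetSize()` bytes of the payload, which this page does not state. `crcCarrier`, `fakePacket`, `newChecked`, and `verifyCRC` are hypothetical names.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// crcCarrier is the subset of PacketInterface that verifyCRC needs.
type crcCarrier interface {
	GetData() []byte
	GetSize() uint32
	GetCRC() uint32
}

// fakePacket is a hypothetical in-memory implementation used for the example.
type fakePacket struct {
	data []byte
	size uint32
	crc  uint32
}

func (p *fakePacket) GetData() []byte { return p.data }
func (p *fakePacket) GetSize() uint32 { return p.size }
func (p *fakePacket) GetCRC() uint32  { return p.crc }

// newChecked builds a fakePacket whose size and CRC match its payload.
func newChecked(data []byte) *fakePacket {
	return &fakePacket{data: data, size: uint32(len(data)), crc: crc32.ChecksumIEEE(data)}
}

// verifyCRC assumes (this is not confirmed by the package docs) that the CRC
// is the IEEE CRC-32 of the first GetSize() bytes of the payload.
func verifyCRC(p crcCarrier) bool {
	data := p.GetData()
	size := int(p.GetSize())
	if size > len(data) {
		return false // declared size exceeds the payload actually held
	}
	return crc32.ChecksumIEEE(data[:size]) == p.GetCRC()
}

func main() {
	good := newChecked([]byte("hello"))
	bad := newChecked([]byte("hello"))
	bad.crc++ // corrupt the stored checksum
	fmt.Println(verifyCRC(good), verifyCRC(bad))
}
```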
func NewExtentRepairReadPacket ¶
func NewExtentRepairReadPacket(partitionID uint64, extentID uint64, offset, size int) (p PacketInterface)
func NewNormalExtentWithHoleRepairReadPacket ¶
func NewNormalExtentWithHoleRepairReadPacket(partitionID uint64, extentID uint64, offset, size int) (p PacketInterface)
func NewPacketEx ¶
func NewPacketEx() (p PacketInterface)
func NewStreamReadResponsePacket ¶
func NewStreamReadResponsePacket(requestID int64, partitionID uint64, extentID uint64) (p PacketInterface)
func NewTinyExtentRepairReadPacket ¶
func NewTinyExtentRepairReadPacket(partitionID uint64, extentID uint64, offset, size int) (p PacketInterface)
type ReplProtocol ¶
type ReplProtocol struct {
// contains filtered or unexported fields
}
ReplProtocol defines the struct of the replication protocol.
1. ServerConn reads a packet from the client socket and parses the addresses of the followers.
2. After preparation, the packet is sent to toBeProcessedCh. If preparation fails, the packet is sent to the response channel instead.
3. OperatorAndForwardPktGoRoutine fetches a packet from toBeProcessedCh and determines whether it needs to be forwarded to the followers.
4. receiveResponse fetches a reply from responseCh, executes postFunc, and writes a response to the client if necessary.
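The four steps above form a small channel pipeline. The sketch below reproduces only that shape, with failed preparation short-circuiting to the response channel. It is not the package's implementation: `request`, `runPipeline`, and the reply strings are hypothetical, forwarding to followers is left out, and replies are collected in a map rather than written back to a client in order.

```go
package main

import (
	"fmt"
	"sync"
)

// request is a hypothetical stand-in for a replication packet.
type request struct {
	id           int
	prepareFails bool   // simulates a failure during preparation
	reply        string // filled in by the pipeline
}

// runPipeline pushes requests through the stages and returns replies by id.
func runPipeline(reqs []*request) map[int]string {
	toBeProcessedCh := make(chan *request, len(reqs))
	responseCh := make(chan *request, len(reqs))

	var producers sync.WaitGroup
	producers.Add(2)

	// Steps 1-2: prepare each request; failures bypass processing.
	go func() {
		defer producers.Done()
		defer close(toBeProcessedCh)
		for _, r := range reqs {
			if r.prepareFails {
				r.reply = "error: prepare failed"
				responseCh <- r
				continue
			}
			toBeProcessedCh <- r
		}
	}()

	// Step 3: process prepared requests and hand them to the response stage.
	go func() {
		defer producers.Done()
		for r := range toBeProcessedCh {
			r.reply = "ok"
			responseCh <- r
		}
	}()

	// Close responseCh once both producers have finished.
	go func() {
		producers.Wait()
		close(responseCh)
	}()

	// Step 4: collect replies (stands in for writing to the client).
	replies := make(map[int]string, len(reqs))
	for r := range responseCh {
		replies[r.id] = r.reply
	}
	return replies
}

func main() {
	replies := runPipeline([]*request{{id: 1}, {id: 2, prepareFails: true}})
	fmt.Println(replies[1])
	fmt.Println(replies[2])
}
```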
func NewReplProtocol ¶
func (*ReplProtocol) OperatorAndForwardPktGoRoutine ¶
func (rp *ReplProtocol) OperatorAndForwardPktGoRoutine()
OperatorAndForwardPktGoRoutine reads packets from the to-be-processed channel and writes responses to the client.
- Read a packet from toBeProcessedCh and determine whether it needs to be forwarded. If not, process the packet locally and put it into responseCh.
- If the packet needs to be forwarded, first send it to the followers, then execute the operator function, and then notify receiveResponse to read the followers' responses.
- Read a reply from responseCh, and write to the client.
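The forward-or-local decision in the list above can be sketched as follows. This is an illustration of the ordering only (send to followers, apply locally, then check every follower reply), not the package's code. `pkt`, `sendFunc`, `handle`, and `errFollowerDown` are hypothetical, and the transport is simulated by a plain function.

```go
package main

import (
	"errors"
	"fmt"
)

// errFollowerDown is a hypothetical error used by the simulated transport.
var errFollowerDown = errors.New("follower down")

// pkt is a hypothetical packet; followers lists the replicas that must also apply it.
type pkt struct {
	id        int
	followers []string
}

// sendFunc is a hypothetical transport hook that returns the follower's reply error.
type sendFunc func(addr string, p *pkt) error

// handle processes one packet. Without followers it only applies the packet
// locally. Otherwise it sends to every follower first, applies locally while
// those sends are in flight, and then waits for every follower reply.
func handle(p *pkt, send sendFunc, operate func(*pkt) error) error {
	if len(p.followers) == 0 {
		return operate(p)
	}
	acks := make(chan error, len(p.followers))
	for _, addr := range p.followers {
		go func(addr string) { acks <- send(addr, p) }(addr)
	}
	err := operate(p)
	for range p.followers {
		// Drain every reply; keep the first error seen.
		if ackErr := <-acks; ackErr != nil && err == nil {
			err = fmt.Errorf("follower reply: %w", ackErr)
		}
	}
	return err
}

func main() {
	apply := func(*pkt) error { return nil }
	flaky := func(addr string, _ *pkt) error {
		if addr == "b" {
			return errFollowerDown
		}
		return nil
	}
	fmt.Println(handle(&pkt{id: 1}, flaky, apply))
	fmt.Println(handle(&pkt{id: 2, followers: []string{"a", "b"}}, flaky, apply))
}
```

The reply channel is buffered to the number of followers so that no sender goroutine blocks, and every reply is drained even after an error has been recorded.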
func (*ReplProtocol) ReceiveResponseFromFollowersGoRoutine ¶
func (rp *ReplProtocol) ReceiveResponseFromFollowersGoRoutine()
ReceiveResponseFromFollowersGoRoutine receives the responses from all followers.
func (*ReplProtocol) ServerConn ¶
func (rp *ReplProtocol) ServerConn()
ServerConn keeps reading packets from the socket. For each packet it parses the follower addresses, executes the prepare function, and sends the packet to the to-be-processed channel.