Documentation ¶
Index ¶
- Constants
- type DataFile
- type DirectWriter
- type FakeGenerator
- type FileHeader
- type FileReader
- type FileWriter
- type KafkaReader
- type KafkaWriter
- type ListenSocket
- type MockReader
- type MockWriter
- type Packet
- type RPCReader
- type RPCWriter
- type Reader
- type ReaderFactory
- type Replayer
- type TCPReader
- type TCPWriter
- type TMessage
- type TcpSocket
- type TunnelRPC
- type WMessage
- type WriteOtherInfo
- type Writer
- type WriterFactory
Constants ¶
View Source
const ( FILE_MAGIC_NUMBER uint64 = 0xeeeeeeeeee201314 FILE_PROTOCOL_NUMBER uint32 = 1 BLOCK_HEADER_SIZE = 20 )
View Source
const ( BatchSize = 64 TableName = "mongoshake_mock.table" )
View Source
const ( MagicNumber = 0xCAFE CurrentVersion = 0x01 HeaderLen = 12 )
Network packet structure
[ Big-endian ]

Header (12 Bytes)    Body (n Bytes)

[ Header structure ]
-----------------------------------------------------------------------------------
|  magic(2B)   |  version(1B)  |  type(1B)  |  crc32(4B)  |  length(4B)  |
-----------------------------------------------------------------------------------
|  0x00201314  |  0x01         |  0x01      |  0xFFFFF    |  4096        |
-----------------------------------------------------------------------------------

[ PacketWrite payload ]
-------------------------------------------------------------------------------------------------------------------------------------------------
| cksum(4B) | tag(4B) | shard(4B) | compress(4B) | number(4B) | len(4B) | log([]byte) | len(4B) | log([]byte) |
-------------------------------------------------------------------------------------------------------------------------------------------------

[ PacketGetACK payload ]
--------------
|   (zero)   |
--------------

[ PacketReturnACK payload ]
------------------
|    ack(4B)     |
------------------
View Source
const ( PacketIncomplete uint8 = 0x00 PacketGetACK uint8 = 0x01 PacketWrite uint8 = 0x02 PacketReturnACK uint8 = 0x3 UndefinedPacketType uint8 = 0x4 )
View Source
const ( TransferChannel = iota RecvAckChannel TotalQueueNum )
View Source
const ( MsgNormal = 0x00000000 MsgRetransmission = 0x00000001 MsgProbe = 0x00000010 MsgResident = 0x00000100 MsgPersistent = 0x00001000 MsgStorageBackend = 0x00010000 )
View Source
const ( ReplyOK = 0 ReplyError int64 = -1 ReplyNetworkOpFail int64 = -2 ReplyNetworkTimeout int64 = -3 ReplyRetransmission int64 = -4 ReplyServerFault int64 = -5 ReplyChecksumInvalid int64 = -6 ReplyCompressorNotSupported int64 = -7 ReplyDecompressInvalid = -8 )
View Source
const InitialStageChecking = false
View Source
const NetworkDefaultTimeout = 60 * time.Second
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DataFile ¶
type DataFile struct {
// contains filtered or unexported fields
}
func (*DataFile) ReadHeader ¶
func (dataFile *DataFile) ReadHeader() *FileHeader
func (*DataFile) WriteHeader ¶
func (dataFile *DataFile) WriteHeader()
type DirectWriter ¶
type DirectWriter struct { RemoteAddrs []string ReplayerId uint32 // equal to worker-id // contains filtered or unexported fields }
func (*DirectWriter) AckRequired ¶
func (writer *DirectWriter) AckRequired() bool
func (*DirectWriter) ParsedLogsRequired ¶
func (writer *DirectWriter) ParsedLogsRequired() bool
func (*DirectWriter) Prepare ¶
func (writer *DirectWriter) Prepare() bool
func (*DirectWriter) Send ¶
func (writer *DirectWriter) Send(message *WMessage) int64
type FakeGenerator ¶
type FakeGenerator struct {
// contains filtered or unexported fields
}
type FileHeader ¶
File Structure

|----- Header ------|------ OplogBlock ------|------ OplogBlock --------| ......
|<--- 32bytes ---->|
type FileReader ¶
type FileReader struct { File string // contains filtered or unexported fields }
func (*FileReader) Link ¶
func (tunnel *FileReader) Link(relativeReplayer []Replayer) error
type FileWriter ¶
type FileWriter struct { // local file folder path Local string // contains filtered or unexported fields }
func (*FileWriter) AckRequired ¶
func (tunnel *FileWriter) AckRequired() bool
func (*FileWriter) ParsedLogsRequired ¶
func (tunnel *FileWriter) ParsedLogsRequired() bool
func (*FileWriter) Prepare ¶
func (tunnel *FileWriter) Prepare() bool
func (*FileWriter) Send ¶
func (tunnel *FileWriter) Send(message *WMessage) int64
func (*FileWriter) SyncToDisk ¶
func (tunnel *FileWriter) SyncToDisk()
type KafkaReader ¶
type KafkaReader struct {
// contains filtered or unexported fields
}
func (*KafkaReader) Link ¶
func (tunnel *KafkaReader) Link(replayer []Replayer) error
type KafkaWriter ¶
type KafkaWriter struct { RemoteAddr string TunnelKafkaSecurity string KafkaClientCer string KafkaClientKey string KafkaServerCer string KafkaSaslUser string KafkaSaslPassword string // contains filtered or unexported fields }
func (*KafkaWriter) AckRequired ¶
func (tunnel *KafkaWriter) AckRequired() bool
func (*KafkaWriter) ParsedLogsRequired ¶
func (tunnel *KafkaWriter) ParsedLogsRequired() bool
func (*KafkaWriter) Prepare ¶
func (tunnel *KafkaWriter) Prepare() bool
func (*KafkaWriter) Send ¶
func (tunnel *KafkaWriter) Send(message *WMessage) int64
type ListenSocket ¶
type ListenSocket struct {
// contains filtered or unexported fields
}
type MockReader ¶
type MockReader struct {
// contains filtered or unexported fields
}
func (*MockReader) Link ¶
func (tunnel *MockReader) Link(replayer []Replayer) error
type MockWriter ¶
type MockWriter struct { }
func (*MockWriter) AckRequired ¶
func (tunnel *MockWriter) AckRequired() bool
func (*MockWriter) ParsedLogsRequired ¶
func (tunnel *MockWriter) ParsedLogsRequired() bool
func (*MockWriter) Prepare ¶
func (tunnel *MockWriter) Prepare() bool
func (*MockWriter) Send ¶
func (tunnel *MockWriter) Send(message *WMessage) int64
type RPCWriter ¶
type RPCWriter struct { RemoteAddr string // contains filtered or unexported fields }
func (*RPCWriter) AckRequired ¶
func (*RPCWriter) ParsedLogsRequired ¶
type ReaderFactory ¶
type ReaderFactory struct {
Name string
}
func (*ReaderFactory) Create ¶
func (factory *ReaderFactory) Create(address string) Reader
Create returns the specific tunnel reader selected by the factory's Name, passing it the connection address or other useful metadata.
type TCPWriter ¶
type TCPWriter struct { RemoteAddr string // contains filtered or unexported fields }
func (*TCPWriter) AckRequired ¶
func (*TCPWriter) ParsedLogsRequired ¶
type TMessage ¶
func (*TMessage) ApproximateSize ¶
type WMessage ¶
type WMessage struct { *TMessage // whole raw log ParsedLogs []*oplog.PartialLog // parsed log }
WMessage wraps TMessage.
type WriteOtherInfo ¶
type WriteOtherInfo struct { TunnelKafkaSecurity string KafkaClientCer string KafkaClientKey string KafkaServerCer string KafkaSaslUser string KafkaSaslPassword string }
WriteOtherInfo holds the additional fields needed when preparing to write.
type Writer ¶
type Writer interface {
	/**
	 * Indicates whether this tunnel cares about the ACK feedback value.
	 * For example, RPC_TUNNEL (ack required is true) is asynchronous and
	 * requires that the peer receiver has completely consumed the log
	 * entries; the reserved log entries can be dropped only once the
	 * log entry ACK is confirmed.
	 */
	AckRequired() bool

	/**
	 * Prepare stage of the tunnel, such as creating the network connection
	 * or initializing something before the Send() invocation.
	 * Returns true on success or false on failure.
	 */
	Prepare() bool

	/**
	 * Writes the real tunnel message to the tunnel.
	 *
	 * Returns the right ACK offset value as a positive number. If AckRequired
	 * is set, this ACK offset is used to purge buffered oplogs. Otherwise the
	 * upper layer uses the max oplog ts as the ACK offset and discards the
	 * returned value (ACK offset).
	 * A negative return value indicates an error.
	 */
	Send(message *WMessage) int64

	/**
	 * Whether the tunnel needs the parsed log or the raw log.
	 */
	ParsedLogsRequired() bool
}
type WriterFactory ¶
type WriterFactory struct {
Name string
}
func (*WriterFactory) Create ¶
func (factory *WriterFactory) Create(address []string, writeOtherInfo *WriteOtherInfo, workerId uint32) Writer
Create returns the specific tunnel writer selected by the factory's Name, passing it the connection addresses and other useful metadata.
Source Files ¶
Click to show internal directories.
Click to hide internal directories.