Documentation ¶
Index ¶
- Constants
- Variables
- func CreateFile(path string) error
- func ErrorCodeToString(e int32) string
- func ForwardCreateReq(serverName string, req *pb.CreateChunkReq, peer string) error
- func GetAddr(host string, port uint32) string
- func IsClose[T any](ch <-chan T) bool
- func LoadChunk(path string, used uint32, start uint32, end uint32) ([]byte, error)
- func NewAppendDataResp(errorCode int32) *pb.AppendDataResp
- func NewCreateChunkResp(errorCode int32) *pb.CreateChunkResp
- func NewDeleteChunkResp(errorCode int32) *pb.DeleteChunkResp
- func NewForwardCreateResp(errorCode int32) *pb.ForwardCreateResp
- func NewGetVersionResp(errorCode int32, version *uint32, fileData []byte) *pb.GetVersionResp
- func NewPeerConn(address string) (*grpc.ClientConn, error)
- func NewReadResp(fileData []byte, errorCode int32, version *uint32) *pb.ReadResp
- func NewReadVersionResp(errorCode int32, version *uint32) *pb.ReadVersionResp
- func NewReplicateReq(req *pb.ReplicateReq, peer string) error
- func NewReplicateResp(errorCode int32, uuid string) *pb.ReplicateResp
- func NewStatus(errorCode int32) *pb.Status
- func OverWriteChunk(chunkMeta *ChunkMetaData, content []byte) error
- func ReplicateRespToAppendResp(replicateResp *pb.ReplicateResp) *pb.AppendDataResp
- func Sum[T Number](slice []T) T
- func WriteFile(chunkMeta *ChunkMetaData, content []byte) error
- type ChunkMetaData
- type ChunkServer
- func (s *ChunkServer) AppendData(ctx context.Context, appendReq *pb.AppendDataReq) (*pb.AppendDataResp, error)
- func (s *ChunkServer) AssignNewPrimary(ctx context.Context, req *pb.AssignNewPrimaryReq) (res *pb.AssignNewPrimaryResp, err error)
- func (s *ChunkServer) ChangeToPrimary(ctx context.Context, req *pb.ChangeToPrimaryReq) (res *pb.ChangeToPrimaryResp, err error)
- func (s *ChunkServer) CreateChunk(ctx context.Context, createChunkReq *pb.CreateChunkReq) (*pb.CreateChunkResp, error)
- func (s *ChunkServer) DeleteChunk(ctx context.Context, deleteReq *pb.DeleteChunkReq) (*pb.DeleteChunkResp, error)
- func (s *ChunkServer) ForwardCreate(ctx context.Context, forwardCreateReq *pb.ForwardCreateReq) (*pb.ForwardCreateResp, error)
- func (s *ChunkServer) GetVersion(ctx context.Context, req *pb.GetVersionReq) (res *pb.GetVersionResp, err error)
- func (s *ChunkServer) Read(ctx context.Context, readReq *pb.ReadReq) (*pb.ReadResp, error)
- func (s *ChunkServer) ReadVersion(ctx context.Context, readVersion *pb.ReadVersionReq) (*pb.ReadVersionResp, error)
- func (s *ChunkServer) Replicate(ctx context.Context, replicateReq *pb.ReplicateReq) (*pb.ReplicateResp, error)
- func (s *ChunkServer) SendGetVersion(chunkHandle string)
- func (s *ChunkServer) SendHeartBeat()
- func (s *ChunkServer) SendRegister() error
- func (s *ChunkServer) UpdateBackup(ctx context.Context, req *pb.UpdateBackupReq) (*pb.UpdateBackupResp, error)
- type DebugInfo
- type GetVersionTimer
- type HeartBeatTimer
- type Number
- type RespMetaData
Constants ¶
const ( Primary = iota Secondary )
const ( OK int32 = iota ERROR_NOT_PRIMARY ERROR_NOT_SECONDARY ERROR_READ_FAILED ERROR_CHUNK_ALREADY_EXISTS ERROR_CREATE_CHUNK_FAILED ERROR_APPEND_FAILED ERROR_REPLICATE_FAILED // ERROR_APPEND_NOT_EXISTS represents the chunk to be appended does not exist on local filesystem ERROR_APPEND_NOT_EXISTS ERROR_REPLICATE_NOT_EXISTS ERROR_SHOULD_NOT_HAPPEN ERROR_CHUNK_NOT_EXISTS ERROR_VERSIONS_DO_NOT_MATCH )
Variables ¶
var ClientOpts []grpc.DialOption = []grpc.DialOption{ grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(64*1024*1024+300), grpc.MaxCallSendMsgSize(64*1024*1024+300), ), grpc.WithInsecure(), }
Functions ¶
func CreateFile ¶
CreateFile creates an empty file on disk and creates all intermediate directories in path
func ErrorCodeToString ¶
func ForwardCreateReq ¶
func ForwardCreateReq(serverName string, req *pb.CreateChunkReq, peer string) error
ForwardCreateReq establishes a grpc.ClientConn with peer and forwards the pb.CreateChunkReq
func LoadChunk ¶
LoadChunk reads the file at the specified path, starting at offset start and ending at end. If end equals 0, LoadChunk reads and returns all of the data starting from start; otherwise it reads and returns (end - start) bytes.
func NewAppendDataResp ¶
func NewAppendDataResp(errorCode int32) *pb.AppendDataResp
func NewCreateChunkResp ¶
func NewCreateChunkResp(errorCode int32) *pb.CreateChunkResp
func NewDeleteChunkResp ¶
func NewDeleteChunkResp(errorCode int32) *pb.DeleteChunkResp
func NewForwardCreateResp ¶
func NewForwardCreateResp(errorCode int32) *pb.ForwardCreateResp
func NewGetVersionResp ¶
func NewGetVersionResp(errorCode int32, version *uint32, fileData []byte) *pb.GetVersionResp
func NewPeerConn ¶
func NewPeerConn(address string) (*grpc.ClientConn, error)
NewPeerConn establishes and returns a grpc.ClientConn to the specified address
func NewReadResp ¶
NewReadResp returns a pointer to pb.ReadResp that represents the result of a read with pb.Status
func NewReadVersionResp ¶
func NewReadVersionResp(errorCode int32, version *uint32) *pb.ReadVersionResp
func NewReplicateReq ¶
func NewReplicateReq(req *pb.ReplicateReq, peer string) error
func NewReplicateResp ¶
func NewReplicateResp(errorCode int32, uuid string) *pb.ReplicateResp
func OverWriteChunk ¶
func OverWriteChunk(chunkMeta *ChunkMetaData, content []byte) error
OverWriteChunk overwrites an existing file on disk with content.
func ReplicateRespToAppendResp ¶
func ReplicateRespToAppendResp(replicateResp *pb.ReplicateResp) *pb.AppendDataResp
func WriteFile ¶
func WriteFile(chunkMeta *ChunkMetaData, content []byte) error
WriteFile opens the chunk file in append mode and appends content to the file.
Types ¶
type ChunkMetaData ¶
type ChunkMetaData struct { // file location in local file system ChunkLocation string // role of current chunkserver for this chunk Role uint32 // IP address of primary chunk server for this chunk PrimaryChunkServer string PeerAddress []string // Already used size in bytes Used uint32 // Add version number Version uint32 MetaDataLock sync.Mutex GetVersionChannel chan string }
type ChunkServer ¶
type ChunkServer struct { pb.UnimplementedChunkServerServer // a mapping from ChunkHandle(string) to ChunkMetaData Chunks map[string]*ChunkMetaData // a mapping from client token to client last sequence number ClientLastResp map[string]RespMetaData // globally unique server name ServerName string // base directory to store chunk files BasePath string HostName string Port uint32 MasterIP string MasterPort uint32 Debug bool DebugChan chan DebugInfo }
func (*ChunkServer) AppendData ¶
func (s *ChunkServer) AppendData(ctx context.Context, appendReq *pb.AppendDataReq) (*pb.AppendDataResp, error)
func (*ChunkServer) AssignNewPrimary ¶
func (s *ChunkServer) AssignNewPrimary(ctx context.Context, req *pb.AssignNewPrimaryReq) (res *pb.AssignNewPrimaryResp, err error)
AssignNewPrimary changes the primary for a given chunk handle.
func (*ChunkServer) ChangeToPrimary ¶
func (s *ChunkServer) ChangeToPrimary(ctx context.Context, req *pb.ChangeToPrimaryReq) (res *pb.ChangeToPrimaryResp, err error)
ChangeToPrimary is received by a backup chunk server to notify it that it is the new primary for a given chunk handle.
func (*ChunkServer) CreateChunk ¶
func (s *ChunkServer) CreateChunk(ctx context.Context, createChunkReq *pb.CreateChunkReq) (*pb.CreateChunkResp, error)
CreateChunk creates a file on the local filesystem that represents a chunk, per the Master Server's request.
func (*ChunkServer) DeleteChunk ¶
func (s *ChunkServer) DeleteChunk(ctx context.Context, deleteReq *pb.DeleteChunkReq) (*pb.DeleteChunkResp, error)
DeleteChunk deletes the chunk metadata that corresponds with a string chunk handle on the primary chunk server as well as on backup servers
func (*ChunkServer) ForwardCreate ¶
func (s *ChunkServer) ForwardCreate(ctx context.Context, forwardCreateReq *pb.ForwardCreateReq) (*pb.ForwardCreateResp, error)
ForwardCreate creates a new chunk as a backup.
func (*ChunkServer) GetVersion ¶
func (s *ChunkServer) GetVersion(ctx context.Context, req *pb.GetVersionReq) (res *pb.GetVersionResp, err error)
func (*ChunkServer) ReadVersion ¶
func (s *ChunkServer) ReadVersion(ctx context.Context, readVersion *pb.ReadVersionReq) (*pb.ReadVersionResp, error)
ReadVersion returns the version of a chunk that corresponds with a string chunk handle and returns error if the chunk is not recorded by the ChunkServer
func (*ChunkServer) Replicate ¶
func (s *ChunkServer) Replicate(ctx context.Context, replicateReq *pb.ReplicateReq) (*pb.ReplicateResp, error)
func (*ChunkServer) SendGetVersion ¶
func (s *ChunkServer) SendGetVersion(chunkHandle string)
func (*ChunkServer) SendHeartBeat ¶
func (s *ChunkServer) SendHeartBeat()
func (*ChunkServer) SendRegister ¶
func (s *ChunkServer) SendRegister() error
func (*ChunkServer) UpdateBackup ¶
func (s *ChunkServer) UpdateBackup(ctx context.Context, req *pb.UpdateBackupReq) (*pb.UpdateBackupResp, error)
type GetVersionTimer ¶
type GetVersionTimer struct { Srv *ChunkServer ChunkHandle string Timeout int Quit <-chan string }
func (*GetVersionTimer) Trigger ¶
func (t *GetVersionTimer) Trigger()
type HeartBeatTimer ¶
type HeartBeatTimer struct { Srv *ChunkServer // Timeout in millisecond Timeout int }
func (*HeartBeatTimer) Trigger ¶
func (t *HeartBeatTimer) Trigger()
type Number ¶
type Number interface { constraints.Integer | constraints.Float }
type RespMetaData ¶
type RespMetaData struct { // client last seq LastID string // last response to client append request AppendResp *pb.AppendDataResp // error Err error }