Documentation ¶
Index ¶
- Constants
- Variables
- func CallGetTimeStamp(address, sdfsFileName string, c chan Pair)
- func CallMapleJuiceRequest(workerID int, workerAddress string, files []string, args *MapleJuiceTaskArgs, ...)
- func CallNodesMergeTmpFiles(receiverAddress []string)
- func CallSingleNodeMergeTmpFiles(address string, ts int, c chan int)
- func CheckFile(sdfsfilename, address string) string
- func DeleteFile(address, sdfsName string, c chan string) error
- func DeleteSDFSDir(address, dir string) error
- func GetFile(address, sdfsfilename string, data *[]byte) error
- func GetMillisecond() int
- func IsInCircleRange(id, start, end int) bool
- func ListFileInSDFSDir(address, dir string) []string
- func ListFilesWithPrefixInNode(address, prefix string) []string
- func NormalCharToSpecial(s string) string
- func PutFile(address string, args *StoreFileArgs, c chan int)
- func ReadContent(conn net.Conn) []byte
- func ReplyTaskResultToDcli(message, clientAddress string)
- func SpecialCharToNormal(s string) string
- func WriteJuicePairToLocal(outputfile string, kvpair map[string]string)
- type ActionType
- type FileInfo
- type FileList
- func (fl *FileList) AppendFile(sdfsName string, root_dir string, timestamp int, masterNodeID int, data []byte) error
- func (fl *FileList) DeleteFileAndInfo(sdfsName string) bool
- func (fl *FileList) DeleteFileInfo(sdfsfilename string) bool
- func (fl *FileList) DeleteFileInfosOutOfRange(start, end int) []string
- func (fl *FileList) DeleteSDFSDir(dirName string)
- func (fl *FileList) DeleteTmpFilesFromFailedWorker(workerId int)
- func (fl *FileList) GetFileInfo(sdfsfilename string) *FileInfo
- func (fl *FileList) GetFilesInRange(startID, endID int) []string
- func (fl *FileList) GetOwnedFileInfos(masterId int) []FileInfo
- func (fl *FileList) GetTimeStamp(sdfsfilename string) int
- func (fl *FileList) ListFileInDir(dir string) []string
- func (fl *FileList) ListFilesWithPrefix(prefix string) []string
- func (fl *FileList) MergeDirectoryWithSurfix(surffix string)
- func (fl *FileList) MergeTmpFiles(tmpDir, desDir string, ts int)
- func (fl *FileList) PutFileInfo(sdfsName string, path string, timestamp int, masterNodeID int)
- func (fl *FileList) PutFileInfoBase(hashId int, sdfsfilename string, abs_path string, timestamp int, ...)
- func (fl *FileList) PutFileInfoObject(sdfsfilename string, fi *FileInfo)
- func (fl *FileList) ServeFile(sdfsfilename string) ([]byte, error)
- func (fl *FileList) StoreFile(sdfsName string, root_dir string, timestamp int, masterNodeID int, data []byte) error
- func (fl *FileList) StoreFileBase(hashId int, sdfsName string, root_dir string, timestamp int, masterNodeID int, ...) error
- func (fl *FileList) StoreTmpFile(sdfsName string, root_dir string, timestamp int, masterNodeID int, data []byte) error
- func (fl *FileList) UpdateMasterID(new_master_id int, needUpdate func(fileInfo *FileInfo) bool)
- type FileService
- func (fileService *FileService) CheckFileExists(sdfsfilename string, hostname *string) error
- func (fileService *FileService) DeleteFileRequest(sdfsName string, result *RPCResultType) error
- func (fileService *FileService) DeleteLocalFile(sdfsName string, result *RPCResultType) error
- func (fileService *FileService) DeleteSDFSDir(dir string, result *RPCResultType) error
- func (fileService *FileService) DeleteSDFSDirRequest(sdfsdir string, result *RPCResultType) error
- func (fileService *FileService) GetFileRequest(args []string, result *RPCResultType) error
- func (fileService *FileService) GetTimeStamp(sdfsFileName string, timestamp *int) error
- func (fileService *FileService) ListFileInDirRequest(sdfsDir string, res *[]string) error
- func (fileService *FileService) ListFileInLocalDir(dir string, result *[]string) error
- func (fileService *FileService) ListFilesWithPrefix(prefix string, result *[]string) error
- func (fileService *FileService) Ls(sdfsfilename string, hostnames *[]string) error
- func (fileService *FileService) PutFileRequest(args *PutFileArgs, result *RPCResultType) error
- func (fileService *FileService) ServeLocalFile(sdfsfilename string, result *[]byte) error
- func (fileService *FileService) StoreFileToLocal(args *StoreFileArgs, result *RPCResultType) error
- type MapleJuiceService
- func (mj *MapleJuiceService) AddMapleJuiceTask(args *MapleJuiceTaskArgs, result *RPCResultType) error
- func (mj *MapleJuiceService) ForwardMapleJuiceRequest(args *MapleJuiceTaskArgs, result *RPCResultType) error
- func (mj *MapleJuiceService) MergeTmpFiles(ts int, result *RPCResultType) error
- func (mj *MapleJuiceService) StartMapleJuiceTask(des *TaskDescription, result *RPCResultType) error
- type MapleJuiceTaskArgs
- type MapleJuiceTaskType
- type MemberList
- func (mbList *MemberList) DeleteNode(id int)
- func (mbList *MemberList) DumpToTmpFile()
- func (mbList *MemberList) FindLeastFreeId() int
- func (mbList *MemberList) GetAddress(id int) string
- func (mbList *MemberList) GetAllAddressesExcludeSelf() []string
- func (mbList *MemberList) GetAllRPCAddresses() []string
- func (mbList *MemberList) GetIP(id int) string
- func (mbList *MemberList) GetNextKNodes(id, k int) []MemberNode
- func (mbList *MemberList) GetNode(id int) *MemberNode
- func (mbList *MemberList) GetPrevKNodes(id, k int) []MemberNode
- func (mbList *MemberList) GetRPCAddress(id int) string
- func (mbList *MemberList) GetRPCAddressesForNextKNodes(start, k int) []string
- func (mbList *MemberList) GetSmallestNode() *MemberNode
- func (mbList *MemberList) GetTimeOutNodes(deadline, id, k int) []MemberNode
- func (mbList *MemberList) InsertNode(id int, ip, port, rpc_port string, heartbeat_t int, hostname string)
- func (mbList *MemberList) NicePrint()
- func (mbList *MemberList) NodeTimeOut(deadline, id int) bool
- func (mbList *MemberList) ToJson() []byte
- func (mbList *MemberList) UpdateNodeHeartbeat(id, heartbeat_t int)
- type MemberNode
- type Node
- func (node *Node) Broadcast(packet *Packet)
- func (node *Node) DeleteRedundantFile()
- func (node *Node) DeleteSDFSDirRequest(sdfsdir string) error
- func (node *Node) DuplicateReplica()
- func (node *Node) GetAddressOfLatestTS(sdfsfilename string) (string, int)
- func (node *Node) GetAddressesWithIds(ids []int) []string
- func (node *Node) GetFileRequest(args []string, result *RPCResultType) error
- func (node *Node) GetFilesFromSDFS(sdfsfiles []string, dir string) error
- func (node *Node) GetFirstKReplicaNodeID(sdfsfilename string, K int) []int
- func (node *Node) GetMasterID(sdfsfilename string) int
- func (node *Node) GetResponsibleAddresses(sdfsfilename string) []string
- func (node *Node) GetResponsibleHostname(sdfsName string) []string
- func (node *Node) HandleJuiceTask(input_dir, output_file string, f plugin.Symbol)
- func (node *Node) HandleMapleTask(input_dir, output_dir, prefix string, f plugin.Symbol)
- func (node *Node) HandleTCPFileRequest(conn net.Conn)
- func (node *Node) IndividualPutFileRequest(sdfsName, localName string, forceUpdate, appending, tmp bool, ...) error
- func (node *Node) InitMemberList()
- func (node *Node) IsAlive() bool
- func (node *Node) Join(address string) bool
- func (node *Node) JoinNode(packet Packet)
- func (node *Node) Leave()
- func (node *Node) ListFileInDirRequest(sdfsDir string) []string
- func (node *Node) ListFilesWithPrefixRequest(prefix string) []string
- func (node *Node) LostNode(id int, lose_heartbeat bool)
- func (node *Node) MonitorInputPacket()
- func (node *Node) PartitionFiles(files []string, numWorkers int, partitionMethod string) map[int][]string
- func (node *Node) PutFileRequest(args *PutFileArgs, result *RPCResultType) error
- func (node *Node) RegisterFileService(address string) error
- func (node *Node) RegisterMapleJuiceService(address string, mjService *MapleJuiceService) error
- func (node *Node) RegisterRPCMapleJuiceService()
- func (node *Node) ScanIntroducer(addresses []string) (string, bool)
- func (node *Node) SendFileIfNecessary(info FileInfo, targetRPCAddr []string)
- func (node *Node) SendHeartbeat()
- func (node *Node) SendHeartbeatRoutine()
- func (node *Node) SetFileDir(dir string)
- func (node *Node) StartMapleJuiceTask(des *TaskDescription) error
- func (node *Node) StartRPCService()
- func (node *Node) StartTCPService()
- func (node *Node) StoreFileToLocal(args *StoreFileArgs) error
- func (node *Node) TransferOwnership(newMasterId int)
- func (node *Node) UpdateHostname(name string)
- func (node *Node) WriteMaplePairToLocal(dir, prefix string, kvpair map[string]string)
- type Packet
- type Pair
- type PutFileArgs
- type RPCResultType
- type StatusType
- type StoreFileArgs
- type TaskDescription
Constants ¶
View Source
const ( ACTION_JOIN ActionType = 1 << 0 ACTION_REPLY_JOIN ActionType = 1 << 1 ACTION_NEW_NODE ActionType = 1 << 2 ACTION_DELETE_NODE ActionType = 1 << 3 ACTION_HEARTBEAT ActionType = 1 << 4 ACTION_PING ActionType = 1 << 5 ACTION_ACK ActionType = 1 << 6 STATUS_OK StatusType = 1 << 0 STATUS_FAIL StatusType = 1 << 1 STATUS_END StatusType = 1 << 2 LOSS_RATE = 0.00 NUM_MONITORS int = 3 HEARTBEAT_INTERVAL = 1500 * time.Millisecond TIMEOUT_THRESHOLD = 4 * time.Second )
View Source
const ( GETRequest string = "GET" PUTRequest string = "PUT" )
View Source
const DEBUG = false
View Source
const DUPLICATE_CNT = 4
View Source
const DcliReceiverPort = "8013"
View Source
const FILE_LIST_FILE = "/tmp/file.list"
View Source
const FileServiceName = "SimpleFileService"
View Source
const JuicePartitionMethod = "range"
View Source
const MAX_CAPACITY = 1024
View Source
const MEMBER_LIST_FILE = "/tmp/member.list"
View Source
const MIN_UPDATE_INTERVAL = 60 * 1000
View Source
const MapleJuiceServiceName = "MapleJuiceService"
MapleJuiceServiceName ...
View Source
const READ_QUORUM = 2
View Source
const RPC_DEFAULT_PORT = "8011"
View Source
const SPLIT = "___"
View Source
const TCPBufferSize = 1024
View Source
const TCP_FILE_PORT = "8012"
View Source
const WRITE_QUORUM = 3
Variables ¶
View Source
var HEARTBEAT_LOG_FLAG = false // debug
Functions ¶
func CallGetTimeStamp ¶
func CallMapleJuiceRequest ¶
func CallMapleJuiceRequest(workerID int, workerAddress string, files []string, args *MapleJuiceTaskArgs, waitChan chan int)
func CallNodesMergeTmpFiles ¶
func CallNodesMergeTmpFiles(receiverAddress []string)
func DeleteFile ¶
func DeleteSDFSDir ¶
func GetMillisecond ¶
func GetMillisecond() int
func IsInCircleRange ¶
func ListFileInSDFSDir ¶
func NormalCharToSpecial ¶
func PutFile ¶
func PutFile(address string, args *StoreFileArgs, c chan int)
func ReadContent ¶
func ReplyTaskResultToDcli ¶
func ReplyTaskResultToDcli(message, clientAddress string)
func SpecialCharToNormal ¶
func WriteJuicePairToLocal ¶
Types ¶
type ActionType ¶
type ActionType int8
type FileList ¶
type FileList struct { ID int FileMap map[string]*FileInfo // Key: sdfsfilename, value: fileinfo ListLock *sync.Mutex }
func CreateFileList ¶
func (*FileList) AppendFile ¶
func (*FileList) DeleteFileAndInfo ¶
func (*FileList) DeleteFileInfo ¶
func (*FileList) DeleteFileInfosOutOfRange ¶
func (*FileList) DeleteSDFSDir ¶
func (*FileList) DeleteTmpFilesFromFailedWorker ¶
func (*FileList) GetFileInfo ¶
func (*FileList) GetFilesInRange ¶
func (*FileList) GetOwnedFileInfos ¶
func (*FileList) GetTimeStamp ¶
func (*FileList) ListFileInDir ¶
func (*FileList) ListFilesWithPrefix ¶
func (*FileList) MergeDirectoryWithSurfix ¶
func (*FileList) MergeTmpFiles ¶
func (*FileList) PutFileInfo ¶
func (*FileList) PutFileInfoBase ¶
func (*FileList) PutFileInfoObject ¶
PutFileInfoObject is used for testing.
func (*FileList) StoreFileBase ¶
func (fl *FileList) StoreFileBase( hashId int, sdfsName string, root_dir string, timestamp int, masterNodeID int, data []byte, appending bool, tmp bool) error
This should only be used in tests.
func (*FileList) StoreTmpFile ¶
type FileService ¶
type FileService struct {
// contains filtered or unexported fields
}
func (*FileService) CheckFileExists ¶
func (fileService *FileService) CheckFileExists(sdfsfilename string, hostname *string) error
func (*FileService) DeleteFileRequest ¶
func (fileService *FileService) DeleteFileRequest(sdfsName string, result *RPCResultType) error
func (*FileService) DeleteLocalFile ¶
func (fileService *FileService) DeleteLocalFile(sdfsName string, result *RPCResultType) error
func (*FileService) DeleteSDFSDir ¶
func (fileService *FileService) DeleteSDFSDir(dir string, result *RPCResultType) error
func (*FileService) DeleteSDFSDirRequest ¶
func (fileService *FileService) DeleteSDFSDirRequest(sdfsdir string, result *RPCResultType) error
func (*FileService) GetFileRequest ¶
func (fileService *FileService) GetFileRequest(args []string, result *RPCResultType) error
Executed in coordinator
func (*FileService) GetTimeStamp ¶
func (fileService *FileService) GetTimeStamp(sdfsFileName string, timestamp *int) error
func (*FileService) ListFileInDirRequest ¶
func (fileService *FileService) ListFileInDirRequest(sdfsDir string, res *[]string) error
func (*FileService) ListFileInLocalDir ¶
func (fileService *FileService) ListFileInLocalDir(dir string, result *[]string) error
func (*FileService) ListFilesWithPrefix ¶
func (fileService *FileService) ListFilesWithPrefix(prefix string, result *[]string) error
func (*FileService) Ls ¶
func (fileService *FileService) Ls(sdfsfilename string, hostnames *[]string) error
func (*FileService) PutFileRequest ¶
func (fileService *FileService) PutFileRequest(args *PutFileArgs, result *RPCResultType) error
Callee begin
func (*FileService) ServeLocalFile ¶
func (fileService *FileService) ServeLocalFile(sdfsfilename string, result *[]byte) error
func (*FileService) StoreFileToLocal ¶
func (fileService *FileService) StoreFileToLocal(args *StoreFileArgs, result *RPCResultType) error
type MapleJuiceService ¶
type MapleJuiceService struct { TaskQueue chan *MapleJuiceTaskArgs SelfNode *Node }
func (*MapleJuiceService) AddMapleJuiceTask ¶
func (mj *MapleJuiceService) AddMapleJuiceTask(args *MapleJuiceTaskArgs, result *RPCResultType) error
Adds a maple juice task to the queue.
func (*MapleJuiceService) ForwardMapleJuiceRequest ¶
func (mj *MapleJuiceService) ForwardMapleJuiceRequest(args *MapleJuiceTaskArgs, result *RPCResultType) error
Handles a maple/juice request from Dcli; sends the request to Master.
func (*MapleJuiceService) MergeTmpFiles ¶
func (mj *MapleJuiceService) MergeTmpFiles(ts int, result *RPCResultType) error
func (*MapleJuiceService) StartMapleJuiceTask ¶
func (mj *MapleJuiceService) StartMapleJuiceTask(des *TaskDescription, result *RPCResultType) error
Worker
type MapleJuiceTaskArgs ¶
type MapleJuiceTaskArgs struct { TaskType MapleJuiceTaskType // "MapleTask" or "JuiceTask" Exe string NumWorkers int InputPath string // sdfs_src_dir for maple, prefix for juice OutputPath string // prefix for maple, sdfs_dest_filename for juice ClientAddr string DeleteInput bool }
MapleJuiceTaskArgs ...
type MapleJuiceTaskType ¶
type MapleJuiceTaskType int8
const ( MapleTask MapleJuiceTaskType = 1 JuiceTask MapleJuiceTaskType = 2 )
Task names
type MemberList ¶
type MemberList struct { Member_map map[int]*MemberNode Capacity, Size int SelfId int // contains filtered or unexported fields }
func ConstructFromTmpFile ¶
func ConstructFromTmpFile() *MemberList
func CreateMemberList ¶
func CreateMemberList(selfId, capacity int) *MemberList
func (*MemberList) DeleteNode ¶
func (mbList *MemberList) DeleteNode(id int)
func (*MemberList) DumpToTmpFile ¶
func (mbList *MemberList) DumpToTmpFile()
func (*MemberList) FindLeastFreeId ¶
func (mbList *MemberList) FindLeastFreeId() int
func (*MemberList) GetAddress ¶
func (mbList *MemberList) GetAddress(id int) string
func (*MemberList) GetAllAddressesExcludeSelf ¶
func (mbList *MemberList) GetAllAddressesExcludeSelf() []string
func (*MemberList) GetAllRPCAddresses ¶
func (mbList *MemberList) GetAllRPCAddresses() []string
func (*MemberList) GetIP ¶
func (mbList *MemberList) GetIP(id int) string
func (*MemberList) GetNextKNodes ¶
func (mbList *MemberList) GetNextKNodes(id, k int) []MemberNode
func (*MemberList) GetNode ¶
func (mbList *MemberList) GetNode(id int) *MemberNode
func (*MemberList) GetPrevKNodes ¶
func (mbList *MemberList) GetPrevKNodes(id, k int) []MemberNode
func (*MemberList) GetRPCAddress ¶
func (mbList *MemberList) GetRPCAddress(id int) string
func (*MemberList) GetRPCAddressesForNextKNodes ¶
func (mbList *MemberList) GetRPCAddressesForNextKNodes(start, k int) []string
func (*MemberList) GetSmallestNode ¶
func (mbList *MemberList) GetSmallestNode() *MemberNode
func (*MemberList) GetTimeOutNodes ¶
func (mbList *MemberList) GetTimeOutNodes(deadline, id, k int) []MemberNode
func (*MemberList) InsertNode ¶
func (mbList *MemberList) InsertNode(id int, ip, port, rpc_port string, heartbeat_t int, hostname string)
func (*MemberList) NicePrint ¶
func (mbList *MemberList) NicePrint()
func (*MemberList) NodeTimeOut ¶
func (mbList *MemberList) NodeTimeOut(deadline, id int) bool
This is for passive monitoring.
func (*MemberList) ToJson ¶
func (mbList *MemberList) ToJson() []byte
func (*MemberList) UpdateNodeHeartbeat ¶
func (mbList *MemberList) UpdateNodeHeartbeat(id, heartbeat_t int)
type MemberNode ¶
type MemberNode struct { Id int Heartbeat_t int JoinTime string Hostname string Ip string Port string RPC_Port string // contains filtered or unexported fields }
func CreateMemberNode ¶
func CreateMemberNode(id int, ip, port, rpc_port string, heartbeat_t int, hostname string) *MemberNode
func (*MemberNode) GetNextNode ¶
func (mNode *MemberNode) GetNextNode() *MemberNode
func (*MemberNode) GetPrevNode ¶
func (mNode *MemberNode) GetPrevNode() *MemberNode
type Node ¶
type Node struct { Id int IP, Port, RPC_Port string MbList *MemberList FileList *FileList Root_dir string Hostname string DisableMonitorHB bool // Disable monitor heartbeat, for test FailureNodeChan chan int // contains filtered or unexported fields }
func CreateNode ¶
func (*Node) DeleteRedundantFile ¶
func (node *Node) DeleteRedundantFile()
func (*Node) DeleteSDFSDirRequest ¶
func (*Node) DuplicateReplica ¶
func (node *Node) DuplicateReplica()
func (*Node) GetAddressOfLatestTS ¶
func (*Node) GetAddressesWithIds ¶
func (*Node) GetFileRequest ¶
func (node *Node) GetFileRequest(args []string, result *RPCResultType) error
func (*Node) GetFilesFromSDFS ¶
func (*Node) GetFirstKReplicaNodeID ¶
func (*Node) GetMasterID ¶
func (*Node) GetResponsibleAddresses ¶
func (*Node) GetResponsibleHostname ¶
func (*Node) HandleJuiceTask ¶
func (*Node) HandleMapleTask ¶
func (*Node) HandleTCPFileRequest ¶
func (*Node) IndividualPutFileRequest ¶
func (node *Node) IndividualPutFileRequest(sdfsName, localName string, forceUpdate, appending, tmp bool, result *RPCResultType) error
func (*Node) InitMemberList ¶
func (node *Node) InitMemberList()
func (*Node) ListFileInDirRequest ¶
func (*Node) ListFilesWithPrefixRequest ¶
func (*Node) MonitorInputPacket ¶
func (node *Node) MonitorInputPacket()
func (*Node) PartitionFiles ¶
func (*Node) PutFileRequest ¶
func (node *Node) PutFileRequest(args *PutFileArgs, result *RPCResultType) error
func (*Node) RegisterFileService ¶
func (*Node) RegisterMapleJuiceService ¶
func (node *Node) RegisterMapleJuiceService(address string, mjService *MapleJuiceService) error
func (*Node) RegisterRPCMapleJuiceService ¶
func (node *Node) RegisterRPCMapleJuiceService()
func (*Node) SendFileIfNecessary ¶
func (*Node) SendHeartbeat ¶
func (node *Node) SendHeartbeat()
func (*Node) SendHeartbeatRoutine ¶
func (node *Node) SendHeartbeatRoutine()
func (*Node) SetFileDir ¶
func (*Node) StartMapleJuiceTask ¶
func (node *Node) StartMapleJuiceTask(des *TaskDescription) error
func (*Node) StartRPCService ¶
func (node *Node) StartRPCService()
func (*Node) StartTCPService ¶
func (node *Node) StartTCPService()
func (*Node) StoreFileToLocal ¶
func (node *Node) StoreFileToLocal(args *StoreFileArgs) error
func (*Node) TransferOwnership ¶
func (*Node) UpdateHostname ¶
type Packet ¶
type Packet struct { Action ActionType Id int Hostname string IP string Port string RPC_Port string Map *MemberList }
type PutFileArgs ¶
type RPCResultType ¶
type RPCResultType int8
const ( RPC_SUCCESS RPCResultType = 1 << 0 RPC_DUMMY RPCResultType = 1 << 1 RPC_FAIL RPCResultType = 1 << 2 RPC_PROMPT RPCResultType = 1 << 3 FILES_ROOT_DIR = "/apps/files" )
type StatusType ¶
type StatusType int8
type StoreFileArgs ¶
type StoreFileArgs struct { MasterNodeId int SdfsName string Ts int Content []byte Appending bool Tmp bool }
func ParsePutArgs ¶
func ParsePutArgs(lines []string) (*StoreFileArgs, error)
type TaskDescription ¶
Click to show internal directories.
Click to hide internal directories.