Go IPFS MapReduce
A simple POC Map Reduce Library for IPFS in Golang
Design
This acts as a standalone library. So only IPFS nodes using this library to register services will be able to communicate to each other to perform MapReduce.
In the future the plan it to have this part of the daemon so all IPFS nodes have this protocol and we can do p2p map reduce at a much larger scale. Because IPFS slits a file added to it into 256 KB blocks, we can independently process them.
Every Peer (a IPFS node) registers the map & reduce gorpc services using the library. This sets the required stream handlers for the libp2p protocol "/ipfs/mapreduce".
To run mapreduce, we use the library to get the master struct, registering the master service and initializing by passing in the required variables. Files and stored and fetched from IPFS via the Cid indentifier.
- node: the ipfs node used to connect to peers, fetch files, etc.
- mapFuncFilePath: map golang code built to a ".so" file in plugin mode
- reduceFuncFilePath: reduce golang code built to a ".so" file in plugin mode
- noOfReducers: no of reducers
- dataFileCid: cid string for the data file to process using map reduce.
- mrOutputFile: file name where final output should be stored
Calling a run method on the master starts the map reduce process.
Usage
All Peers
import (
...
mapreduce "github.com/omkarprabhu-98/go-ipfs-mapreduce"
...
)
fmt.Println("Spawning ephemeral ipfs node")
node, err := spawnEphemeral(ctx)
if err != nil {
panic(fmt.Errorf("failed to spawn ephemeral node: %s", err))
}
err = mapreduce.RegisterProtocol(node)
if err != nil {
panic(fmt.Errorf("failed to register map reduce protocol: %s", err))
}
Run Map Reduce
master, err := mapreduce.InitMaster(node, mapFuncFilePath, reduceFuncFilePath,
noOfReducers, dataFileCid);
if err != nil {
panic(fmt.Errorf("failed to init master: %s", err))
}
master.RunMapReduce(ctx)
Observe status
ticker := time.NewTicker(5 * time.Second)
quit := make(chan struct{})
go func() {
for {
select {
case <- ticker.C:
fmt.Println("MapStatus:", master.GetMapStatus())
redStatus := master.GetReduceStatus()
fmt.Println("ReduceStatus:", redStatus)
if redStatus.Complete == redStatus.Total {
quit <- struct{}{}
}
case <- quit:
ticker.Stop()
return
}
}
}()
Check examples
directory for examples
Demo
Snippets of sample runs locally
- Small input file 1KB
https://user-images.githubusercontent.com/23053768/129325774-5017407f-edbf-4227-a362-26d0d3e4a241.mov
- Large file 581 KB
https://user-images.githubusercontent.com/23053768/129325623-03e7be66-99ef-4534-9e8d-f7f9cd9d5b0e.mov
References
Inspiration and code references are taken from MIT's 6.824 course
TODOs for the project
- Handle errors and timeouts in the current code
- Write TF-IDF mapreduce for Hadoop testing (may have to be in Java) and get stats
- Write TF-IDF in Go for our use case
- Script to spawn up the cluster and distribute the data in CloudLab nodes. Also test the TF-IDF for our framework.
- TODO in master.go for choosing which peer to get the block from
- TODO in master.go to avoid locking the whole computation
- Better way to handle TODO in master.go line 94
- (can avoid for now) Weird error in utils.go TODO, tmp fix in place, find a better fix.
- testing
- integrate into go-ifs