Documentation ¶
Overview ¶
Example (Blserver) ¶
server := startMockServer("127.0.0.1:4000") defer func() { // Shutdown the server after test if err := server.Shutdown(context.Background()); err != nil { panic(err) // Handle the error as you see fit } }() const poolName = "1" ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) //ctx = network.WithUseTransient(ctx, "fx.exchange") defer cancel() // Elevate log level to show internal communications. if err := logging.SetLogLevel("*", "info"); err != nil { panic(err) } // Use a deterministic random generator to generate deterministic // output for the example. cfg := NewConfig() // ListenAddr configure ListenAddrsConfig1 := []string{"/ip4/0.0.0.0/tcp/40001", "/ip4/0.0.0.0/udp/40001/quic", "/ip4/0.0.0.0/udp/40001/quic-v1", "/ip4/0.0.0.0/udp/40001/quic-v1/webtransport"} listenAddrs1 := make([]multiaddr.Multiaddr, 0, len(ListenAddrsConfig1)+1) // Convert string addresses to multiaddr and append to listenAddrs for _, addrString := range ListenAddrsConfig1 { addr, err := multiaddr.NewMultiaddr(addrString) if err != nil { panic(fmt.Errorf("invalid multiaddress: %w", err)) } listenAddrs1 = append(listenAddrs1, addr) } // Add the relay multiaddress relayAddr2, err := multiaddr.NewMultiaddr("/p2p-circuit") if err != nil { panic(fmt.Errorf("error creating relay multiaddress: %w", err)) } listenAddrs1 = append(listenAddrs1, relayAddr2) /// ListenAddrsConfig2 := []string{"/ip4/0.0.0.0/tcp/40002", "/ip4/0.0.0.0/udp/40002/quic"} listenAddrs2 := make([]multiaddr.Multiaddr, 0, len(ListenAddrsConfig2)+1) // Convert string addresses to multiaddr and append to listenAddrs for _, addrString := range ListenAddrsConfig2 { addr, err := multiaddr.NewMultiaddr(addrString) if err != nil { panic(fmt.Errorf("invalid multiaddress: %w", err)) } listenAddrs2 = append(listenAddrs2, addr) } // Add the relay multiaddress listenAddrs2 = append(listenAddrs2, relayAddr2) /// ListenAddrsConfig3 := []string{"/ip4/0.0.0.0/tcp/40003", "/ip4/0.0.0.0/udp/40003/quic"} listenAddrs3 := make([]multiaddr.Multiaddr, 0, len(ListenAddrsConfig3)+1) // Convert string addresses to multiaddr and append to listenAddrs for _, addrString := range ListenAddrsConfig3 { addr, err := multiaddr.NewMultiaddr(addrString) if err != nil { panic(fmt.Errorf("invalid multiaddress: %w", err)) } listenAddrs3 = append(listenAddrs3, addr) } // Add the relay multiaddress listenAddrs3 = append(listenAddrs3, relayAddr2) /// ListenAddrsConfig4 := []string{"/ip4/0.0.0.0/tcp/40004", "/ip4/0.0.0.0/udp/40004/quic"} listenAddrs4 := make([]multiaddr.Multiaddr, 0, len(ListenAddrsConfig4)+1) // Convert string addresses to multiaddr and append to listenAddrs for _, addrString := range ListenAddrsConfig4 { addr, err := multiaddr.NewMultiaddr(addrString) if err != nil { panic(fmt.Errorf("invalid multiaddress: %w", err)) } listenAddrs4 = append(listenAddrs4, addr) } // Add the relay multiaddress listenAddrs4 = append(listenAddrs4, relayAddr2) //End of ListenAddr configure hopts := []libp2p.Option{ //libp2p.ListenAddrs(listenAddrs...), libp2p.EnableNATService(), libp2p.NATPortMap(), libp2p.EnableRelay(), libp2p.EnableHolePunching(), } sr := make([]peer.AddrInfo, 0, len(cfg.StaticRelays)) for _, relay := range cfg.StaticRelays { rma, err := multiaddr.NewMultiaddr(relay) if err != nil { fmt.Println(err) } rai, err := peer.AddrInfoFromP2pAddr(rma) if err != nil { fmt.Println(err) } sr = append(sr, *rai) } libp2p.EnableAutoRelayWithStaticRelays(sr, autorelay.WithNumRelays(1)) hopts = append(hopts, libp2p.ForceReachabilityPrivate()) // Instantiate the first node in the pool hopts1 := append(hopts, libp2p.Identity(generateIdentity(1))) hopts1 = append(hopts1, libp2p.ListenAddrs(listenAddrs1...)) h1, err := libp2p.New(hopts1...) if err != nil { panic(err) } n1, err := blox.New( blox.WithPoolName(poolName), blox.WithTopicName(poolName), blox.WithHost(h1), blox.WithUpdatePoolName(updatePoolName), blox.WithRelays([]string{devRelay}), blox.WithPingCount(5), blox.WithMaxPingTime(100), blox.WithMinSuccessPingRate(80), ) if err != nil { panic(err) } if err := n1.Start(ctx); err != nil { panic(err) } defer n1.Shutdown(ctx) log.Debugf("n1 Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String()) // Instantiate the second node in the pool hopts2 := append(hopts, libp2p.Identity(generateIdentity(2))) hopts2 = append(hopts2, libp2p.ListenAddrs(listenAddrs2...)) h2, err := libp2p.New(hopts2...) if err != nil { panic(err) } n2, err := blox.New( blox.WithPoolName(poolName), blox.WithTopicName(poolName), blox.WithHost(h2), blox.WithUpdatePoolName(updatePoolName), blox.WithRelays([]string{devRelay}), blox.WithPingCount(5), blox.WithMaxPingTime(100), blox.WithMinSuccessPingRate(80), ) if err != nil { panic(err) } if err := n2.Start(ctx); err != nil { panic(err) } defer n2.Shutdown(ctx) log.Debugf("n2 Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String()) // Instantiate the third node in the pool hopts3 := append(hopts, libp2p.Identity(generateIdentity(3))) hopts3 = append(hopts3, libp2p.ListenAddrs(listenAddrs3...)) h3, err := libp2p.New(hopts3...) if err != nil { panic(err) } n3, err := blox.New( blox.WithPoolName(poolName), blox.WithTopicName(poolName), blox.WithHost(h3), blox.WithUpdatePoolName(updatePoolName), blox.WithRelays([]string{devRelay}), blox.WithPingCount(5), blox.WithMaxPingTime(100), blox.WithMinSuccessPingRate(80), ) if err != nil { panic(err) } if err := n3.Start(ctx); err != nil { panic(err) } defer n3.Shutdown(ctx) log.Debugf("n3 Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String()) if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil { panic(err) } if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil { panic(err) } // Wait until the nodes discover each other for { if len(h1.Peerstore().Peers()) >= 3 && len(h2.Peerstore().Peers()) >= 3 && len(h3.Peerstore().Peers()) >= 3 { break } else { h1Peers := h1.Peerstore().Peers() log.Debugf("n1 Only %s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers)) } select { case <-ctx.Done(): panic(ctx.Err()) default: time.Sleep(time.Second) } } h1Peers := h1.Peerstore().Peers() log.Debugf("n1 Finally %s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers)) for _, id := range h1Peers { log.Debugf("- %s\n", id) } h2Peers := h2.Peerstore().Peers() log.Debugf("n2 Finally %s peerstore contains %d nodes:\n", h2.ID(), len(h2Peers)) for _, id := range h2Peers { log.Debugf("- %s\n", id) } h3Peers := h3.Peerstore().Peers() log.Debugf("n3 Finally %s peerstore contains %d nodes:\n", h3.ID(), len(h3Peers)) for _, id := range h3Peers { log.Debugf("- %s\n", id) } // Instantiate the fourth node not in the pool log.Debug("Now creating pid of n4") log.Debug("Now creating host of n4") hopts4 := append(hopts, libp2p.Identity(generateIdentity(4))) hopts4 = append(hopts4, libp2p.ListenAddrs(listenAddrs4...)) h4, err := libp2p.New(hopts4...) if err != nil { log.Errorw("An error happened in creating libp2p instance of n4", "Err", err) panic(err) } log.Debug("Now creating blox for n4") n4, err := blox.New( blox.WithPoolName(poolName), blox.WithTopicName(poolName), blox.WithHost(h4), blox.WithUpdatePoolName(updatePoolName), blox.WithRelays([]string{devRelay}), blox.WithPingCount(5), blox.WithMaxPingTime(100), blox.WithMinSuccessPingRate(80), ) if err != nil { log.Errorw("An error happened in creating blox instance of n4", "Err", err) panic(err) } log.Debug("Now starting n4") if err := n4.Start(ctx); err != nil { log.Errorw("An error happened in starting instance of n4", "Err", err) panic(err) } defer n4.Shutdown(ctx) log.Debugf("n4 Instantiated node in pool %s with ID: %s\n", poolName, h4.ID().String()) n4.AnnounceJoinPoolRequestPeriodically(ctx) if err = h1.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil { panic(err) } if err = h2.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil { panic(err) } if err = h3.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil { panic(err) } // Wait until the fourth node discover others for { members := n4.GetBlMembers() for id, status := range members { memberInfo := fmt.Sprintf("Member ID: %s, Status: %v", id.String(), status) log.Debugln(memberInfo) } if len(members) >= 2 { break } else { log.Debugln(members) } select { case <-ctx.Done(): panic(ctx.Err()) default: time.Sleep(2 * time.Second) } } h4Peers := h4.Peerstore().Peers() log.Debugf("n4 Finally %s peerstore contains %d nodes:\n", h4.ID(), len(h4Peers)) for _, id := range h4Peers { log.Debugf("- %s\n", id) } //wait for 60 seconds count := 1 for { count = count + 1 if count > 60 { break } select { case <-ctx.Done(): panic(ctx.Err()) default: time.Sleep(1 * time.Second) } }
Output: Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true} Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true} Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true}
Example (Encode64Test) ¶
package main import ( "encoding/base64" "fmt" ) func main() { originalString := "12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX" // Encode to Base64 encodedString := base64.StdEncoding.EncodeToString([]byte(originalString)) fmt.Println("Encoded:", encodedString) // Decode from Base64 decodedBytes, err := base64.StdEncoding.DecodeString(encodedString) if err != nil { fmt.Println("Decode error:", err) return } decodedString := string(decodedBytes) fmt.Println("Decoded:", decodedString) // Check if original and decoded are the same if originalString == decodedString { fmt.Println("Success: Original and decoded strings are the same.") } else { fmt.Println("Error: Original and decoded strings are different.") } }
Output: Encoded: MTJEM0tvb1dIOXN3amVDeXVSNnV0ektVMVVzcGlXNVJER3pBRnZORHdxa1Q1YlVId3V4WA== Decoded: 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX Success: Original and decoded strings are the same.
Example (Pingtest) ¶
averageDuration := float64(2000) successCount := 0 server := startMockServer("127.0.0.1:4000") defer func() { // Shutdown the server after test if err := server.Shutdown(context.Background()); err != nil { panic(err) // Handle the error as you see fit } }() const poolName = "1" ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) defer cancel() // Elevate log level to show internal communications. if err := logging.SetLogLevel("*", "debug"); err != nil { panic(err) } nodeMultiAddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5001") if err != nil { panic(fmt.Errorf("invalid multiaddress: %w", err)) } node, err := rpc.NewApi(nodeMultiAddr) // Use a deterministic random generator to generate deterministic // output for the example. h1, err := libp2p.New(libp2p.Identity(generateIdentity(1))) if err != nil { panic(err) } n1, err := blox.New( blox.WithPoolName(poolName), blox.WithTopicName(poolName), blox.WithHost(h1), blox.WithUpdatePoolName(updatePoolName), blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), blox.WithPingCount(5), blox.WithIpfsClient(node), blox.WithExchangeOpts( exchange.WithDhtProviderOptions( dht.ProtocolExtension(protocol.ID("/"+poolName)), dht.ProtocolPrefix("/fula"), dht.Resiliency(1), dht.Mode(dht.ModeAutoServer), ), ), ) if err != nil { panic(err) } if err := n1.Start(ctx); err != nil { panic(err) } defer n1.Shutdown(ctx) fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String()) PingCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() // Send the ping request n1.GetBlMembers() rpc := n1.GetIPFSRPC() res, err := rpc.Request("ping", "12D3KooWHb38UxY8akVGWZBuFtS3NJ7rJUwd36t3cfkoY7EbgNt9").Send(PingCtx) if err != nil { fmt.Printf("ping was unsuccessful: %s", err) return } if res.Error != nil { fmt.Printf("ping was unsuccessful: %s", res.Error) return } // Process multiple responses using a decoder decoder := json.NewDecoder(res.Output) var totalTime int64 var count int for decoder.More() { var pingResp PingResponse err := decoder.Decode(&pingResp) if err != nil { log.Errorf("error decoding JSON response: %s", err) continue } if pingResp.Text == "" && pingResp.Time > 0 { // Check for empty Text field and Time totalTime += pingResp.Time count++ } } if count > 0 { averageDuration = float64(totalTime) / float64(count) / 1e6 // Convert nanoseconds to milliseconds successCount = count } else { fmt.Println("No valid ping responses received") return } if int(averageDuration) > 1 { fmt.Printf("Final response: successCount=%d", successCount) }
Output: Final response: successCount=10
Example (PoolDiscoverPeersViaPubSub) ¶
Example_poolDiscoverPeersViaPubSub starts a pool named "1" across three nodes, connects two of the nodes to the other one to facilitate a path for pubsub to propagate and shows all three nodes discover each other using pubsub.
server := startMockServer("127.0.0.1:4000") defer func() { // Shutdown the server after test if err := server.Shutdown(context.Background()); err != nil { panic(err) // Handle the error as you see fit } }() const poolName = "1" ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() // Elevate log level to show internal communications. if err := logging.SetLogLevel("*", "info"); err != nil { panic(err) } // Use a deterministic random generator to generate deterministic // output for the example. identity1 := libp2p.Identity(generateIdentity(1)) h1, err := libp2p.New(identity1) if err != nil { panic(err) } log.Infow("h1 value generated", "h1", h1.ID()) //12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM n1, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h1)) if err != nil { panic(err) } if err := n1.Start(ctx); err != nil { panic(err) } defer n1.Shutdown(ctx) fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String()) identity2 := libp2p.Identity(generateIdentity(2)) h2, err := libp2p.New(identity2) if err != nil { panic(err) } log.Infow("h2 value generated", "h2", h2.ID()) //12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX n2, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h2)) if err != nil { panic(err) } if err := n2.Start(ctx); err != nil { panic(err) } defer n2.Shutdown(ctx) fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String()) identity3 := libp2p.Identity(generateIdentity(3)) h3, err := libp2p.New(identity3) if err != nil { panic(err) } log.Infow("h3 value generated", "h3", h3.ID()) //12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX if err != nil { panic(err) } n3, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h3)) if err != nil { panic(err) } if err := n3.Start(ctx); err != nil { panic(err) } defer n3.Shutdown(ctx) fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String()) // Connect n1 to n2 and n3 so that there is a path for gossip propagation. // Note that we are not connecting n2 to n3 as they should discover // each other via pool's iexist announcements. h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil { panic(err) } h1.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL) if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil { panic(err) } // Wait until the nodes discover each other for { if len(h1.Peerstore().Peers()) == 4 && len(h2.Peerstore().Peers()) == 4 && len(h3.Peerstore().Peers()) == 4 { break } else { log.Infow("h1.Peerstore().Peers() is waitting", "h1.Peerstore().Peers()", h1.Peerstore().Peers()) } select { case <-ctx.Done(): panic(ctx.Err()) default: time.Sleep(time.Second) } } h1Peers := h1.Peerstore().Peers() fmt.Printf("%s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers)) for _, id := range h1Peers { fmt.Printf("- %s\n", id) } h2Peers := h2.Peerstore().Peers() fmt.Printf("%s peerstore contains %d nodes:\n", h2.ID(), len(h2Peers)) for _, id := range h2Peers { fmt.Printf("- %s\n", id) } h3Peers := h3.Peerstore().Peers() fmt.Printf("%s peerstore contains %d nodes:\n", h3.ID(), len(h3Peers)) for _, id := range h2Peers { fmt.Printf("- %s\n", id) }
Output: Instantiated node in pool 1 with ID: 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM Instantiated node in pool 1 with ID: 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX Instantiated node in pool 1 with ID: 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM peerstore contains 4 nodes: - 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX - 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX - 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM - 12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX peerstore contains 4 nodes: - 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX - 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM - 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX - 12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX peerstore contains 4 nodes: - 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX - 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM - 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX - 12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R
Example (StoreManifest) ¶
server := startMockServer("127.0.0.1:4000") defer func() { // Shutdown the server after test if err := server.Shutdown(context.Background()); err != nil { panic(err) // Handle the error as you see fit } }() const poolName = "1" ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) defer cancel() // Elevate log level to show internal communications. if err := logging.SetLogLevel("*", "debug"); err != nil { panic(err) } // Use a deterministic random generator to generate deterministic // output for the example. h1, err := libp2p.New(libp2p.Identity(generateIdentity(1))) if err != nil { panic(err) } n1, err := blox.New( blox.WithPoolName(poolName), blox.WithTopicName(poolName), blox.WithHost(h1), blox.WithUpdatePoolName(updatePoolName), blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), blox.WithPingCount(5), blox.WithExchangeOpts( exchange.WithDhtProviderOptions( dht.ProtocolExtension(protocol.ID("/"+poolName)), dht.ProtocolPrefix("/fula"), dht.Resiliency(1), dht.Mode(dht.ModeAutoServer), ), ), ) if err != nil { panic(err) } if err := n1.Start(ctx); err != nil { panic(err) } defer n1.Shutdown(ctx) fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String()) h2, err := libp2p.New(libp2p.Identity(generateIdentity(2))) if err != nil { panic(err) } n2, err := blox.New( blox.WithPoolName(poolName), blox.WithTopicName(poolName), blox.WithHost(h2), blox.WithUpdatePoolName(updatePoolName), blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), blox.WithPingCount(5), blox.WithExchangeOpts( exchange.WithDhtProviderOptions( dht.ProtocolExtension(protocol.ID("/"+poolName)), dht.ProtocolPrefix("/fula"), dht.Resiliency(1), dht.Mode(dht.ModeAutoServer), ), ), ) if err != nil { panic(err) } if err := n2.Start(ctx); err != nil { panic(err) } defer n2.Shutdown(ctx) fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String()) // Instantiate the third node in the pool h3, err := libp2p.New(libp2p.Identity(generateIdentity(3))) if err != nil { panic(err) } n3, err := blox.New( blox.WithPoolName(poolName), blox.WithTopicName(poolName), blox.WithHost(h3), blox.WithUpdatePoolName(updatePoolName), blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), blox.WithPingCount(5), blox.WithExchangeOpts( exchange.WithDhtProviderOptions( dht.ProtocolExtension(protocol.ID("/"+poolName)), dht.ProtocolPrefix("/fula"), dht.Resiliency(1), dht.Mode(dht.ModeAutoServer), ), exchange.WithIpniGetEndPoint("http://127.0.0.1:4000/cid/"), ), ) if err != nil { panic(err) } if err := n3.Start(ctx); err != nil { panic(err) } defer n3.Shutdown(ctx) fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String()) // Instantiate the third node in the pool h4, err := libp2p.New(libp2p.Identity(generateIdentity(4))) if err != nil { panic(err) } n4, err := blox.New( blox.WithPoolName(poolName), blox.WithTopicName(poolName), blox.WithHost(h4), blox.WithUpdatePoolName(updatePoolName), blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), blox.WithPingCount(5), blox.WithExchangeOpts( exchange.WithDhtProviderOptions( dht.ProtocolExtension(protocol.ID("/"+poolName)), dht.ProtocolPrefix("/fula"), dht.Resiliency(1), dht.Mode(dht.ModeAutoServer), ), exchange.WithIpniGetEndPoint("http://127.0.0.1:4000/cid/"), ), ) if err != nil { panic(err) } if err := n4.Start(ctx); err != nil { panic(err) } defer n4.Shutdown(ctx) fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h4.ID().String()) // Connect n1 to n2. h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil { panic(err) } h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL) if err = h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil { panic(err) } h3.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL) if err = h3.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil { panic(err) } h4.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL) if err = h4.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil { panic(err) } for { if len(h1.Peerstore().Peers()) >= 3 && len(h2.Peerstore().Peers()) >= 3 && len(h3.Peerstore().Peers()) >= 3 && len(h4.Peerstore().Peers()) >= 3 { break } select { case <-ctx.Done(): panic(ctx.Err()) default: time.Sleep(time.Second) } } // Authorize exchange between the two nodes if err := n1.SetAuth(ctx, h1.ID(), h2.ID(), true); err != nil { panic(err) } if err := n2.SetAuth(ctx, h2.ID(), h1.ID(), true); err != nil { panic(err) } // Generate a sample DAG and store it on node 1 (n1) in the pool, which we will pull from n1 n1leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) { na.AssembleEntry("this").AssignBool(true) }) n1leafLink, err := n1.Store(ctx, n1leaf) if err != nil { panic(err) } n1Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { na.AssembleEntry("that").AssignInt(42) na.AssembleEntry("oneLeafLink").AssignLink(n1leafLink) }) n1RootLink, err := n1.Store(ctx, n1Root) if err != nil { panic(err) } fmt.Printf("%s stored IPLD data with links:\n root: %s\n leaf:%s\n", h1.ID(), n1RootLink, n1leafLink) // Generate a sample DAG and store it on node 2 (n1) in the pool, which we will push to n1 n2leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) { na.AssembleEntry("that").AssignBool(false) }) n2leafLink, err := n2.Store(ctx, n2leaf) if err != nil { panic(err) } n2Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { na.AssembleEntry("this").AssignInt(24) na.AssembleEntry("anotherLeafLink").AssignLink(n2leafLink) }) n2RootLink, err := n2.Store(ctx, n2Root) if err != nil { panic(err) } fmt.Printf("%s stored IPLD data with links:\n root: %s\n leaf:%s\n", h1.ID(), n1RootLink, n1leafLink) fmt.Println("exchanging by Pull...") // Pull the sample DAG stored on node 1 from node 2 by only asking for the root link. // Because fetch implementation is recursive, it should fetch the leaf link too. if err := n2.Pull(ctx, h1.ID(), n1RootLink); err != nil { panic(err) } // Assert that n2 now has both root and leaf links if exists, err := n2.Has(ctx, n1RootLink); err != nil { panic(err) } else if !exists { panic("expected n2 to have fetched the entire sample DAG") } else { fmt.Printf("%s successfully fetched:\n link: %s\n from %s\n", h2.ID(), n1RootLink, h1.ID()) n, err := n2.Load(ctx, n1RootLink, basicnode.Prototype.Any) if err != nil { panic(err) } var buf bytes.Buffer if err := dagjson.Encode(n, &buf); err != nil { panic(err) } fmt.Printf(" content: %s\n", buf.String()) } if exists, err := n2.Has(ctx, n1leafLink); err != nil { panic(err) } else if !exists { panic("expected n2 to have fetched the entire sample DAG") } else { fmt.Printf("%s successfully fetched:\n link: %s\n from %s\n", h2.ID(), n1leafLink, h1.ID()) n, err := n2.Load(ctx, n1leafLink, basicnode.Prototype.Any) if err != nil { panic(err) } var buf bytes.Buffer if err := dagjson.Encode(n, &buf); err != nil { panic(err) } fmt.Printf(" content: %s\n", buf.String()) } fmt.Println("exchanging by Push...") // Push the sample DAG stored on node 2 to node 1 by only pushing the root link. // Because Push implementation is recursive, it should push the leaf link too. if err := n2.Push(ctx, h1.ID(), n2RootLink); err != nil { panic(err) } // Since push is an asynchronous operation, wait until background push is finished // by periodically checking if link is present on node 1. for { if exists, _ := n1.Has(ctx, n2RootLink); exists { break } select { case <-ctx.Done(): panic(ctx.Err()) default: time.Sleep(time.Second) } } // Assert that n1 now has both root and leaf links if exists, err := n1.Has(ctx, n2RootLink); err != nil { panic(err) } else if !exists { panic("expected n2 to have pushed the entire sample DAG") } else { fmt.Printf("%s successfully pushed:\n link: %s\n from %s\n", h2.ID(), n1RootLink, h1.ID()) n, err := n1.Load(ctx, n2RootLink, basicnode.Prototype.Any) if err != nil { panic(err) } var buf bytes.Buffer if err := dagjson.Encode(n, &buf); err != nil { panic(err) } fmt.Printf(" content: %s\n", buf.String()) } if exists, err := n1.Has(ctx, n2leafLink); err != nil { panic(err) } else if !exists { panic("expected n2 to have pushed the entire sample DAG") } else { fmt.Printf("%s successfully pushed:\n link: %s\n from %s\n", h2.ID(), n1leafLink, h1.ID()) n, err := n1.Load(ctx, n2leafLink, basicnode.Prototype.Any) if err != nil { panic(err) } var buf bytes.Buffer if err := dagjson.Encode(n, &buf); err != nil { panic(err) } fmt.Printf(" content: %s\n", buf.String()) } err = n1.ProvideLinkByDht(n2leafLink) if err != nil { fmt.Print("Error happened in ProvideLinkByDht") panic(err) } peerlist3, err := n3.FindLinkProvidersByDht(n2leafLink) if err != nil { fmt.Print("Error happened in FindLinkProvidersByDht3") panic(err) } // Iterate over the slice and print the peer ID of each AddrInfo for _, addrInfo := range peerlist3 { fmt.Printf("Found %s on %s\n", n2leafLink, addrInfo.ID.String()) // ID.String() converts the peer ID to a string } n3.FetchAvailableManifestsAndStore(ctx, 2) time.Sleep(1 * time.Second) n4.FetchAvailableManifestsAndStore(ctx, 2)
Output: Instantiated node in pool 1 with ID: 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM Instantiated node in pool 1 with ID: 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX Instantiated node in pool 1 with ID: 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX Instantiated node in pool 1 with ID: 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM stored IPLD data with links: root: bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji leaf:bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM stored IPLD data with links: root: bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji leaf:bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4 exchanging by Pull... 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully fetched: link: bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM content: {"oneLeafLink":{"/":"bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4"},"that":42} 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully fetched: link: bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4 from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM content: {"this":true} exchanging by Push... 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully pushed: link: bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM content: {"anotherLeafLink":{"/":"bafyr4iaab3lel4ykjcyzqajx5np2uluetwvfyv3ujupxt5qs57owhpo6ty"},"this":24} 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully pushed: link: bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4 from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM content: {"that":false} Found bafyr4iaab3lel4ykjcyzqajx5np2uluetwvfyv3ujupxt5qs57owhpo6ty on 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true} Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true} Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true} Stored manifest: {"cid":["bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji"],"pool_id":1}
Example (TestMockserver) ¶
package main import ( "bytes" "context" "encoding/base64" "encoding/json" "fmt" "io" "net/http" "strings" "time" logging "github.com/ipfs/go-log/v2" ) var log = logging.Logger("fula/mockserver") // requestLoggerMiddleware logs the details of each request func requestLoggerMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { body, _ := io.ReadAll(r.Body) log.Debugw("Received request", "url", r.URL.Path, "method", r.Method, "body", string(body)) if r.URL.Path == "/fula/pool/vote" { fmt.Printf("Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q %s", string(body)) } else if r.URL.Path == "/fula/manifest/batch_storage" { fmt.Printf("Stored manifest: %s", string(body)) } r.Body = io.NopCloser(bytes.NewBuffer(body)) next.ServeHTTP(w, r) }) } func startMockServer(addr string) *http.Server { handler := http.NewServeMux() handler.HandleFunc("/fula/pool/join", func(w http.ResponseWriter, r *http.Request) { response := map[string]interface{}{ "pool_id": 1, "account": "12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q", } json.NewEncoder(w).Encode(response) }) handler.HandleFunc("/fula/pool/cancel_join", func(w http.ResponseWriter, r *http.Request) { response := map[string]interface{}{ "pool_id": 1, "account": "12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q", } json.NewEncoder(w).Encode(response) }) handler.HandleFunc("/fula/pool/poolrequests", func(w http.ResponseWriter, r *http.Request) { response := map[string]interface{}{ "poolrequests": []map[string]interface{}{ { "pool_id": 1, "account": "12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q", "voted": []string{}, "positive_votes": 0, "peer_id": "12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q", }, }, } json.NewEncoder(w).Encode(response) }) handler.HandleFunc("/fula/pool/all", func(w http.ResponseWriter, r *http.Request) { response := map[string]interface{}{ "pools": []map[string]interface{}{ { "pool_id": 1, "creator": "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY", "pool_name": "PoolTest1", "region": "Ontario", "parent": nil, "participants": []string{ "12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM", "12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX", "12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX", }, }, }, } json.NewEncoder(w).Encode(response) }) handler.HandleFunc("/fula/pool/users", func(w http.ResponseWriter, r *http.Request) { response := map[string]interface{}{ "users": []map[string]interface{}{ { "account": "12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q", "pool_id": nil, "request_pool_id": 1, "peer_id": "12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q", }, { "account": "12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM", "pool_id": 1, "request_pool_id": nil, "peer_id": "12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM", }, { "account": "12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX", "pool_id": 1, "request_pool_id": nil, "peer_id": "12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX", }, { "account": "12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX", "pool_id": 1, "request_pool_id": nil, "peer_id": "12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX", }, }, } json.NewEncoder(w).Encode(response) }) handler.HandleFunc("/fula/pool/vote", func(w http.ResponseWriter, r *http.Request) { response := map[string]interface{}{ "pool_id": 1, "account": "12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q", } json.NewEncoder(w).Encode(response) }) handler.HandleFunc("/fula/manifest/available", func(w http.ResponseWriter, r *http.Request) { response := map[string]interface{}{ "manifests": []map[string]interface{}{ { "pool_id": 1, "manifest_metadata": map[string]interface{}{ "job": map[string]string{ "engine": "IPFS", "uri": "bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji", "work": "Storage", }, }, "replication_available": 2, }, { "pool_id": 1, "manifest_metadata": map[string]interface{}{ "job": map[string]string{ "engine": "IPFS", "uri": "bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4", "work": "Storage", }, }, "replication_available": 1, }, }, } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(response) }) handler.HandleFunc("/fula/pool/leave", func(w http.ResponseWriter, r *http.Request) { response := map[string]interface{}{ "pool_id": 1, "account": "12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q", } json.NewEncoder(w).Encode(response) }) handler.HandleFunc("/fula/manifest/batch_storage", func(w http.ResponseWriter, r *http.Request) { var reqBody struct { CIDs []string `json:"cid"` PoolID int `json:"pool_id"` } if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } defer r.Body.Close() response := map[string]interface{}{ "pool_id": reqBody.PoolID, "cid": reqBody.CIDs, } if err := json.NewEncoder(w).Encode(response); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } }) handler.HandleFunc("/cid/", func(w http.ResponseWriter, r *http.Request) { cid := strings.TrimPrefix(r.URL.Path, "/cid/") var contextID string switch cid { case "bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4": contextID = base64.StdEncoding.EncodeToString([]byte("12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX")) case "bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji": contextID = base64.StdEncoding.EncodeToString([]byte("12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM")) default: http.Error(w, "Not Found", http.StatusNotFound) return } response := map[string]interface{}{ "MultihashResults": []map[string]interface{}{ { "Multihash": "HiCJpK9N9aiHbWJ40eq3r0Lns3qhnLSUviVYdcBJD4jWjQ==", "ProviderResults": []map[string]interface{}{ { "ContextID": contextID, "Metadata": "gcA/", "Provider": map[string]interface{}{ "ID": "12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R", "Addrs": []string{"/dns/hub.dev.fx.land/tcp/40004/p2p/12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R"}, }, }, }, }, }, } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(response) }) loggedHandler := requestLoggerMiddleware(handler) server := &http.Server{ Addr: addr, Handler: loggedHandler, } go func() { if err := server.ListenAndServe(); err != http.ErrServerClosed { panic(err) } }() time.Sleep(time.Millisecond * 100) return server } func main() { server := startMockServer("127.0.0.1:4000") defer func() { // Shutdown the server after test if err := server.Shutdown(context.Background()); err != nil { panic(err) // Handle the error as you see fit } }() // Define the URL url := "http://127.0.0.1:4000/cid/bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4" // Send a GET request to the server resp, err := http.Get(url) if err != nil { log.Fatal("Error making GET request:", err) } defer resp.Body.Close() // Read the response body body, err := io.ReadAll(resp.Body) if err != nil { log.Fatal("Error reading response body:", err) } // Convert the body to a string and log it fmt.Println("Response:", string(body)) }
Output: Response: {"MultihashResults":[{"Multihash":"HiCJpK9N9aiHbWJ40eq3r0Lns3qhnLSUviVYdcBJD4jWjQ==","ProviderResults":[{"ContextID":"MTJEM0tvb1dIOXN3amVDeXVSNnV0ektVMVVzcGlXNVJER3pBRnZORHdxa1Q1YlVId3V4WA==","Metadata":"gcA/","Provider":{"Addrs":["/dns/hub.dev.fx.land/tcp/40004/p2p/12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R"],"ID":"12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R"}}]}]}
Index ¶
- func FindCIDFromDigest(base32MultibaseDigest string) (cid.Cid, error)
- func IpfsClusterPins(ctx context.Context, lastChecked time.Time, account string) (<-chan datamodel.Link, <-chan error)
- type Blox
- func (p *Blox) AnnounceJoinPoolRequestPeriodically(ctx context.Context)
- func (p *Blox) BloxFreeSpace(ctx context.Context, to peer.ID) ([]byte, error)
- func (p *Blox) EraseBlData(ctx context.Context, to peer.ID) ([]byte, error)
- func (p *Blox) FetchAvailableManifestsAndStore(ctx context.Context, maxCids int) error
- func (p *Blox) FindLinkProvidersByDht(l ipld.Link) ([]peer.AddrInfo, error)
- func (p *Blox) GetBlMembers() map[peer.ID]common.MemberStatus
- func (p *Blox) GetCidv1FromBlockFilename(filename string) (cid.Cid, error)
- func (p *Blox) GetIPFSRPC() *rpc.HttpApi
- func (p *Blox) GetLastCheckedTime() (time.Time, error)
- func (p *Blox) Has(ctx context.Context, l ipld.Link) (bool, error)
- func (p *Blox) ListModifiedStoredBlocks(lastChecked time.Time) ([]datamodel.Link, error)
- func (p *Blox) ListModifiedStoredLinks(ctx context.Context, lastChecked time.Time, account string) ([]datamodel.Link, error)
- func (p *Blox) Load(ctx context.Context, l ipld.Link, np ipld.NodePrototype) (ipld.Node, error)
- func (p *Blox) Ping(ctx context.Context, to peer.ID) (int, int, error)
- func (p *Blox) PingDht(to peer.ID) error
- func (p *Blox) ProvideLinkByDht(l ipld.Link) error
- func (p *Blox) PubsubValidator(ctx context.Context, id peer.ID, msg *pubsub.Message) bool
- func (p *Blox) Pull(ctx context.Context, from peer.ID, l ipld.Link) error
- func (p *Blox) Push(ctx context.Context, from peer.ID, l ipld.Link) error
- func (p *Blox) ServeIpfsRpc() http.Handler
- func (p *Blox) SetAuth(ctx context.Context, on peer.ID, subject peer.ID, allow bool) error
- func (p *Blox) Shutdown(ctx context.Context) error
- func (p *Blox) Start(ctx context.Context) error
- func (p *Blox) StartAnnouncementServer(ctx context.Context) error
- func (p *Blox) StartPingServer(ctx context.Context) error
- func (p *Blox) Store(ctx context.Context, n ipld.Node) (ipld.Link, error)
- func (p *Blox) StoreCid(ctx context.Context, l ipld.Link, limit int) error
- func (p *Blox) StoreManifest(ctx context.Context, links []blockchain.LinkWithLimit, maxCids int) error
- func (p *Blox) UpdateDhtPeers(peers []peer.ID) error
- func (p *Blox) UpdateFailedCids(links []datamodel.Link) error
- func (p *Blox) UpdateLastCheckedTime() error
- type CidStruct
- type FilesStat
- type Option
- func WithAnnounceInterval(i time.Duration) Option
- func WithBlockchainEndPoint(b string) Option
- func WithDatastore(ds datastore.Batching) Option
- func WithDefaultIPFShttpServer(n string) Option
- func WithExchangeOpts(eo ...exchange.Option) Option
- func WithGetPoolName(getPoolName PoolNameGetter) Option
- func WithHost(h host.Host) Option
- func WithIpfsClient(n *rpc.HttpApi) Option
- func WithIpfsClusterAPI(n ipfsCluster.Client) Option
- func WithLinkSystem(ls *ipld.LinkSystem) Option
- func WithMaxPingTime(pt int) Option
- func WithMinSuccessPingRate(sr int) Option
- func WithPingCount(pc int) Option
- func WithPoolHostMode(n bool) Option
- func WithPoolName(n string) Option
- func WithRelays(r []string) Option
- func WithSecretsPath(b string) Option
- func WithStoreDir(n string) Option
- func WithTopicName(n string) Option
- func WithUpdatePoolName(updatePoolName PoolNameUpdater) Option
- func WithWg(wg *sync.WaitGroup) Option
- type PeerStats
- type PoolNameGetter
- type PoolNameUpdater
- type RepoInfo
- type SizeStat
- type StatsBitswap
- type StatsBw
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FindCIDFromDigest ¶ added in v1.54.8
Types ¶
type Blox ¶
type Blox struct {
// contains filtered or unexported fields
}
func (*Blox) AnnounceJoinPoolRequestPeriodically ¶ added in v1.15.0
func (*Blox) BloxFreeSpace ¶ added in v1.29.0
func (*Blox) EraseBlData ¶ added in v1.30.0
func (*Blox) FetchAvailableManifestsAndStore ¶ added in v1.16.0
FetchAvailableManifestsAndStore fetches available manifests and stores them.
func (*Blox) FindLinkProvidersByDht ¶ added in v1.16.0
func (*Blox) GetBlMembers ¶ added in v1.15.0
func (p *Blox) GetBlMembers() map[peer.ID]common.MemberStatus
func (*Blox) GetCidv1FromBlockFilename ¶ added in v1.53.0
GetCidv1FromBlockFilename extracts CIDv1 from block filename
func (*Blox) GetIPFSRPC ¶ added in v1.53.0
func (*Blox) GetLastCheckedTime ¶ added in v1.53.0
func (*Blox) ListModifiedStoredBlocks ¶ added in v1.53.0
ListModifiedStoredBlocks lists only the folders that have been modified after the last check time and returns the filenames of the files created after the last check time in those folders.
func (*Blox) ListModifiedStoredLinks ¶ added in v1.54.8
func (p *Blox) ListModifiedStoredLinks(ctx context.Context, lastChecked time.Time, account string) ([]datamodel.Link, error)
This method fetches the pinned items in ipfs-cluster since the lastChecked time
func (*Blox) ProvideLinkByDht ¶ added in v1.16.0
func (*Blox) PubsubValidator ¶ added in v1.15.0
func (*Blox) ServeIpfsRpc ¶ added in v1.14.2
func (*Blox) StartAnnouncementServer ¶ added in v1.15.0
func (*Blox) StartPingServer ¶ added in v1.15.0
func (*Blox) StoreManifest ¶ added in v1.16.0
func (p *Blox) StoreManifest(ctx context.Context, links []blockchain.LinkWithLimit, maxCids int) error
func (*Blox) UpdateFailedCids ¶ added in v1.54.8
UpdateFailedCids updates the last checked time by appending failed CIDs to a file. It collects all errors encountered and returns them together.
func (*Blox) UpdateLastCheckedTime ¶ added in v1.53.0
UpdateLastCheckedTime updates the last checked time
type FilesStat ¶ added in v1.14.2
type FilesStat struct { Blocks int `json:"Blocks"` CumulativeSize uint64 `json:"CumulativeSize"` Hash string `json:"Hash"` Local bool `json:"Local,omitempty"` // Optional field. 'omitempty' keyword is used to exclude the field from the output if it's default/zero value Size uint64 `json:"Size"` SizeLocal uint64 `json:"SizeLocal,omitempty"` // Optional field. Type string `json:"Type"` WithLocality bool `json:"WithLocality,omitempty"` // Optional field. }
type Option ¶
type Option func(*options) error
func WithAnnounceInterval ¶
WithAnnounceInterval sets the interval at which announcements are made on the pubsub. Defaults to 5 seconds if unset.
func WithBlockchainEndPoint ¶ added in v1.15.0
func WithDatastore ¶
func WithDatastore(ds datastore.Batching) Option
func WithDefaultIPFShttpServer ¶ added in v1.46.0
func WithExchangeOpts ¶ added in v1.0.0
func WithGetPoolName ¶ added in v1.54.8
func WithGetPoolName(getPoolName PoolNameGetter) Option
func WithHost ¶
WithHost sets the libp2p host on which the blox is exposed. If unset a default host with random identity is used. See: libp2p.New.
func WithIpfsClient ¶ added in v1.48.0
func WithIpfsClusterAPI ¶ added in v1.53.0
func WithIpfsClusterAPI(n ipfsCluster.Client) Option
func WithLinkSystem ¶
func WithLinkSystem(ls *ipld.LinkSystem) Option
func WithMaxPingTime ¶ added in v1.15.0
func WithMinSuccessPingRate ¶ added in v1.15.0
func WithPingCount ¶ added in v1.15.0
func WithPoolHostMode ¶ added in v1.53.0
func WithPoolName ¶
WithPoolName sets a human readable name for the pool that the blox should join or create. Required.
func WithRelays ¶ added in v1.15.0
WithStoreDir sets a the store directory we are using for datastore Required.
func WithSecretsPath ¶ added in v1.41.0
func WithStoreDir ¶ added in v1.14.2
WithStoreDir sets a the store directory we are using for datastore Required.
func WithTopicName ¶
WithTopicName sets the name of the topic onto which announcements are made. Defaults to "/explore.fula/pools/<pool-name>" if unset. See: WithPoolName.
func WithUpdatePoolName ¶ added in v1.15.0
func WithUpdatePoolName(updatePoolName PoolNameUpdater) Option
type PoolNameGetter ¶ added in v1.54.8
type PoolNameGetter func() string
type PoolNameUpdater ¶ added in v1.15.0
type StatsBitswap ¶ added in v1.14.2
type StatsBitswap struct { BlocksReceived uint64 `json:"BlocksReceived"` BlocksSent uint64 `json:"BlocksSent"` DataReceived uint64 `json:"DataReceived"` DataSent uint64 `json:"DataSent"` DupBlksReceived uint64 `json:"DupBlksReceived"` DupDataReceived uint64 `json:"DupDataReceived"` MessagesReceived uint64 `json:"MessagesReceived"` Peers []string `json:"Peers"` ProvideBufLen int `json:"ProvideBufLen"` Wantlist []CidStruct `json:"Wantlist"` }