blox

package
v1.54.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2024 License: MIT Imports: 46 Imported by: 0

Documentation

Overview

Example (Blserver)
server := startMockServer("127.0.0.1:4000")
defer func() {
	// Shutdown the server after test
	if err := server.Shutdown(context.Background()); err != nil {
		panic(err) // Handle the error as you see fit
	}
}()

const poolName = "1"
ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
//ctx = network.WithUseTransient(ctx, "fx.exchange")
defer cancel()

// Elevate log level to show internal communications.
if err := logging.SetLogLevel("*", "info"); err != nil {
	panic(err)
}

// Use a deterministic random generator to generate deterministic
// output for the example.
cfg := NewConfig()

// ListenAddr configure
ListenAddrsConfig1 := []string{"/ip4/0.0.0.0/tcp/40001", "/ip4/0.0.0.0/udp/40001/quic", "/ip4/0.0.0.0/udp/40001/quic-v1", "/ip4/0.0.0.0/udp/40001/quic-v1/webtransport"}
listenAddrs1 := make([]multiaddr.Multiaddr, 0, len(ListenAddrsConfig1)+1)
// Convert string addresses to multiaddr and append to listenAddrs
for _, addrString := range ListenAddrsConfig1 {
	addr, err := multiaddr.NewMultiaddr(addrString)
	if err != nil {
		panic(fmt.Errorf("invalid multiaddress: %w", err))
	}
	listenAddrs1 = append(listenAddrs1, addr)
}
// Add the relay multiaddress
relayAddr2, err := multiaddr.NewMultiaddr("/p2p-circuit")
if err != nil {
	panic(fmt.Errorf("error creating relay multiaddress: %w", err))
}
listenAddrs1 = append(listenAddrs1, relayAddr2)
///
ListenAddrsConfig2 := []string{"/ip4/0.0.0.0/tcp/40002", "/ip4/0.0.0.0/udp/40002/quic"}
listenAddrs2 := make([]multiaddr.Multiaddr, 0, len(ListenAddrsConfig2)+1)
// Convert string addresses to multiaddr and append to listenAddrs
for _, addrString := range ListenAddrsConfig2 {
	addr, err := multiaddr.NewMultiaddr(addrString)
	if err != nil {
		panic(fmt.Errorf("invalid multiaddress: %w", err))
	}
	listenAddrs2 = append(listenAddrs2, addr)
}
// Add the relay multiaddress
listenAddrs2 = append(listenAddrs2, relayAddr2)
///
ListenAddrsConfig3 := []string{"/ip4/0.0.0.0/tcp/40003", "/ip4/0.0.0.0/udp/40003/quic"}
listenAddrs3 := make([]multiaddr.Multiaddr, 0, len(ListenAddrsConfig3)+1)
// Convert string addresses to multiaddr and append to listenAddrs
for _, addrString := range ListenAddrsConfig3 {
	addr, err := multiaddr.NewMultiaddr(addrString)
	if err != nil {
		panic(fmt.Errorf("invalid multiaddress: %w", err))
	}
	listenAddrs3 = append(listenAddrs3, addr)
}
// Add the relay multiaddress
listenAddrs3 = append(listenAddrs3, relayAddr2)
///
ListenAddrsConfig4 := []string{"/ip4/0.0.0.0/tcp/40004", "/ip4/0.0.0.0/udp/40004/quic"}
listenAddrs4 := make([]multiaddr.Multiaddr, 0, len(ListenAddrsConfig4)+1)
// Convert string addresses to multiaddr and append to listenAddrs
for _, addrString := range ListenAddrsConfig4 {
	addr, err := multiaddr.NewMultiaddr(addrString)
	if err != nil {
		panic(fmt.Errorf("invalid multiaddress: %w", err))
	}
	listenAddrs4 = append(listenAddrs4, addr)
}
// Add the relay multiaddress
listenAddrs4 = append(listenAddrs4, relayAddr2)

//End of ListenAddr configure
hopts := []libp2p.Option{
	//libp2p.ListenAddrs(listenAddrs...),
	libp2p.EnableNATService(),
	libp2p.NATPortMap(),
	libp2p.EnableRelay(),
	libp2p.EnableHolePunching(),
}
sr := make([]peer.AddrInfo, 0, len(cfg.StaticRelays))
for _, relay := range cfg.StaticRelays {
	rma, err := multiaddr.NewMultiaddr(relay)
	if err != nil {
		fmt.Println(err)
	}
	rai, err := peer.AddrInfoFromP2pAddr(rma)
	if err != nil {
		fmt.Println(err)
	}
	sr = append(sr, *rai)
}
libp2p.EnableAutoRelayWithStaticRelays(sr, autorelay.WithNumRelays(1))
hopts = append(hopts, libp2p.ForceReachabilityPrivate())

// Instantiate the first node in the pool
hopts1 := append(hopts, libp2p.Identity(generateIdentity(1)))
hopts1 = append(hopts1, libp2p.ListenAddrs(listenAddrs1...))
h1, err := libp2p.New(hopts1...)
if err != nil {
	panic(err)
}

n1, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h1),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithRelays([]string{devRelay}),
	blox.WithPingCount(5),
	blox.WithMaxPingTime(100),
	blox.WithMinSuccessPingRate(80),
)
if err != nil {
	panic(err)
}
if err := n1.Start(ctx); err != nil {
	panic(err)
}
defer n1.Shutdown(ctx)
log.Debugf("n1 Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())

// Instantiate the second node in the pool
hopts2 := append(hopts, libp2p.Identity(generateIdentity(2)))
hopts2 = append(hopts2, libp2p.ListenAddrs(listenAddrs2...))
h2, err := libp2p.New(hopts2...)
if err != nil {
	panic(err)
}

n2, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h2),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithRelays([]string{devRelay}),
	blox.WithPingCount(5),
	blox.WithMaxPingTime(100),
	blox.WithMinSuccessPingRate(80),
)
if err != nil {
	panic(err)
}
if err := n2.Start(ctx); err != nil {
	panic(err)
}
defer n2.Shutdown(ctx)
log.Debugf("n2 Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String())

// Instantiate the third node in the pool
hopts3 := append(hopts, libp2p.Identity(generateIdentity(3)))
hopts3 = append(hopts3, libp2p.ListenAddrs(listenAddrs3...))
h3, err := libp2p.New(hopts3...)
if err != nil {
	panic(err)
}

n3, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h3),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithRelays([]string{devRelay}),
	blox.WithPingCount(5),
	blox.WithMaxPingTime(100),
	blox.WithMinSuccessPingRate(80),
)
if err != nil {
	panic(err)
}
if err := n3.Start(ctx); err != nil {
	panic(err)
}
defer n3.Shutdown(ctx)
log.Debugf("n3 Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String())

if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
	panic(err)
}
if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil {
	panic(err)
}

// Wait until the nodes discover each other
for {
	if len(h1.Peerstore().Peers()) >= 3 &&
		len(h2.Peerstore().Peers()) >= 3 &&
		len(h3.Peerstore().Peers()) >= 3 {
		break
	} else {
		h1Peers := h1.Peerstore().Peers()
		log.Debugf("n1 Only %s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers))
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(time.Second)
	}
}

h1Peers := h1.Peerstore().Peers()
log.Debugf("n1 Finally %s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers))
for _, id := range h1Peers {
	log.Debugf("- %s\n", id)
}

h2Peers := h2.Peerstore().Peers()
log.Debugf("n2 Finally %s peerstore contains %d nodes:\n", h2.ID(), len(h2Peers))
for _, id := range h2Peers {
	log.Debugf("- %s\n", id)
}

h3Peers := h3.Peerstore().Peers()
log.Debugf("n3 Finally %s peerstore contains %d nodes:\n", h3.ID(), len(h3Peers))
for _, id := range h3Peers {
	log.Debugf("- %s\n", id)
}

// Instantiate the fourth node not in the pool
log.Debug("Now creating pid of n4")
log.Debug("Now creating host of n4")
hopts4 := append(hopts, libp2p.Identity(generateIdentity(4)))
hopts4 = append(hopts4, libp2p.ListenAddrs(listenAddrs4...))
h4, err := libp2p.New(hopts4...)
if err != nil {
	log.Errorw("An error happened in creating libp2p  instance of n4", "Err", err)
	panic(err)
}
log.Debug("Now creating blox for n4")
n4, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h4),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithRelays([]string{devRelay}),
	blox.WithPingCount(5),
	blox.WithMaxPingTime(100),
	blox.WithMinSuccessPingRate(80),
)
if err != nil {
	log.Errorw("An error happened in creating blox instance of n4", "Err", err)
	panic(err)
}
log.Debug("Now starting n4")
if err := n4.Start(ctx); err != nil {
	log.Errorw("An error happened in starting instance of n4", "Err", err)
	panic(err)
}
defer n4.Shutdown(ctx)
log.Debugf("n4 Instantiated node in pool %s with ID: %s\n", poolName, h4.ID().String())

n4.AnnounceJoinPoolRequestPeriodically(ctx)

if err = h1.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil {
	panic(err)
}
if err = h2.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil {
	panic(err)
}
if err = h3.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil {
	panic(err)
}

// Wait until the fourth node discover others
for {
	members := n4.GetBlMembers()

	for id, status := range members {
		memberInfo := fmt.Sprintf("Member ID: %s, Status: %v", id.String(), status)
		log.Debugln(memberInfo)
	}
	if len(members) >= 2 {
		break
	} else {
		log.Debugln(members)
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(2 * time.Second)
	}
}

h4Peers := h4.Peerstore().Peers()
log.Debugf("n4 Finally %s peerstore contains %d nodes:\n", h4.ID(), len(h4Peers))
for _, id := range h4Peers {
	log.Debugf("- %s\n", id)
}

//wait for 60 seconds
count := 1
for {
	count = count + 1
	if count > 60 {
		break
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(1 * time.Second)
	}
}
Output:

Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true}
Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true}
Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true}
Example (Encode64Test)
package main

import (
	"encoding/base64"
	"fmt"
)

func main() {
	originalString := "12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX"

	// Encode to Base64
	encodedString := base64.StdEncoding.EncodeToString([]byte(originalString))
	fmt.Println("Encoded:", encodedString)

	// Decode from Base64
	decodedBytes, err := base64.StdEncoding.DecodeString(encodedString)
	if err != nil {
		fmt.Println("Decode error:", err)
		return
	}
	decodedString := string(decodedBytes)
	fmt.Println("Decoded:", decodedString)

	// Check if original and decoded are the same
	if originalString == decodedString {
		fmt.Println("Success: Original and decoded strings are the same.")
	} else {
		fmt.Println("Error: Original and decoded strings are different.")
	}

}
Output:

Encoded: MTJEM0tvb1dIOXN3amVDeXVSNnV0ektVMVVzcGlXNVJER3pBRnZORHdxa1Q1YlVId3V4WA==
Decoded: 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
Success: Original and decoded strings are the same.
Example (Pingtest)
averageDuration := float64(2000)
successCount := 0
server := startMockServer("127.0.0.1:4000")
defer func() {
	// Shutdown the server after test
	if err := server.Shutdown(context.Background()); err != nil {
		panic(err) // Handle the error as you see fit
	}
}()

const poolName = "1"
ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
defer cancel()

// Elevate log level to show internal communications.
if err := logging.SetLogLevel("*", "debug"); err != nil {
	panic(err)
}

nodeMultiAddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5001")
if err != nil {
	panic(fmt.Errorf("invalid multiaddress: %w", err))
}
node, err := rpc.NewApi(nodeMultiAddr)

// Use a deterministic random generator to generate deterministic
// output for the example.
h1, err := libp2p.New(libp2p.Identity(generateIdentity(1)))
if err != nil {
	panic(err)
}
n1, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h1),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
	blox.WithPingCount(5),
	blox.WithIpfsClient(node),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n1.Start(ctx); err != nil {
	panic(err)
}
defer n1.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())
PingCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

// Send the ping request
n1.GetBlMembers()
rpc := n1.GetIPFSRPC()
res, err := rpc.Request("ping", "12D3KooWHb38UxY8akVGWZBuFtS3NJ7rJUwd36t3cfkoY7EbgNt9").Send(PingCtx)
if err != nil {
	fmt.Printf("ping was unsuccessful: %s", err)
	return
}
if res.Error != nil {
	fmt.Printf("ping was unsuccessful: %s", res.Error)
	return
}

// Process multiple responses using a decoder
decoder := json.NewDecoder(res.Output)

var totalTime int64
var count int

for decoder.More() {
	var pingResp PingResponse
	err := decoder.Decode(&pingResp)
	if err != nil {
		log.Errorf("error decoding JSON response: %s", err)
		continue
	}

	if pingResp.Text == "" && pingResp.Time > 0 { // Check for empty Text field and Time
		totalTime += pingResp.Time
		count++
	}
}

if count > 0 {
	averageDuration = float64(totalTime) / float64(count) / 1e6 // Convert nanoseconds to milliseconds
	successCount = count
} else {
	fmt.Println("No valid ping responses received")
	return
}
if int(averageDuration) > 1 {
	fmt.Printf("Final response: successCount=%d", successCount)
}
Output:

Final response: successCount=10
Example (PoolDiscoverPeersViaPubSub)

Example_poolDiscoverPeersViaPubSub starts a pool named "1" across three nodes, connects two of the nodes to the other one to facilitate a path for pubsub to propagate and shows all three nodes discover each other using pubsub.

server := startMockServer("127.0.0.1:4000")
defer func() {
	// Shutdown the server after test
	if err := server.Shutdown(context.Background()); err != nil {
		panic(err) // Handle the error as you see fit
	}
}()

const poolName = "1"
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

// Elevate log level to show internal communications.
if err := logging.SetLogLevel("*", "info"); err != nil {
	panic(err)
}

// Use a deterministic random generator to generate deterministic
// output for the example.
identity1 := libp2p.Identity(generateIdentity(1))

h1, err := libp2p.New(identity1)
if err != nil {
	panic(err)
}
log.Infow("h1 value generated", "h1", h1.ID()) //12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
n1, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h1))
if err != nil {
	panic(err)
}
if err := n1.Start(ctx); err != nil {
	panic(err)
}
defer n1.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())

identity2 := libp2p.Identity(generateIdentity(2))

h2, err := libp2p.New(identity2)
if err != nil {
	panic(err)
}
log.Infow("h2 value generated", "h2", h2.ID()) //12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
n2, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h2))
if err != nil {
	panic(err)
}
if err := n2.Start(ctx); err != nil {
	panic(err)
}
defer n2.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String())

identity3 := libp2p.Identity(generateIdentity(3))

h3, err := libp2p.New(identity3)
if err != nil {
	panic(err)
}
log.Infow("h3 value generated", "h3", h3.ID()) //12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX
if err != nil {
	panic(err)
}
n3, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h3))
if err != nil {
	panic(err)
}
if err := n3.Start(ctx); err != nil {
	panic(err)
}
defer n3.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String())

// Connect n1 to n2 and n3 so that there is a path for gossip propagation.
// Note that we are not connecting n2 to n3 as they should discover
// each other via pool's iexist announcements.
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
	panic(err)
}
h1.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil {
	panic(err)
}

// Wait until the nodes discover each other
for {
	if len(h1.Peerstore().Peers()) == 4 &&
		len(h2.Peerstore().Peers()) == 4 &&
		len(h3.Peerstore().Peers()) == 4 {
		break
	} else {
		log.Infow("h1.Peerstore().Peers() is waitting", "h1.Peerstore().Peers()", h1.Peerstore().Peers())
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(time.Second)
	}
}

h1Peers := h1.Peerstore().Peers()
fmt.Printf("%s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers))
for _, id := range h1Peers {
	fmt.Printf("- %s\n", id)
}

h2Peers := h2.Peerstore().Peers()
fmt.Printf("%s peerstore contains %d nodes:\n", h2.ID(), len(h2Peers))
for _, id := range h2Peers {
	fmt.Printf("- %s\n", id)
}

h3Peers := h3.Peerstore().Peers()
fmt.Printf("%s peerstore contains %d nodes:\n", h3.ID(), len(h3Peers))
for _, id := range h2Peers {
	fmt.Printf("- %s\n", id)
}
Output:

Instantiated node in pool 1 with ID: 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
Instantiated node in pool 1 with ID: 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
Instantiated node in pool 1 with ID: 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX
12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM peerstore contains 4 nodes:
- 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
- 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX
- 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
- 12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX peerstore contains 4 nodes:
- 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
- 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
- 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX
- 12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R
12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX peerstore contains 4 nodes:
- 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
- 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
- 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX
- 12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R
Example (StoreManifest)
server := startMockServer("127.0.0.1:4000")
defer func() {
	// Shutdown the server after test
	if err := server.Shutdown(context.Background()); err != nil {
		panic(err) // Handle the error as you see fit
	}
}()

const poolName = "1"
ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
defer cancel()

// Elevate log level to show internal communications.
if err := logging.SetLogLevel("*", "debug"); err != nil {
	panic(err)
}

// Use a deterministic random generator to generate deterministic
// output for the example.
h1, err := libp2p.New(libp2p.Identity(generateIdentity(1)))
if err != nil {
	panic(err)
}
n1, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h1),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
	blox.WithPingCount(5),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n1.Start(ctx); err != nil {
	panic(err)
}
defer n1.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())

h2, err := libp2p.New(libp2p.Identity(generateIdentity(2)))
if err != nil {
	panic(err)
}
n2, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h2),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
	blox.WithPingCount(5),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n2.Start(ctx); err != nil {
	panic(err)
}
defer n2.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String())

// Instantiate the third node in the pool
h3, err := libp2p.New(libp2p.Identity(generateIdentity(3)))
if err != nil {
	panic(err)
}
n3, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h3),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
	blox.WithPingCount(5),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
		exchange.WithIpniGetEndPoint("http://127.0.0.1:4000/cid/"),
	),
)
if err != nil {
	panic(err)
}
if err := n3.Start(ctx); err != nil {
	panic(err)
}
defer n3.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String())

// Instantiate the third node in the pool
h4, err := libp2p.New(libp2p.Identity(generateIdentity(4)))
if err != nil {
	panic(err)
}
n4, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h4),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
	blox.WithPingCount(5),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
		exchange.WithIpniGetEndPoint("http://127.0.0.1:4000/cid/"),
	),
)
if err != nil {
	panic(err)
}
if err := n4.Start(ctx); err != nil {
	panic(err)
}
defer n4.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h4.ID().String())

// Connect n1 to n2.
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
	panic(err)
}
h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL)
if err = h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil {
	panic(err)
}
h3.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL)
if err = h3.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil {
	panic(err)
}
h4.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL)
if err = h4.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil {
	panic(err)
}

for {
	if len(h1.Peerstore().Peers()) >= 3 &&
		len(h2.Peerstore().Peers()) >= 3 &&
		len(h3.Peerstore().Peers()) >= 3 &&
		len(h4.Peerstore().Peers()) >= 3 {
		break
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(time.Second)
	}
}

// Authorize exchange between the two nodes
if err := n1.SetAuth(ctx, h1.ID(), h2.ID(), true); err != nil {
	panic(err)
}
if err := n2.SetAuth(ctx, h2.ID(), h1.ID(), true); err != nil {
	panic(err)
}

// Generate a sample DAG and store it on node 1 (n1) in the pool, which we will pull from n1
n1leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) {
	na.AssembleEntry("this").AssignBool(true)
})
n1leafLink, err := n1.Store(ctx, n1leaf)
if err != nil {
	panic(err)
}
n1Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) {
	na.AssembleEntry("that").AssignInt(42)
	na.AssembleEntry("oneLeafLink").AssignLink(n1leafLink)
})
n1RootLink, err := n1.Store(ctx, n1Root)
if err != nil {
	panic(err)
}
fmt.Printf("%s stored IPLD data with links:\n    root: %s\n    leaf:%s\n", h1.ID(), n1RootLink, n1leafLink)

// Generate a sample DAG and store it on node 2 (n1) in the pool, which we will push to n1
n2leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) {
	na.AssembleEntry("that").AssignBool(false)
})
n2leafLink, err := n2.Store(ctx, n2leaf)
if err != nil {
	panic(err)
}
n2Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) {
	na.AssembleEntry("this").AssignInt(24)
	na.AssembleEntry("anotherLeafLink").AssignLink(n2leafLink)
})
n2RootLink, err := n2.Store(ctx, n2Root)
if err != nil {
	panic(err)
}
fmt.Printf("%s stored IPLD data with links:\n    root: %s\n    leaf:%s\n", h1.ID(), n1RootLink, n1leafLink)

fmt.Println("exchanging by Pull...")
// Pull the sample DAG stored on node 1 from node 2 by only asking for the root link.
// Because fetch implementation is recursive, it should fetch the leaf link too.
if err := n2.Pull(ctx, h1.ID(), n1RootLink); err != nil {
	panic(err)
}

// Assert that n2 now has both root and leaf links
if exists, err := n2.Has(ctx, n1RootLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have fetched the entire sample DAG")
} else {
	fmt.Printf("%s successfully fetched:\n    link: %s\n    from %s\n", h2.ID(), n1RootLink, h1.ID())
	n, err := n2.Load(ctx, n1RootLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
if exists, err := n2.Has(ctx, n1leafLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have fetched the entire sample DAG")
} else {
	fmt.Printf("%s successfully fetched:\n    link: %s\n    from %s\n", h2.ID(), n1leafLink, h1.ID())
	n, err := n2.Load(ctx, n1leafLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}

fmt.Println("exchanging by Push...")
// Push the sample DAG stored on node 2 to node 1 by only pushing the root link.
// Because Push implementation is recursive, it should push the leaf link too.
if err := n2.Push(ctx, h1.ID(), n2RootLink); err != nil {
	panic(err)
}

// Since push is an asynchronous operation, wait until background push is finished
// by periodically checking if link is present on node 1.
for {
	if exists, _ := n1.Has(ctx, n2RootLink); exists {
		break
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(time.Second)
	}
}

// Assert that n1 now has both root and leaf links
if exists, err := n1.Has(ctx, n2RootLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have pushed the entire sample DAG")
} else {
	fmt.Printf("%s successfully pushed:\n    link: %s\n    from %s\n", h2.ID(), n1RootLink, h1.ID())
	n, err := n1.Load(ctx, n2RootLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
if exists, err := n1.Has(ctx, n2leafLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have pushed the entire sample DAG")
} else {
	fmt.Printf("%s successfully pushed:\n    link: %s\n    from %s\n", h2.ID(), n1leafLink, h1.ID())
	n, err := n1.Load(ctx, n2leafLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
err = n1.ProvideLinkByDht(n2leafLink)
if err != nil {
	fmt.Print("Error happened in ProvideLinkByDht")
	panic(err)
}
peerlist3, err := n3.FindLinkProvidersByDht(n2leafLink)
if err != nil {
	fmt.Print("Error happened in FindLinkProvidersByDht3")
	panic(err)
}

// Iterate over the slice and print the peer ID of each AddrInfo
for _, addrInfo := range peerlist3 {
	fmt.Printf("Found %s on %s\n", n2leafLink, addrInfo.ID.String()) // ID.String() converts the peer ID to a string
}

n3.FetchAvailableManifestsAndStore(ctx, 2)
time.Sleep(1 * time.Second)
n4.FetchAvailableManifestsAndStore(ctx, 2)
Output:

Instantiated node in pool 1 with ID: 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
Instantiated node in pool 1 with ID: 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
Instantiated node in pool 1 with ID: 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX
Instantiated node in pool 1 with ID: 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q
12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM stored IPLD data with links:
    root: bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji
    leaf:bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4
12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM stored IPLD data with links:
    root: bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji
    leaf:bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4
exchanging by Pull...
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully fetched:
    link: bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji
    from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
    content: {"oneLeafLink":{"/":"bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4"},"that":42}
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully fetched:
    link: bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4
    from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
    content: {"this":true}
exchanging by Push...
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully pushed:
    link: bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji
    from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
    content: {"anotherLeafLink":{"/":"bafyr4iaab3lel4ykjcyzqajx5np2uluetwvfyv3ujupxt5qs57owhpo6ty"},"this":24}
12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX successfully pushed:
    link: bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4
    from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
    content: {"that":false}
Found bafyr4iaab3lel4ykjcyzqajx5np2uluetwvfyv3ujupxt5qs57owhpo6ty on 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true}
Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true}
Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true}
Stored manifest: {"cid":["bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji"],"pool_id":1}
Example (TestMockserver)
package main

import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"

	logging "github.com/ipfs/go-log/v2"
)

var log = logging.Logger("fula/mockserver")

// requestLoggerMiddleware logs the details of each request
func requestLoggerMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

		body, _ := io.ReadAll(r.Body)
		log.Debugw("Received request", "url", r.URL.Path, "method", r.Method, "body", string(body))
		if r.URL.Path == "/fula/pool/vote" {
			fmt.Printf("Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q %s", string(body))
		} else if r.URL.Path == "/fula/manifest/batch_storage" {
			fmt.Printf("Stored manifest: %s", string(body))
		}

		r.Body = io.NopCloser(bytes.NewBuffer(body))

		next.ServeHTTP(w, r)
	})
}
func startMockServer(addr string) *http.Server {
	handler := http.NewServeMux()

	handler.HandleFunc("/fula/pool/join", func(w http.ResponseWriter, r *http.Request) {
		response := map[string]interface{}{
			"pool_id": 1,
			"account": "12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q",
		}
		json.NewEncoder(w).Encode(response)
	})

	handler.HandleFunc("/fula/pool/cancel_join", func(w http.ResponseWriter, r *http.Request) {
		response := map[string]interface{}{
			"pool_id": 1,
			"account": "12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q",
		}
		json.NewEncoder(w).Encode(response)
	})

	handler.HandleFunc("/fula/pool/poolrequests", func(w http.ResponseWriter, r *http.Request) {
		response := map[string]interface{}{
			"poolrequests": []map[string]interface{}{
				{
					"pool_id":        1,
					"account":        "12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q",
					"voted":          []string{},
					"positive_votes": 0,
					"peer_id":        "12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q",
				},
			},
		}
		json.NewEncoder(w).Encode(response)
	})

	handler.HandleFunc("/fula/pool/all", func(w http.ResponseWriter, r *http.Request) {
		response := map[string]interface{}{
			"pools": []map[string]interface{}{
				{
					"pool_id":   1,
					"creator":   "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY",
					"pool_name": "PoolTest1",
					"region":    "Ontario",
					"parent":    nil,
					"participants": []string{
						"12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM",
						"12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX",
						"12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX",
					},
				},
			},
		}
		json.NewEncoder(w).Encode(response)
	})

	handler.HandleFunc("/fula/pool/users", func(w http.ResponseWriter, r *http.Request) {
		response := map[string]interface{}{
			"users": []map[string]interface{}{
				{
					"account":         "12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q",
					"pool_id":         nil,
					"request_pool_id": 1,
					"peer_id":         "12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q",
				},
				{
					"account":         "12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM",
					"pool_id":         1,
					"request_pool_id": nil,
					"peer_id":         "12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM",
				},
				{
					"account":         "12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX",
					"pool_id":         1,
					"request_pool_id": nil,
					"peer_id":         "12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX",
				},
				{
					"account":         "12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX",
					"pool_id":         1,
					"request_pool_id": nil,
					"peer_id":         "12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX",
				},
			},
		}
		json.NewEncoder(w).Encode(response)
	})

	handler.HandleFunc("/fula/pool/vote", func(w http.ResponseWriter, r *http.Request) {
		response := map[string]interface{}{
			"pool_id": 1,
			"account": "12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q",
		}
		json.NewEncoder(w).Encode(response)
	})

	handler.HandleFunc("/fula/manifest/available", func(w http.ResponseWriter, r *http.Request) {
		response := map[string]interface{}{
			"manifests": []map[string]interface{}{
				{
					"pool_id": 1,
					"manifest_metadata": map[string]interface{}{
						"job": map[string]string{
							"engine": "IPFS",
							"uri":    "bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji",
							"work":   "Storage",
						},
					},
					"replication_available": 2,
				},
				{
					"pool_id": 1,
					"manifest_metadata": map[string]interface{}{
						"job": map[string]string{
							"engine": "IPFS",
							"uri":    "bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4",
							"work":   "Storage",
						},
					},
					"replication_available": 1,
				},
			},
		}

		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(response)
	})

	handler.HandleFunc("/fula/pool/leave", func(w http.ResponseWriter, r *http.Request) {
		response := map[string]interface{}{
			"pool_id": 1,
			"account": "12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q",
		}
		json.NewEncoder(w).Encode(response)
	})

	handler.HandleFunc("/fula/manifest/batch_storage", func(w http.ResponseWriter, r *http.Request) {
		var reqBody struct {
			CIDs   []string `json:"cid"`
			PoolID int      `json:"pool_id"`
		}

		if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		defer r.Body.Close()

		response := map[string]interface{}{
			"pool_id": reqBody.PoolID,
			"cid":     reqBody.CIDs,
		}

		if err := json.NewEncoder(w).Encode(response); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
	})

	handler.HandleFunc("/cid/", func(w http.ResponseWriter, r *http.Request) {

		cid := strings.TrimPrefix(r.URL.Path, "/cid/")

		var contextID string
		switch cid {
		case "bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4":
			contextID = base64.StdEncoding.EncodeToString([]byte("12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX"))
		case "bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji":
			contextID = base64.StdEncoding.EncodeToString([]byte("12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM"))
		default:
			http.Error(w, "Not Found", http.StatusNotFound)
			return
		}

		response := map[string]interface{}{
			"MultihashResults": []map[string]interface{}{
				{
					"Multihash": "HiCJpK9N9aiHbWJ40eq3r0Lns3qhnLSUviVYdcBJD4jWjQ==",
					"ProviderResults": []map[string]interface{}{
						{
							"ContextID": contextID,
							"Metadata":  "gcA/",
							"Provider": map[string]interface{}{
								"ID":    "12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R",
								"Addrs": []string{"/dns/hub.dev.fx.land/tcp/40004/p2p/12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R"},
							},
						},
					},
				},
			},
		}

		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(response)
	})

	loggedHandler := requestLoggerMiddleware(handler)

	server := &http.Server{
		Addr:    addr,
		Handler: loggedHandler,
	}

	go func() {
		if err := server.ListenAndServe(); err != http.ErrServerClosed {
			panic(err)
		}
	}()

	time.Sleep(time.Millisecond * 100)

	return server
}

func main() {
	server := startMockServer("127.0.0.1:4000")
	defer func() {
		// Shutdown the server after test
		if err := server.Shutdown(context.Background()); err != nil {
			panic(err) // Handle the error as you see fit
		}
	}()
	// Define the URL
	url := "http://127.0.0.1:4000/cid/bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4"

	// Send a GET request to the server
	resp, err := http.Get(url)
	if err != nil {
		log.Fatal("Error making GET request:", err)
	}
	defer resp.Body.Close()

	// Read the response body
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatal("Error reading response body:", err)
	}

	// Convert the body to a string and log it
	fmt.Println("Response:", string(body))

}
Output:

Response: {"MultihashResults":[{"Multihash":"HiCJpK9N9aiHbWJ40eq3r0Lns3qhnLSUviVYdcBJD4jWjQ==","ProviderResults":[{"ContextID":"MTJEM0tvb1dIOXN3amVDeXVSNnV0ektVMVVzcGlXNVJER3pBRnZORHdxa1Q1YlVId3V4WA==","Metadata":"gcA/","Provider":{"Addrs":["/dns/hub.dev.fx.land/tcp/40004/p2p/12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R"],"ID":"12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R"}}]}]}

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func FindCIDFromDigest added in v1.54.8

func FindCIDFromDigest(base32MultibaseDigest string) (cid.Cid, error)

func IpfsClusterPins added in v1.54.8

func IpfsClusterPins(ctx context.Context, lastChecked time.Time, account string) (<-chan datamodel.Link, <-chan error)

IpfsClusterPins streams pins from the IPFS Cluster API and sends them through a channel.

Types

type Blox

type Blox struct {
	// contains filtered or unexported fields
}

func New

func New(o ...Option) (*Blox, error)

func (*Blox) AnnounceJoinPoolRequestPeriodically added in v1.15.0

func (p *Blox) AnnounceJoinPoolRequestPeriodically(ctx context.Context)

func (*Blox) BloxFreeSpace added in v1.29.0

func (p *Blox) BloxFreeSpace(ctx context.Context, to peer.ID) ([]byte, error)

func (*Blox) EraseBlData added in v1.30.0

func (p *Blox) EraseBlData(ctx context.Context, to peer.ID) ([]byte, error)

func (*Blox) FetchAvailableManifestsAndStore added in v1.16.0

func (p *Blox) FetchAvailableManifestsAndStore(ctx context.Context, maxCids int) error

FetchAvailableManifestsAndStore fetches available manifests and stores them.

func (*Blox) FindLinkProvidersByDht added in v1.16.0

func (p *Blox) FindLinkProvidersByDht(l ipld.Link) ([]peer.AddrInfo, error)

func (*Blox) GetBlMembers added in v1.15.0

func (p *Blox) GetBlMembers() map[peer.ID]common.MemberStatus

func (*Blox) GetCidv1FromBlockFilename added in v1.53.0

func (p *Blox) GetCidv1FromBlockFilename(filename string) (cid.Cid, error)

GetCidv1FromBlockFilename extracts CIDv1 from block filename

func (*Blox) GetIPFSRPC added in v1.53.0

func (p *Blox) GetIPFSRPC() *rpc.HttpApi

func (*Blox) GetLastCheckedTime added in v1.53.0

func (p *Blox) GetLastCheckedTime() (time.Time, error)

func (*Blox) Has

func (p *Blox) Has(ctx context.Context, l ipld.Link) (bool, error)

func (*Blox) ListModifiedStoredBlocks added in v1.53.0

func (p *Blox) ListModifiedStoredBlocks(lastChecked time.Time) ([]datamodel.Link, error)

ListModifiedStoredBlocks lists only the folders that have been modified after the last check time and returns the filenames of the files created after the last check time in those folders.

func (p *Blox) ListModifiedStoredLinks(ctx context.Context, lastChecked time.Time, account string) ([]datamodel.Link, error)

This method fetches the pinned items in ipfs-cluster since the lastChecked time

func (*Blox) Load

func (p *Blox) Load(ctx context.Context, l ipld.Link, np ipld.NodePrototype) (ipld.Node, error)

func (*Blox) Ping added in v1.15.0

func (p *Blox) Ping(ctx context.Context, to peer.ID) (int, int, error)

func (*Blox) PingDht added in v1.16.0

func (p *Blox) PingDht(to peer.ID) error

func (*Blox) ProvideLinkByDht added in v1.16.0

func (p *Blox) ProvideLinkByDht(l ipld.Link) error

func (*Blox) PubsubValidator added in v1.15.0

func (p *Blox) PubsubValidator(ctx context.Context, id peer.ID, msg *pubsub.Message) bool

func (*Blox) Pull

func (p *Blox) Pull(ctx context.Context, from peer.ID, l ipld.Link) error

func (*Blox) Push

func (p *Blox) Push(ctx context.Context, from peer.ID, l ipld.Link) error

func (*Blox) ServeIpfsRpc added in v1.14.2

func (p *Blox) ServeIpfsRpc() http.Handler

func (*Blox) SetAuth added in v0.8.3

func (p *Blox) SetAuth(ctx context.Context, on peer.ID, subject peer.ID, allow bool) error

func (*Blox) Shutdown

func (p *Blox) Shutdown(ctx context.Context) error

func (*Blox) Start

func (p *Blox) Start(ctx context.Context) error

func (*Blox) StartAnnouncementServer added in v1.15.0

func (p *Blox) StartAnnouncementServer(ctx context.Context) error

func (*Blox) StartPingServer added in v1.15.0

func (p *Blox) StartPingServer(ctx context.Context) error

func (*Blox) Store

func (p *Blox) Store(ctx context.Context, n ipld.Node) (ipld.Link, error)

func (*Blox) StoreCid added in v1.16.0

func (p *Blox) StoreCid(ctx context.Context, l ipld.Link, limit int) error

func (*Blox) StoreManifest added in v1.16.0

func (p *Blox) StoreManifest(ctx context.Context, links []blockchain.LinkWithLimit, maxCids int) error

func (*Blox) UpdateDhtPeers added in v1.16.0

func (p *Blox) UpdateDhtPeers(peers []peer.ID) error

func (*Blox) UpdateFailedCids added in v1.54.8

func (p *Blox) UpdateFailedCids(links []datamodel.Link) error

UpdateFailedCids updates the last checked time by appending failed CIDs to a file. It collects all errors encountered and returns them together.

func (*Blox) UpdateLastCheckedTime added in v1.53.0

func (p *Blox) UpdateLastCheckedTime() error

UpdateLastCheckedTime updates the last checked time

type CidStruct added in v1.14.2

type CidStruct struct {
	Root string `json:"/"`
}

type FilesStat added in v1.14.2

type FilesStat struct {
	Blocks         int    `json:"Blocks"`
	CumulativeSize uint64 `json:"CumulativeSize"`
	Hash           string `json:"Hash"`
	Local          bool   `json:"Local,omitempty"` // Optional field. 'omitempty' keyword is used to exclude the field from the output if it's default/zero value
	Size           uint64 `json:"Size"`
	SizeLocal      uint64 `json:"SizeLocal,omitempty"` // Optional field.
	Type           string `json:"Type"`
	WithLocality   bool   `json:"WithLocality,omitempty"` // Optional field.
}

type Option

type Option func(*options) error

func WithAnnounceInterval

func WithAnnounceInterval(i time.Duration) Option

WithAnnounceInterval sets the interval at which announcements are made on the pubsub. Defaults to 5 seconds if unset.

func WithBlockchainEndPoint added in v1.15.0

func WithBlockchainEndPoint(b string) Option

func WithDatastore

func WithDatastore(ds datastore.Batching) Option

func WithDefaultIPFShttpServer added in v1.46.0

func WithDefaultIPFShttpServer(n string) Option

func WithExchangeOpts added in v1.0.0

func WithExchangeOpts(eo ...exchange.Option) Option

func WithGetPoolName added in v1.54.8

func WithGetPoolName(getPoolName PoolNameGetter) Option

func WithHost

func WithHost(h host.Host) Option

WithHost sets the libp2p host on which the blox is exposed. If unset a default host with random identity is used. See: libp2p.New.

func WithIpfsClient added in v1.48.0

func WithIpfsClient(n *rpc.HttpApi) Option

func WithIpfsClusterAPI added in v1.53.0

func WithIpfsClusterAPI(n ipfsCluster.Client) Option

func WithLinkSystem

func WithLinkSystem(ls *ipld.LinkSystem) Option

func WithMaxPingTime added in v1.15.0

func WithMaxPingTime(pt int) Option

func WithMinSuccessPingRate added in v1.15.0

func WithMinSuccessPingRate(sr int) Option

func WithPingCount added in v1.15.0

func WithPingCount(pc int) Option

func WithPoolHostMode added in v1.53.0

func WithPoolHostMode(n bool) Option

func WithPoolName

func WithPoolName(n string) Option

WithPoolName sets a human readable name for the pool that the blox should join or create. Required.

func WithRelays added in v1.15.0

func WithRelays(r []string) Option

WithStoreDir sets a the store directory we are using for datastore Required.

func WithSecretsPath added in v1.41.0

func WithSecretsPath(b string) Option

func WithStoreDir added in v1.14.2

func WithStoreDir(n string) Option

WithStoreDir sets a the store directory we are using for datastore Required.

func WithTopicName

func WithTopicName(n string) Option

WithTopicName sets the name of the topic onto which announcements are made. Defaults to "/explore.fula/pools/<pool-name>" if unset. See: WithPoolName.

func WithUpdatePoolName added in v1.15.0

func WithUpdatePoolName(updatePoolName PoolNameUpdater) Option

func WithWg added in v1.21.0

func WithWg(wg *sync.WaitGroup) Option

type PeerStats added in v1.14.2

type PeerStats struct {
	Exchanged uint64  `json:"Exchanged"`
	Peer      string  `json:"Peer"`
	Recv      uint64  `json:"Recv"`
	Sent      uint64  `json:"Sent"`
	Value     float64 `json:"Value"`
}

type PoolNameGetter added in v1.54.8

type PoolNameGetter func() string

type PoolNameUpdater added in v1.15.0

type PoolNameUpdater func(string) error

type RepoInfo added in v1.14.2

type RepoInfo struct {
	NumObjects uint64   `json:"NumObjects"`
	RepoPath   string   `json:"RepoPath"`
	SizeStat   SizeStat `json:"SizeStat"`
	Version    string   `json:"Version"`
	RepoSize   uint64   `json:"RepoSize"`
}

type SizeStat added in v1.14.2

type SizeStat struct {
	RepoSize   uint64 `json:"RepoSize"`
	StorageMax uint64 `json:"StorageMax"`
}

type StatsBitswap added in v1.14.2

type StatsBitswap struct {
	BlocksReceived   uint64      `json:"BlocksReceived"`
	BlocksSent       uint64      `json:"BlocksSent"`
	DataReceived     uint64      `json:"DataReceived"`
	DataSent         uint64      `json:"DataSent"`
	DupBlksReceived  uint64      `json:"DupBlksReceived"`
	DupDataReceived  uint64      `json:"DupDataReceived"`
	MessagesReceived uint64      `json:"MessagesReceived"`
	Peers            []string    `json:"Peers"`
	ProvideBufLen    int         `json:"ProvideBufLen"`
	Wantlist         []CidStruct `json:"Wantlist"`
}

type StatsBw added in v1.14.2

type StatsBw struct {
	RateIn   float64 `json:"RateIn"`
	RateOut  float64 `json:"RateOut"`
	TotalIn  int64   `json:"TotalIn"`
	TotalOut int64   `json:"TotalOut"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL