blox

package
v1.18.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 4, 2023 License: MIT Imports: 31 Imported by: 0

Documentation

Overview

Example (Blserver)
server := startMockServer("127.0.0.1:4000")
defer func() {
	// Shutdown the server after test
	if err := server.Shutdown(context.Background()); err != nil {
		panic(err) // Handle the error as you see fit
	}
}()

const poolName = "1"
ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
//ctx = network.WithUseTransient(ctx, "fx.exchange")
defer cancel()

// Elevate log level to show internal communications.
if err := logging.SetLogLevel("*", "info"); err != nil {
	panic(err)
}

// Use a deterministic random generator to generate deterministic
// output for the example.
rng := rand.New(rand.NewSource(42))
cfg := NewConfig()

hopts := []libp2p.Option{
	libp2p.EnableNATService(),
	libp2p.NATPortMap(),
	libp2p.EnableRelay(),
	libp2p.EnableHolePunching(),
}
sr := make([]peer.AddrInfo, 0, len(cfg.StaticRelays))
for _, relay := range cfg.StaticRelays {
	rma, err := multiaddr.NewMultiaddr(relay)
	if err != nil {
		fmt.Println(err)
	}
	rai, err := peer.AddrInfoFromP2pAddr(rma)
	if err != nil {
		fmt.Println(err)
	}
	sr = append(sr, *rai)
}
libp2p.EnableAutoRelayWithStaticRelays(sr, autorelay.WithNumRelays(1))
hopts = append(hopts, libp2p.ForceReachabilityPrivate())

// Instantiate the first node in the pool
pid1, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
hopts1 := append(hopts, libp2p.Identity(pid1))
h1, err := libp2p.New(hopts1...)
if err != nil {
	panic(err)
}

n1, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h1),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithRelays([]string{devRelay}),
	blox.WithPingCount(5),
	blox.WithMaxPingTime(100),
	blox.WithMinSuccessPingRate(80),
)
if err != nil {
	panic(err)
}
if err := n1.Start(ctx); err != nil {
	panic(err)
}
defer n1.Shutdown(ctx)
log.Debugf("n1 Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())

// Instantiate the second node in the pool
pid2, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
hopts2 := append(hopts, libp2p.Identity(pid2))
h2, err := libp2p.New(hopts2...)
if err != nil {
	panic(err)
}

n2, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h2),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithRelays([]string{devRelay}),
	blox.WithPingCount(5),
	blox.WithMaxPingTime(100),
	blox.WithMinSuccessPingRate(80),
)
if err != nil {
	panic(err)
}
if err := n2.Start(ctx); err != nil {
	panic(err)
}
defer n2.Shutdown(ctx)
log.Debugf("n2 Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String())

// Instantiate the third node in the pool
pid3, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
hopts3 := append(hopts, libp2p.Identity(pid3))
h3, err := libp2p.New(hopts3...)
if err != nil {
	panic(err)
}

n3, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h3),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithRelays([]string{devRelay}),
	blox.WithPingCount(5),
	blox.WithMaxPingTime(100),
	blox.WithMinSuccessPingRate(80),
)
if err != nil {
	panic(err)
}
if err := n3.Start(ctx); err != nil {
	panic(err)
}
defer n3.Shutdown(ctx)
log.Debugf("n3 Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String())

if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
	panic(err)
}
if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil {
	panic(err)
}

// Wait until the nodes discover each other
for {
	if len(h1.Peerstore().Peers()) >= 3 &&
		len(h2.Peerstore().Peers()) >= 3 &&
		len(h3.Peerstore().Peers()) >= 3 {
		break
	} else {
		h1Peers := h1.Peerstore().Peers()
		log.Debugf("n1 Only %s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers))
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(time.Second)
	}
}

h1Peers := h1.Peerstore().Peers()
log.Debugf("n1 Finally %s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers))
for _, id := range h1Peers {
	log.Debugf("- %s\n", id)
}

h2Peers := h2.Peerstore().Peers()
log.Debugf("n2 Finally %s peerstore contains %d nodes:\n", h2.ID(), len(h2Peers))
for _, id := range h2Peers {
	log.Debugf("- %s\n", id)
}

h3Peers := h3.Peerstore().Peers()
log.Debugf("n3 Finally %s peerstore contains %d nodes:\n", h3.ID(), len(h3Peers))
for _, id := range h3Peers {
	log.Debugf("- %s\n", id)
}

// Instantiate the fourth node not in the pool
log.Debug("Now creating pid of n4")
pid4, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	log.Errorw("An error happened in creating keypair of n4", "Err", err)
	panic(err)
}
log.Debug("Now creating host of n4")
hopts4 := append(hopts, libp2p.Identity(pid4))
h4, err := libp2p.New(hopts4...)
if err != nil {
	log.Errorw("An error happened in creating libp2p  instance of n4", "Err", err)
	panic(err)
}
log.Debug("Now creating blox for n4")
n4, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h4),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithRelays([]string{devRelay}),
	blox.WithPingCount(5),
	blox.WithMaxPingTime(100),
	blox.WithMinSuccessPingRate(80),
)
if err != nil {
	log.Errorw("An error happened in creating blox instance of n4", "Err", err)
	panic(err)
}
log.Debug("Now starting n4")
if err := n4.Start(ctx); err != nil {
	log.Errorw("An error happened in starting instance of n4", "Err", err)
	panic(err)
}
defer n4.Shutdown(ctx)
log.Debugf("n4 Instantiated node in pool %s with ID: %s\n", poolName, h4.ID().String())

n4.AnnounceJoinPoolRequestPeriodically(ctx)

if err = h1.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil {
	panic(err)
}
if err = h2.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil {
	panic(err)
}
if err = h3.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil {
	panic(err)
}

// Wait until the fourth node discover others
for {
	members := n4.GetBlMembers()

	for id, status := range members {
		memberInfo := fmt.Sprintf("Member ID: %s, Status: %v", id.String(), status)
		log.Debugln(memberInfo)
	}
	if len(members) >= 2 {
		break
	} else {
		log.Debugln(members)
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(2 * time.Second)
	}
}

h4Peers := h4.Peerstore().Peers()
log.Debugf("n4 Finally %s peerstore contains %d nodes:\n", h4.ID(), len(h4Peers))
for _, id := range h4Peers {
	log.Debugf("- %s\n", id)
}

//wait for 60 seconds
count := 1
for {
	count = count + 1
	if count > 60 {
		break
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(1 * time.Second)
	}
}
Output:

Voted on QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe {"pool_id":1,"account":"QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe","vote_value":true}
Voted on QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe {"pool_id":1,"account":"QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe","vote_value":true}
Voted on QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe {"pool_id":1,"account":"QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe","vote_value":true}
Example (PoolDiscoverPeersViaPubSub)

Example_poolDiscoverPeersViaPubSub starts a pool named "1" across three nodes, connects two of the nodes to the other one to facilitate a path for pubsub to propagate and shows all three nodes discover each other using pubsub.

server := startMockServer("127.0.0.1:4000")
defer func() {
	// Shutdown the server after test
	if err := server.Shutdown(context.Background()); err != nil {
		panic(err) // Handle the error as you see fit
	}
}()

const poolName = "1"
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

// Elevate log level to show internal communications.
if err := logging.SetLogLevel("*", "info"); err != nil {
	panic(err)
}

// Use a deterministic random generator to generate deterministic
// output for the example.
rng := rand.New(rand.NewSource(42))

// Instantiate the first node in the pool
pid1, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
h1, err := libp2p.New(libp2p.Identity(pid1))
if err != nil {
	panic(err)
}

n1, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h1))
if err != nil {
	panic(err)
}
if err := n1.Start(ctx); err != nil {
	panic(err)
}
defer n1.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())

// Instantiate the second node in the pool
pid2, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
h2, err := libp2p.New(libp2p.Identity(pid2))
if err != nil {
	panic(err)
}
n2, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h2))
if err != nil {
	panic(err)
}
if err := n2.Start(ctx); err != nil {
	panic(err)
}
defer n2.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String())

// Instantiate the third node in the pool
pid3, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
h3, err := libp2p.New(libp2p.Identity(pid3))
if err != nil {
	panic(err)
}
n3, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h3))
if err != nil {
	panic(err)
}
if err := n3.Start(ctx); err != nil {
	panic(err)
}
defer n3.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String())

// Connect n1 to n2 and n3 so that there is a path for gossip propagation.
// Note that we are not connecting n2 to n3 as they should discover
// each other via pool's iexist announcements.
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
	panic(err)
}
h1.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil {
	panic(err)
}

// Wait until the nodes discover each other
for {
	if len(h1.Peerstore().Peers()) == 3 &&
		len(h2.Peerstore().Peers()) == 3 &&
		len(h3.Peerstore().Peers()) == 3 {
		break
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(time.Second)
	}
}

h1Peers := h1.Peerstore().Peers()
fmt.Printf("%s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers))
for _, id := range h1Peers {
	fmt.Printf("- %s\n", id)
}

h2Peers := h2.Peerstore().Peers()
fmt.Printf("%s peerstore contains %d nodes:\n", h2.ID(), len(h2Peers))
for _, id := range h2Peers {
	fmt.Printf("- %s\n", id)
}

h3Peers := h3.Peerstore().Peers()
fmt.Printf("%s peerstore contains %d nodes:\n", h3.ID(), len(h3Peers))
for _, id := range h2Peers {
	fmt.Printf("- %s\n", id)
}
Output:

Instantiated node in pool 1 with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
Instantiated node in pool 1 with ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
Instantiated node in pool 1 with ID: QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT peerstore contains 3 nodes:
- QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
- QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
- QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF peerstore contains 3 nodes:
- QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
- QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
- QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA peerstore contains 3 nodes:
- QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
- QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
- QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
Example (StoreManifest)
server := startMockServer("127.0.0.1:4000")
defer func() {
	// Shutdown the server after test
	if err := server.Shutdown(context.Background()); err != nil {
		panic(err) // Handle the error as you see fit
	}
}()

const poolName = "1"
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

// Elevate log level to show internal communications.
if err := logging.SetLogLevel("*", "debug"); err != nil {
	panic(err)
}

// Use a deterministic random generator to generate deterministic
// output for the example.
rng := rand.New(rand.NewSource(42))

// Instantiate the first node in the pool
pid1, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
h1, err := libp2p.New(libp2p.Identity(pid1))
if err != nil {
	panic(err)
}
n1, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h1),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
	blox.WithPingCount(5),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n1.Start(ctx); err != nil {
	panic(err)
}
defer n1.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())

// Instantiate the second node in the pool
pid2, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
h2, err := libp2p.New(libp2p.Identity(pid2))
if err != nil {
	panic(err)
}
n2, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h2),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
	blox.WithPingCount(5),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n2.Start(ctx); err != nil {
	panic(err)
}
defer n2.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String())

// Instantiate the third node in the pool
pid3, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
h3, err := libp2p.New(libp2p.Identity(pid3))
if err != nil {
	panic(err)
}
n3, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h3),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
	blox.WithPingCount(5),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n3.Start(ctx); err != nil {
	panic(err)
}
defer n3.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String())

// Instantiate the third node in the pool
pid4, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
h4, err := libp2p.New(libp2p.Identity(pid4))
if err != nil {
	panic(err)
}
n4, err := blox.New(
	blox.WithPoolName(poolName),
	blox.WithTopicName(poolName),
	blox.WithHost(h4),
	blox.WithUpdatePoolName(updatePoolName),
	blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
	blox.WithPingCount(5),
	blox.WithExchangeOpts(
		exchange.WithDhtProviderOptions(
			dht.ProtocolExtension(protocol.ID("/"+poolName)),
			dht.ProtocolPrefix("/fula"),
			dht.Resiliency(1),
			dht.Mode(dht.ModeAutoServer),
		),
	),
)
if err != nil {
	panic(err)
}
if err := n4.Start(ctx); err != nil {
	panic(err)
}
defer n4.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h4.ID().String())

// Connect n1 to n2.
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
	panic(err)
}
h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL)
if err = h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil {
	panic(err)
}
h3.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL)
if err = h3.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil {
	panic(err)
}
h4.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL)
if err = h4.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil {
	panic(err)
}

for {
	if len(h1.Peerstore().Peers()) >= 3 &&
		len(h2.Peerstore().Peers()) >= 3 &&
		len(h3.Peerstore().Peers()) >= 3 &&
		len(h4.Peerstore().Peers()) >= 3 {
		break
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(time.Second)
	}
}

// Authorize exchange between the two nodes
if err := n1.SetAuth(ctx, h1.ID(), h2.ID(), true); err != nil {
	panic(err)
}
if err := n2.SetAuth(ctx, h2.ID(), h1.ID(), true); err != nil {
	panic(err)
}

// Generate a sample DAG and store it on node 1 (n1) in the pool, which we will pull from n1
n1leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) {
	na.AssembleEntry("this").AssignBool(true)
})
n1leafLink, err := n1.Store(ctx, n1leaf)
if err != nil {
	panic(err)
}
n1Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) {
	na.AssembleEntry("that").AssignInt(42)
	na.AssembleEntry("oneLeafLink").AssignLink(n1leafLink)
})
n1RootLink, err := n1.Store(ctx, n1Root)
if err != nil {
	panic(err)
}
fmt.Printf("%s stored IPLD data with links:\n    root: %s\n    leaf:%s\n", h1.ID(), n1RootLink, n1leafLink)

// Generate a sample DAG and store it on node 2 (n1) in the pool, which we will push to n1
n2leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) {
	na.AssembleEntry("that").AssignBool(false)
})
n2leafLink, err := n2.Store(ctx, n2leaf)
if err != nil {
	panic(err)
}
n2Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) {
	na.AssembleEntry("this").AssignInt(24)
	na.AssembleEntry("anotherLeafLink").AssignLink(n2leafLink)
})
n2RootLink, err := n2.Store(ctx, n2Root)
if err != nil {
	panic(err)
}
fmt.Printf("%s stored IPLD data with links:\n    root: %s\n    leaf:%s\n", h1.ID(), n1RootLink, n1leafLink)

fmt.Println("exchanging by Pull...")
// Pull the sample DAG stored on node 1 from node 2 by only asking for the root link.
// Because fetch implementation is recursive, it should fetch the leaf link too.
if err := n2.Pull(ctx, h1.ID(), n1RootLink); err != nil {
	panic(err)
}

// Assert that n2 now has both root and leaf links
if exists, err := n2.Has(ctx, n1RootLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have fetched the entire sample DAG")
} else {
	fmt.Printf("%s successfully fetched:\n    link: %s\n    from %s\n", h2.ID(), n1RootLink, h1.ID())
	n, err := n2.Load(ctx, n1RootLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
if exists, err := n2.Has(ctx, n1leafLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have fetched the entire sample DAG")
} else {
	fmt.Printf("%s successfully fetched:\n    link: %s\n    from %s\n", h2.ID(), n1leafLink, h1.ID())
	n, err := n2.Load(ctx, n1leafLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}

fmt.Println("exchanging by Push...")
// Push the sample DAG stored on node 2 to node 1 by only pushing the root link.
// Because Push implementation is recursive, it should push the leaf link too.
if err := n2.Push(ctx, h1.ID(), n2RootLink); err != nil {
	panic(err)
}

// Since push is an asynchronous operation, wait until background push is finished
// by periodically checking if link is present on node 1.
for {
	if exists, _ := n1.Has(ctx, n2RootLink); exists {
		break
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(time.Second)
	}
}

// Assert that n1 now has both root and leaf links
if exists, err := n1.Has(ctx, n2RootLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have pushed the entire sample DAG")
} else {
	fmt.Printf("%s successfully pushed:\n    link: %s\n    from %s\n", h2.ID(), n1RootLink, h1.ID())
	n, err := n1.Load(ctx, n2RootLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
if exists, err := n1.Has(ctx, n2leafLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have pushed the entire sample DAG")
} else {
	fmt.Printf("%s successfully pushed:\n    link: %s\n    from %s\n", h2.ID(), n1leafLink, h1.ID())
	n, err := n1.Load(ctx, n2leafLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
peerlist3, err := n3.FindLinkProvidersByDht(n2leafLink)
if err != nil {
	fmt.Print("Error happened in FindLinkProvidersByDht3")
	panic(err)
}

// Iterate over the slice and print the peer ID of each AddrInfo
for _, addrInfo := range peerlist3 {
	fmt.Printf("Found %s on %s\n", n2leafLink, addrInfo.ID.String()) // ID.String() converts the peer ID to a string
}

n3.FetchAvailableManifestsAndStore(ctx, 2)
time.Sleep(1 * time.Second)
n4.FetchAvailableManifestsAndStore(ctx, 2)
Output:

Instantiated node in pool 1 with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
Instantiated node in pool 1 with ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
Instantiated node in pool 1 with ID: QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
Instantiated node in pool 1 with ID: QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe
QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT stored IPLD data with links:
    root: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy
    leaf:bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34
QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT stored IPLD data with links:
    root: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy
    leaf:bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34
exchanging by Pull...
QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully fetched:
    link: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy
    from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
    content: {"oneLeafLink":{"/":"bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"},"that":42}
QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully fetched:
    link: bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34
    from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
    content: {"this":true}
exchanging by Push...
QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully pushed:
    link: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy
    from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
    content: {"anotherLeafLink":{"/":"bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4"},"this":24}
QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully pushed:
    link: bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34
    from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
    content: {"that":false}
Found bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4 on QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
Stored manifest: {"cid":["bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34","bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy"],"pool_id":1}
Stored manifest: {"cid":["bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34","bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy"],"pool_id":1}

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Blox

type Blox struct {
	// contains filtered or unexported fields
}

func New

func New(o ...Option) (*Blox, error)

func (*Blox) AnnounceJoinPoolRequestPeriodically added in v1.15.0

func (p *Blox) AnnounceJoinPoolRequestPeriodically(ctx context.Context)

func (*Blox) FetchAvailableManifestsAndStore added in v1.16.0

func (p *Blox) FetchAvailableManifestsAndStore(ctx context.Context, maxCids int) error

FetchAvailableManifestsAndStore fetches available manifests and stores them.

func (*Blox) FindLinkProvidersByDht added in v1.16.0

func (p *Blox) FindLinkProvidersByDht(l ipld.Link) ([]peer.AddrInfo, error)

func (*Blox) GetBlMembers added in v1.15.0

func (p *Blox) GetBlMembers() map[peer.ID]common.MemberStatus

func (*Blox) Has

func (p *Blox) Has(ctx context.Context, l ipld.Link) (bool, error)

func (*Blox) Load

func (p *Blox) Load(ctx context.Context, l ipld.Link, np ipld.NodePrototype) (ipld.Node, error)

func (*Blox) Ping added in v1.15.0

func (p *Blox) Ping(ctx context.Context, to peer.ID) (int, int, error)

func (*Blox) PingDht added in v1.16.0

func (p *Blox) PingDht(to peer.ID) error

func (*Blox) ProvideLinkByDht added in v1.16.0

func (p *Blox) ProvideLinkByDht(l ipld.Link) error

func (*Blox) PubsubValidator added in v1.15.0

func (p *Blox) PubsubValidator(ctx context.Context, id peer.ID, msg *pubsub.Message) bool

func (*Blox) Pull

func (p *Blox) Pull(ctx context.Context, from peer.ID, l ipld.Link) error

func (*Blox) Push

func (p *Blox) Push(ctx context.Context, from peer.ID, l ipld.Link) error

func (*Blox) ServeIpfsRpc added in v1.14.2

func (p *Blox) ServeIpfsRpc() http.Handler

func (*Blox) SetAuth added in v0.8.3

func (p *Blox) SetAuth(ctx context.Context, on peer.ID, subject peer.ID, allow bool) error

func (*Blox) Shutdown

func (p *Blox) Shutdown(ctx context.Context) error

func (*Blox) Start

func (p *Blox) Start(ctx context.Context) error

func (*Blox) StartAnnouncementServer added in v1.15.0

func (p *Blox) StartAnnouncementServer(ctx context.Context) error

func (*Blox) StartPingServer added in v1.15.0

func (p *Blox) StartPingServer(ctx context.Context) error

func (*Blox) Store

func (p *Blox) Store(ctx context.Context, n ipld.Node) (ipld.Link, error)

func (*Blox) StoreCid added in v1.16.0

func (p *Blox) StoreCid(ctx context.Context, l ipld.Link, limit int) error

func (*Blox) StoreManifest added in v1.16.0

func (p *Blox) StoreManifest(ctx context.Context, links []blockchain.LinkWithLimit, maxCids int) error

func (*Blox) UpdateDhtPeers added in v1.16.0

func (p *Blox) UpdateDhtPeers(peers []peer.ID) error

type CidStruct added in v1.14.2

type CidStruct struct {
	Root string `json:"/"`
}

type FilesStat added in v1.14.2

type FilesStat struct {
	Blocks         int    `json:"Blocks"`
	CumulativeSize uint64 `json:"CumulativeSize"`
	Hash           string `json:"Hash"`
	Local          bool   `json:"Local,omitempty"` // Optional field. 'omitempty' keyword is used to exclude the field from the output if it's default/zero value
	Size           uint64 `json:"Size"`
	SizeLocal      uint64 `json:"SizeLocal,omitempty"` // Optional field.
	Type           string `json:"Type"`
	WithLocality   bool   `json:"WithLocality,omitempty"` // Optional field.
}

type Option

type Option func(*options) error

func WithAnnounceInterval

func WithAnnounceInterval(i time.Duration) Option

WithAnnounceInterval sets the interval at which announcements are made on the pubsub. Defaults to 5 seconds if unset.

func WithBlockchainEndPoint added in v1.15.0

func WithBlockchainEndPoint(b string) Option

func WithDatastore

func WithDatastore(ds datastore.Batching) Option

func WithExchangeOpts added in v1.0.0

func WithExchangeOpts(eo ...exchange.Option) Option

func WithHost

func WithHost(h host.Host) Option

WithHost sets the libp2p host on which the blox is exposed. If unset a default host with random identity is used. See: libp2p.New.

func WithLinkSystem

func WithLinkSystem(ls *ipld.LinkSystem) Option

func WithMaxPingTime added in v1.15.0

func WithMaxPingTime(pt int) Option

func WithMinSuccessPingRate added in v1.15.0

func WithMinSuccessPingRate(sr int) Option

func WithPingCount added in v1.15.0

func WithPingCount(pc int) Option

func WithPoolName

func WithPoolName(n string) Option

WithPoolName sets a human readable name for the pool that the blox should join or create. Required.

func WithRelays added in v1.15.0

func WithRelays(r []string) Option

WithStoreDir sets a the store directory we are using for datastore Required.

func WithStoreDir added in v1.14.2

func WithStoreDir(n string) Option

WithStoreDir sets a the store directory we are using for datastore Required.

func WithTopicName

func WithTopicName(n string) Option

WithTopicName sets the name of the topic onto which announcements are made. Defaults to "/explore.fula/pools/<pool-name>" if unset. See: WithPoolName.

func WithUpdatePoolName added in v1.15.0

func WithUpdatePoolName(updatePoolName PoolNameUpdater) Option

type PeerStats added in v1.14.2

type PeerStats struct {
	Exchanged uint64  `json:"Exchanged"`
	Peer      string  `json:"Peer"`
	Recv      uint64  `json:"Recv"`
	Sent      uint64  `json:"Sent"`
	Value     float64 `json:"Value"`
}

type PoolNameUpdater added in v1.15.0

type PoolNameUpdater func(string) error

type RepoInfo added in v1.14.2

type RepoInfo struct {
	NumObjects uint64   `json:"NumObjects"`
	RepoPath   string   `json:"RepoPath"`
	SizeStat   SizeStat `json:"SizeStat"`
	Version    string   `json:"Version"`
	RepoSize   uint64   `json:"RepoSize"`
}

type SizeStat added in v1.14.2

type SizeStat struct {
	RepoSize   uint64 `json:"RepoSize"`
	StorageMax uint64 `json:"StorageMax"`
}

type StatsBitswap added in v1.14.2

type StatsBitswap struct {
	BlocksReceived   uint64      `json:"BlocksReceived"`
	BlocksSent       uint64      `json:"BlocksSent"`
	DataReceived     uint64      `json:"DataReceived"`
	DataSent         uint64      `json:"DataSent"`
	DupBlksReceived  uint64      `json:"DupBlksReceived"`
	DupDataReceived  uint64      `json:"DupDataReceived"`
	MessagesReceived uint64      `json:"MessagesReceived"`
	Peers            []string    `json:"Peers"`
	ProvideBufLen    int         `json:"ProvideBufLen"`
	Wantlist         []CidStruct `json:"Wantlist"`
}

type StatsBw added in v1.14.2

type StatsBw struct {
	RateIn   float64 `json:"RateIn"`
	RateOut  float64 `json:"RateOut"`
	TotalIn  int64   `json:"TotalIn"`
	TotalOut int64   `json:"TotalOut"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL