Documentation ¶
Overview ¶
Example (PoolDiscoverPeersViaPubSub) ¶
Example_poolDiscoverPeersViaPubSub starts a pool named "my-pool" across three nodes, connects two of the nodes to the other one to facilitate a path for pubsub to propagate and shows all three nodes discover each other using pubsub.
const poolName = "my-pool" ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second) defer cancel() // Elevate log level to show internal communications. if err := logging.SetLogLevel("*", "info"); err != nil { panic(err) } // Use a deterministic random generator to generate deterministic // output for the example. rng := rand.New(rand.NewSource(42)) // Instantiate the first node in the pool pid1, _, err := crypto.GenerateECDSAKeyPair(rng) if err != nil { panic(err) } h1, err := libp2p.New(libp2p.Identity(pid1)) if err != nil { panic(err) } n1, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h1)) if err != nil { panic(err) } if err := n1.Start(ctx); err != nil { panic(err) } defer n1.Shutdown(ctx) fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String()) // Instantiate the second node in the pool pid2, _, err := crypto.GenerateECDSAKeyPair(rng) if err != nil { panic(err) } h2, err := libp2p.New(libp2p.Identity(pid2)) if err != nil { panic(err) } n2, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h2)) if err != nil { panic(err) } if err := n2.Start(ctx); err != nil { panic(err) } defer n2.Shutdown(ctx) fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String()) // Instantiate the third node in the pool pid3, _, err := crypto.GenerateECDSAKeyPair(rng) if err != nil { panic(err) } h3, err := libp2p.New(libp2p.Identity(pid3)) if err != nil { panic(err) } n3, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h3)) if err != nil { panic(err) } if err := n3.Start(ctx); err != nil { panic(err) } defer n3.Shutdown(ctx) fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String()) // Connect n1 to n2 and n3 so that there is a path for gossip propagation. // Note that we are not connecting n2 to n3 as they should discover // each other via pool's iexist announcements. h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil { panic(err) } h1.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL) if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil { panic(err) } // Wait until the nodes discover each other for { if len(h1.Peerstore().Peers()) == 3 && len(h2.Peerstore().Peers()) == 3 && len(h3.Peerstore().Peers()) == 3 { break } select { case <-ctx.Done(): panic(ctx.Err()) default: time.Sleep(time.Second) } } h1Peers := h1.Peerstore().Peers() fmt.Printf("%s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers)) for _, id := range h1Peers { fmt.Printf("- %s\n", id) } h2Peers := h2.Peerstore().Peers() fmt.Printf("%s peerstore contains %d nodes:\n", h2.ID(), len(h2Peers)) for _, id := range h2Peers { fmt.Printf("- %s\n", id) } h3Peers := h3.Peerstore().Peers() fmt.Printf("%s peerstore contains %d nodes:\n", h3.ID(), len(h3Peers)) for _, id := range h2Peers { fmt.Printf("- %s\n", id) }
Output: Instantiated node in pool my-pool with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT Instantiated node in pool my-pool with ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF Instantiated node in pool my-pool with ID: QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT peerstore contains 3 nodes: - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF peerstore contains 3 nodes: - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA peerstore contains 3 nodes: - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
Example (PoolExchangeDagBetweenPoolNodes) ¶
Example_poolExchangeDagBetweenPoolNodes starts up a pool with 2 nodes, stores a sample DAG in one node and fetches it via GraphSync from the other node.
const poolName = "my-pool" ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() // Elevate log level to show internal communications. if err := logging.SetLogLevel("*", "info"); err != nil { panic(err) } // Use a deterministic random generator to generate deterministic // output for the example. rng := rand.New(rand.NewSource(42)) // Instantiate the first node in the pool pid1, _, err := crypto.GenerateECDSAKeyPair(rng) if err != nil { panic(err) } h1, err := libp2p.New(libp2p.Identity(pid1)) if err != nil { panic(err) } n1, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h1)) if err != nil { panic(err) } if err := n1.Start(ctx); err != nil { panic(err) } defer n1.Shutdown(ctx) fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String()) // Instantiate the second node in the pool pid2, _, err := crypto.GenerateECDSAKeyPair(rng) if err != nil { panic(err) } h2, err := libp2p.New(libp2p.Identity(pid2)) if err != nil { panic(err) } n2, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h2)) if err != nil { panic(err) } if err := n2.Start(ctx); err != nil { panic(err) } defer n2.Shutdown(ctx) fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String()) // Connect n1 to n2. h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil { panic(err) } h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL) if err = h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil { panic(err) } // Authorize exchange between the two nodes if err := n1.SetAuth(ctx, h1.ID(), h2.ID(), true); err != nil { panic(err) } if err := n2.SetAuth(ctx, h2.ID(), h1.ID(), true); err != nil { panic(err) } // Generate a sample DAG and store it on node 1 (n1) in the pool, which we will pull from n1 n1leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) { na.AssembleEntry("this").AssignBool(true) }) n1leafLink, err := n1.Store(ctx, n1leaf) if err != nil { panic(err) } n1Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { na.AssembleEntry("that").AssignInt(42) na.AssembleEntry("oneLeafLink").AssignLink(n1leafLink) }) n1RootLink, err := n1.Store(ctx, n1Root) if err != nil { panic(err) } fmt.Printf("%s stored IPLD data with links:\n root: %s\n leaf:%s\n", h1.ID(), n1RootLink, n1leafLink) // Generate a sample DAG and store it on node 2 (n1) in the pool, which we will push to n1 n2leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) { na.AssembleEntry("that").AssignBool(false) }) n2leafLink, err := n2.Store(ctx, n2leaf) if err != nil { panic(err) } n2Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { na.AssembleEntry("this").AssignInt(24) na.AssembleEntry("anotherLeafLink").AssignLink(n2leafLink) }) n2RootLink, err := n2.Store(ctx, n2Root) if err != nil { panic(err) } fmt.Printf("%s stored IPLD data with links:\n root: %s\n leaf:%s\n", h1.ID(), n1RootLink, n1leafLink) fmt.Println("exchanging by Pull...") // Pull the sample DAG stored on node 1 from node 2 by only asking for the root link. // Because fetch implementation is recursive, it should fetch the leaf link too. if err := n2.Pull(ctx, h1.ID(), n1RootLink); err != nil { panic(err) } // Assert that n2 now has both root and leaf links if exists, err := n2.Has(ctx, n1RootLink); err != nil { panic(err) } else if !exists { panic("expected n2 to have fetched the entire sample DAG") } else { fmt.Printf("%s successfully fetched:\n link: %s\n from %s\n", h2.ID(), n1RootLink, h1.ID()) n, err := n2.Load(ctx, n1RootLink, basicnode.Prototype.Any) if err != nil { panic(err) } var buf bytes.Buffer if err := dagjson.Encode(n, &buf); err != nil { panic(err) } fmt.Printf(" content: %s\n", buf.String()) } if exists, err := n2.Has(ctx, n1leafLink); err != nil { panic(err) } else if !exists { panic("expected n2 to have fetched the entire sample DAG") } else { fmt.Printf("%s successfully fetched:\n link: %s\n from %s\n", h2.ID(), n1leafLink, h1.ID()) n, err := n2.Load(ctx, n1leafLink, basicnode.Prototype.Any) if err != nil { panic(err) } var buf bytes.Buffer if err := dagjson.Encode(n, &buf); err != nil { panic(err) } fmt.Printf(" content: %s\n", buf.String()) } fmt.Println("exchanging by Push...") // Push the sample DAG stored on node 2 to node 1 by only pushing the root link. // Because Push implementation is recursive, it should push the leaf link too. if err := n2.Push(ctx, h1.ID(), n2RootLink); err != nil { panic(err) } // Since push is an asynchronous operation, wait until background push is finished // by periodically checking if link is present on node 1. for { if exists, _ := n1.Has(ctx, n2RootLink); exists { break } select { case <-ctx.Done(): panic(ctx.Err()) default: time.Sleep(time.Second) } } // Assert that n1 now has both root and leaf links if exists, err := n1.Has(ctx, n2RootLink); err != nil { panic(err) } else if !exists { panic("expected n2 to have pushed the entire sample DAG") } else { fmt.Printf("%s successfully pushed:\n link: %s\n from %s\n", h2.ID(), n1RootLink, h1.ID()) n, err := n1.Load(ctx, n2RootLink, basicnode.Prototype.Any) if err != nil { panic(err) } var buf bytes.Buffer if err := dagjson.Encode(n, &buf); err != nil { panic(err) } fmt.Printf(" content: %s\n", buf.String()) } if exists, err := n1.Has(ctx, n2leafLink); err != nil { panic(err) } else if !exists { panic("expected n2 to have pushed the entire sample DAG") } else { fmt.Printf("%s successfully pushed:\n link: %s\n from %s\n", h2.ID(), n1leafLink, h1.ID()) n, err := n1.Load(ctx, n2leafLink, basicnode.Prototype.Any) if err != nil { panic(err) } var buf bytes.Buffer if err := dagjson.Encode(n, &buf); err != nil { panic(err) } fmt.Printf(" content: %s\n", buf.String()) }
Output: Instantiated node in pool my-pool with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT Instantiated node in pool my-pool with ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT stored IPLD data with links: root: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy leaf:bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34 QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT stored IPLD data with links: root: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy leaf:bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34 exchanging by Pull... QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully fetched: link: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT content: {"oneLeafLink":{"/":"bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"},"that":42} QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully fetched: link: bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34 from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT content: {"this":true} exchanging by Push... QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully pushed: link: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT content: {"anotherLeafLink":{"/":"bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4"},"this":24} QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully pushed: link: bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34 from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT content: {"that":false}
Index ¶
- Constants
- Variables
- type Announcement
- type AnnouncementType
- type Blox
- func (p *Blox) Has(ctx context.Context, l ipld.Link) (bool, error)
- func (p *Blox) Load(ctx context.Context, l ipld.Link, np ipld.NodePrototype) (ipld.Node, error)
- func (p *Blox) Pull(ctx context.Context, from peer.ID, l ipld.Link) error
- func (p *Blox) Push(ctx context.Context, from peer.ID, l ipld.Link) error
- func (p *Blox) SetAuth(ctx context.Context, on peer.ID, subject peer.ID, allow bool) error
- func (p *Blox) Shutdown(ctx context.Context) error
- func (p *Blox) Start(ctx context.Context) error
- func (p *Blox) Store(ctx context.Context, n ipld.Node) (ipld.Link, error)
- type Option
- func WithAnnounceInterval(i time.Duration) Option
- func WithDatastore(ds datastore.Batching) Option
- func WithExchangeOpts(eo ...exchange.Option) Option
- func WithHost(h host.Host) Option
- func WithLinkSystem(ls *ipld.LinkSystem) Option
- func WithPoolName(n string) Option
- func WithTopicName(n string) Option
Examples ¶
Constants ¶
const Version0 = "0"
Variables ¶
var ( PubSubPrototypes struct { Announcement schema.TypedPrototype } )
Functions ¶
This section is empty.
Types ¶
type Announcement ¶
type Announcement struct { Version string Type AnnouncementType Addrs []string }
func (*Announcement) GetAddrs ¶
func (a *Announcement) GetAddrs() ([]multiaddr.Multiaddr, error)
func (*Announcement) MarshalBinary ¶
func (a *Announcement) MarshalBinary() ([]byte, error)
func (*Announcement) SetAddrs ¶
func (a *Announcement) SetAddrs(ma ...multiaddr.Multiaddr)
func (*Announcement) UnmarshalBinary ¶
func (a *Announcement) UnmarshalBinary(b []byte) error
type AnnouncementType ¶
type AnnouncementType int
const ( UnknownAnnouncementType AnnouncementType = iota IExistAnnouncementType )
type Blox ¶
type Blox struct {
// contains filtered or unexported fields
}
type Option ¶
type Option func(*options) error
func WithAnnounceInterval ¶
WithAnnounceInterval sets the interval at which announcements are made on the pubsub. Defaults to 5 seconds if unset.
func WithDatastore ¶
func WithDatastore(ds datastore.Batching) Option
func WithExchangeOpts ¶ added in v1.0.0
func WithHost ¶
WithHost sets the libp2p host on which the blox is exposed. If unset a default host with random identity is used. See: libp2p.New.
func WithLinkSystem ¶
func WithLinkSystem(ls *ipld.LinkSystem) Option
func WithPoolName ¶
WithPoolName sets a human readable name for the pool that the blox should join or create. Required.
func WithTopicName ¶
WithTopicName sets the name of the topic onto which announcements are made. Defaults to "/explore.fula/pools/<pool-name>" if unset. See: WithPoolName.