blox

package
v1.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 27, 2023 License: MIT Imports: 28 Imported by: 0

Documentation

Overview

Example (PoolDiscoverPeersViaPubSub)

Example_poolDiscoverPeersViaPubSub starts a pool named "my-pool" across three nodes, connects two of the nodes to the other one to facilitate a path for pubsub to propagate and shows all three nodes discover each other using pubsub.

const poolName = "my-pool"
ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
defer cancel()

// Elevate log level to show internal communications.
if err := logging.SetLogLevel("*", "info"); err != nil {
	panic(err)
}

// Use a deterministic random generator to generate deterministic
// output for the example.
rng := rand.New(rand.NewSource(42))

// Instantiate the first node in the pool
pid1, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
h1, err := libp2p.New(libp2p.Identity(pid1))
if err != nil {
	panic(err)
}
n1, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h1))
if err != nil {
	panic(err)
}
if err := n1.Start(ctx); err != nil {
	panic(err)
}
defer n1.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())

// Instantiate the second node in the pool
pid2, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
h2, err := libp2p.New(libp2p.Identity(pid2))
if err != nil {
	panic(err)
}
n2, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h2))
if err != nil {
	panic(err)
}
if err := n2.Start(ctx); err != nil {
	panic(err)
}
defer n2.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String())

// Instantiate the third node in the pool
pid3, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
h3, err := libp2p.New(libp2p.Identity(pid3))
if err != nil {
	panic(err)
}
n3, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h3))
if err != nil {
	panic(err)
}
if err := n3.Start(ctx); err != nil {
	panic(err)
}
defer n3.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String())

// Connect n1 to n2 and n3 so that there is a path for gossip propagation.
// Note that we are not connecting n2 to n3 as they should discover
// each other via pool's iexist announcements.
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
	panic(err)
}
h1.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil {
	panic(err)
}

// Wait until the nodes discover each other
for {
	if len(h1.Peerstore().Peers()) == 3 &&
		len(h2.Peerstore().Peers()) == 3 &&
		len(h3.Peerstore().Peers()) == 3 {
		break
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(time.Second)
	}
}

h1Peers := h1.Peerstore().Peers()
fmt.Printf("%s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers))
for _, id := range h1Peers {
	fmt.Printf("- %s\n", id)
}

h2Peers := h2.Peerstore().Peers()
fmt.Printf("%s peerstore contains %d nodes:\n", h2.ID(), len(h2Peers))
for _, id := range h2Peers {
	fmt.Printf("- %s\n", id)
}

h3Peers := h3.Peerstore().Peers()
fmt.Printf("%s peerstore contains %d nodes:\n", h3.ID(), len(h3Peers))
for _, id := range h2Peers {
	fmt.Printf("- %s\n", id)
}
Output:

Instantiated node in pool my-pool with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
Instantiated node in pool my-pool with ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
Instantiated node in pool my-pool with ID: QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT peerstore contains 3 nodes:
- QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
- QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
- QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF peerstore contains 3 nodes:
- QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
- QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
- QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA peerstore contains 3 nodes:
- QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
- QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
- QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
Example (PoolExchangeDagBetweenPoolNodes)

Example_poolExchangeDagBetweenPoolNodes starts up a pool with 2 nodes, stores a sample DAG in one node and fetches it via GraphSync from the other node.

const poolName = "my-pool"
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Elevate log level to show internal communications.
if err := logging.SetLogLevel("*", "info"); err != nil {
	panic(err)
}

// Use a deterministic random generator to generate deterministic
// output for the example.
rng := rand.New(rand.NewSource(42))

// Instantiate the first node in the pool
pid1, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
h1, err := libp2p.New(libp2p.Identity(pid1))
if err != nil {
	panic(err)
}
n1, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h1))
if err != nil {
	panic(err)
}
if err := n1.Start(ctx); err != nil {
	panic(err)
}
defer n1.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())

// Instantiate the second node in the pool
pid2, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
	panic(err)
}
h2, err := libp2p.New(libp2p.Identity(pid2))
if err != nil {
	panic(err)
}
n2, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h2))
if err != nil {
	panic(err)
}
if err := n2.Start(ctx); err != nil {
	panic(err)
}
defer n2.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String())

// Connect n1 to n2.
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
	panic(err)
}
h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL)
if err = h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil {
	panic(err)
}

// Authorize exchange between the two nodes
if err := n1.SetAuth(ctx, h1.ID(), h2.ID(), true); err != nil {
	panic(err)
}
if err := n2.SetAuth(ctx, h2.ID(), h1.ID(), true); err != nil {
	panic(err)
}

// Generate a sample DAG and store it on node 1 (n1) in the pool, which we will pull from n1
n1leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) {
	na.AssembleEntry("this").AssignBool(true)
})
n1leafLink, err := n1.Store(ctx, n1leaf)
if err != nil {
	panic(err)
}
n1Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) {
	na.AssembleEntry("that").AssignInt(42)
	na.AssembleEntry("oneLeafLink").AssignLink(n1leafLink)
})
n1RootLink, err := n1.Store(ctx, n1Root)
if err != nil {
	panic(err)
}
fmt.Printf("%s stored IPLD data with links:\n    root: %s\n    leaf:%s\n", h1.ID(), n1RootLink, n1leafLink)

// Generate a sample DAG and store it on node 2 (n1) in the pool, which we will push to n1
n2leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) {
	na.AssembleEntry("that").AssignBool(false)
})
n2leafLink, err := n2.Store(ctx, n2leaf)
if err != nil {
	panic(err)
}
n2Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) {
	na.AssembleEntry("this").AssignInt(24)
	na.AssembleEntry("anotherLeafLink").AssignLink(n2leafLink)
})
n2RootLink, err := n2.Store(ctx, n2Root)
if err != nil {
	panic(err)
}
fmt.Printf("%s stored IPLD data with links:\n    root: %s\n    leaf:%s\n", h1.ID(), n1RootLink, n1leafLink)

fmt.Println("exchanging by Pull...")
// Pull the sample DAG stored on node 1 from node 2 by only asking for the root link.
// Because fetch implementation is recursive, it should fetch the leaf link too.
if err := n2.Pull(ctx, h1.ID(), n1RootLink); err != nil {
	panic(err)
}

// Assert that n2 now has both root and leaf links
if exists, err := n2.Has(ctx, n1RootLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have fetched the entire sample DAG")
} else {
	fmt.Printf("%s successfully fetched:\n    link: %s\n    from %s\n", h2.ID(), n1RootLink, h1.ID())
	n, err := n2.Load(ctx, n1RootLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
if exists, err := n2.Has(ctx, n1leafLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have fetched the entire sample DAG")
} else {
	fmt.Printf("%s successfully fetched:\n    link: %s\n    from %s\n", h2.ID(), n1leafLink, h1.ID())
	n, err := n2.Load(ctx, n1leafLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}

fmt.Println("exchanging by Push...")
// Push the sample DAG stored on node 2 to node 1 by only pushing the root link.
// Because Push implementation is recursive, it should push the leaf link too.
if err := n2.Push(ctx, h1.ID(), n2RootLink); err != nil {
	panic(err)
}

// Since push is an asynchronous operation, wait until background push is finished
// by periodically checking if link is present on node 1.
for {
	if exists, _ := n1.Has(ctx, n2RootLink); exists {
		break
	}
	select {
	case <-ctx.Done():
		panic(ctx.Err())
	default:
		time.Sleep(time.Second)
	}
}

// Assert that n1 now has both root and leaf links
if exists, err := n1.Has(ctx, n2RootLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have pushed the entire sample DAG")
} else {
	fmt.Printf("%s successfully pushed:\n    link: %s\n    from %s\n", h2.ID(), n1RootLink, h1.ID())
	n, err := n1.Load(ctx, n2RootLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
if exists, err := n1.Has(ctx, n2leafLink); err != nil {
	panic(err)
} else if !exists {
	panic("expected n2 to have pushed the entire sample DAG")
} else {
	fmt.Printf("%s successfully pushed:\n    link: %s\n    from %s\n", h2.ID(), n1leafLink, h1.ID())
	n, err := n1.Load(ctx, n2leafLink, basicnode.Prototype.Any)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := dagjson.Encode(n, &buf); err != nil {
		panic(err)
	}
	fmt.Printf("    content: %s\n", buf.String())
}
Output:

Instantiated node in pool my-pool with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
Instantiated node in pool my-pool with ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT stored IPLD data with links:
    root: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy
    leaf:bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34
QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT stored IPLD data with links:
    root: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy
    leaf:bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34
exchanging by Pull...
QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully fetched:
    link: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy
    from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
    content: {"oneLeafLink":{"/":"bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"},"that":42}
QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully fetched:
    link: bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34
    from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
    content: {"this":true}
exchanging by Push...
QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully pushed:
    link: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy
    from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
    content: {"anotherLeafLink":{"/":"bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4"},"this":24}
QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully pushed:
    link: bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34
    from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
    content: {"that":false}

Index

Examples

Constants

View Source
const Version0 = "0"

Variables

View Source
var (
	PubSubPrototypes struct {
		Announcement schema.TypedPrototype
	}
)

Functions

This section is empty.

Types

type Announcement

type Announcement struct {
	Version string
	Type    AnnouncementType
	Addrs   []string
}

func (*Announcement) GetAddrs

func (a *Announcement) GetAddrs() ([]multiaddr.Multiaddr, error)

func (*Announcement) MarshalBinary

func (a *Announcement) MarshalBinary() ([]byte, error)

func (*Announcement) SetAddrs

func (a *Announcement) SetAddrs(ma ...multiaddr.Multiaddr)

func (*Announcement) UnmarshalBinary

func (a *Announcement) UnmarshalBinary(b []byte) error

type AnnouncementType

type AnnouncementType int
const (
	UnknownAnnouncementType AnnouncementType = iota
	IExistAnnouncementType
)

type Blox

type Blox struct {
	// contains filtered or unexported fields
}

func New

func New(o ...Option) (*Blox, error)

func (*Blox) Has

func (p *Blox) Has(ctx context.Context, l ipld.Link) (bool, error)

func (*Blox) Load

func (p *Blox) Load(ctx context.Context, l ipld.Link, np ipld.NodePrototype) (ipld.Node, error)

func (*Blox) Pull

func (p *Blox) Pull(ctx context.Context, from peer.ID, l ipld.Link) error

func (*Blox) Push

func (p *Blox) Push(ctx context.Context, from peer.ID, l ipld.Link) error

func (*Blox) SetAuth added in v0.8.3

func (p *Blox) SetAuth(ctx context.Context, on peer.ID, subject peer.ID, allow bool) error

func (*Blox) Shutdown

func (p *Blox) Shutdown(ctx context.Context) error

func (*Blox) Start

func (p *Blox) Start(ctx context.Context) error

func (*Blox) Store

func (p *Blox) Store(ctx context.Context, n ipld.Node) (ipld.Link, error)

type Option

type Option func(*options) error

func WithAnnounceInterval

func WithAnnounceInterval(i time.Duration) Option

WithAnnounceInterval sets the interval at which announcements are made on the pubsub. Defaults to 5 seconds if unset.

func WithDatastore

func WithDatastore(ds datastore.Batching) Option

func WithExchangeOpts added in v1.0.0

func WithExchangeOpts(eo ...exchange.Option) Option

func WithHost

func WithHost(h host.Host) Option

WithHost sets the libp2p host on which the blox is exposed. If unset a default host with random identity is used. See: libp2p.New.

func WithLinkSystem

func WithLinkSystem(ls *ipld.LinkSystem) Option

func WithPoolName

func WithPoolName(n string) Option

WithPoolName sets a human readable name for the pool that the blox should join or create. Required.

func WithTopicName

func WithTopicName(n string) Option

WithTopicName sets the name of the topic onto which announcements are made. Defaults to "/explore.fula/pools/<pool-name>" if unset. See: WithPoolName.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL