engine

package
v0.15.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 23, 2024 License: Apache-2.0, MIT Imports: 40 Imported by: 12

Documentation

Overview

Package engine provides a reference implementation of the provider.Interface in order to advertise the availability of a list of multihashes to indexer nodes such as "storetheindex". See: https://github.com/ipni/storetheindex

The advertisements are published as a chain of diffs that signal the list of multihashes that are added or removed, represented as an IPLD DAG. Walking the chain of advertisements would then provide the latest state of the total multihashes provided by the engine. The list of multihashes is paginated as a collection of interlinked chunks. For the complete advertisement IPLD schema, see:

The engine internally uses "go-libipni/dagsync" to sync the IPLD DAG of advertisements. See: https://github.com/ipni/go-libipni/tree/main/dagsync

Example (AdvertiseHelloWorld)

Example_advertiseHelloWorld shows an example of instantiating an engine.Engine and publishing an advertisement for a sample content.

Note that the advertisement published uses metadata.BitswapMetadata. This is for demonstrative purposes only. The example does not set up the retrieval side for the content.

package main

import (
	"context"
	crand "crypto/rand"
	"fmt"
	"io"

	"github.com/ipni/go-libipni/metadata"
	provider "github.com/ipni/index-provider"
	"github.com/ipni/index-provider/engine"
	"github.com/ipni/index-provider/engine/xproviders"
	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/multiformats/go-multiaddr"
	"github.com/multiformats/go-multihash"
)

// Example_advertiseHelloWorld shows an example of instantiating an engine.Engine and publishing
// an advertisement for a sample content, followed by a second advertisement that carries
// ExtendedProviders.
//
// Note that the advertisement published uses bitswap metadata (metadata.Bitswap). This is for
// demonstrative purposes only. The example does not set up the retrieval side for the content.
//
// Every failure panics, since this is a demonstration program with no caller to report to.
func main() {
	// Get the multihash of content to advertise
	content := "Hello World!"
	sayHelloCtxID := "Say hello"
	fmt.Printf("Preparing to advertise content: '%s'\n", content)
	mh, err := multihash.Sum([]byte(content), multihash.SHA2_256, -1)
	if err != nil {
		panic(err)
	}
	fmt.Printf("✓ Generated content multihash: %s\n", mh.B58String())

	// Create a new libp2p host
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	// The host is created here, so release it here as well. The close error is not actionable
	// at this point in an example and is deliberately ignored.
	defer h.Close()
	// Only print the first four characters to keep golang example output happy.
	fmt.Printf("✓ Instantiated new libp2p host with peer ID: %s...\n", h.ID().String()[:4])

	// Construct a new provider engine with given libp2p host that announces advertisements over
	// gossipsub. The variable is named eng so that it does not shadow the engine package.
	eng, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.Libp2pPublisher))
	if err != nil {
		panic(err)
	}
	fmt.Println("✓ Instantiated provider engine")
	// Runs on every exit path, including the panics below.
	defer eng.Shutdown()

	// The lister serves exactly one context ID; anything else is reported as not found.
	eng.RegisterMultihashLister(func(_ context.Context, _ peer.ID, contextID []byte) (provider.MultihashIterator, error) {
		if string(contextID) == sayHelloCtxID {
			return &singleMhIterator{mh: mh}, nil
		}
		// %q renders the raw bytes as a quoted string instead of a list of decimal byte values.
		return nil, fmt.Errorf("no content is found for context ID: %q", contextID)
	})
	fmt.Printf("✓ Registered lister for context ID: %s\n", sayHelloCtxID)

	// Start the engine
	if err = eng.Start(context.Background()); err != nil {
		panic(err)
	}
	fmt.Println("✓ Provider engine started.")

	// Multiple transports can be included in metadata.
	md := metadata.Default.New(metadata.Bitswap{})

	// Note that this example publishes an ad with bitswap metadata as an example.
	// But it does not instantiate a bitswap server to serve retrievals.
	adCid, err := eng.NotifyPut(context.Background(), nil, []byte(sayHelloCtxID), md)
	if err != nil {
		panic(err)
	}
	// Only print the first three characters to keep golang example output happy.
	fmt.Printf("✓ Published advertisement for content with CID: %s...\n", adCid.String()[:3])

	// Create an advertisement with ExtendedProviders
	providerID := h.ID()
	privKey := h.Peerstore().PrivKey(providerID)
	addrs := h.Addrs()
	mdBytes, err := md.MarshalBinary()
	if err != nil {
		panic(err)
	}

	// Generate random keys and identity for a new ExtendedProvider
	xPrivKey, _, err := crypto.GenerateEd25519Key(crand.Reader)
	if err != nil {
		panic(err)
	}
	xProviderID, err := peer.IDFromPrivateKey(xPrivKey)
	if err != nil {
		panic(err)
	}
	xAddr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/9000")
	if err != nil {
		panic(err)
	}

	// Build and sign ExtendedProviders advertisement, chained onto the advertisement above.
	xAd, err := xproviders.NewAdBuilder(providerID, privKey, addrs).
		WithLastAdID(adCid).
		WithExtendedProviders(xproviders.NewInfo(xProviderID, xPrivKey, mdBytes, []multiaddr.Multiaddr{xAddr})).
		WithOverride(true).
		WithContextID([]byte("sample-context-id")).
		WithMetadata(mdBytes).
		BuildAndSign()
	if err != nil {
		panic(err)
	}

	// Publish the advertisement using engine
	xAdCid, err := eng.Publish(context.Background(), *xAd)
	if err != nil {
		panic(err)
	}
	// Only print the first three characters to keep golang example output happy.
	fmt.Printf("✓ Published ExtendedProviders advertisement for content with CID: %s...\n", xAdCid.String()[:3])

	// Shut down explicitly so that a shutdown error is surfaced on the success path; the
	// deferred call above discards its result.
	if err := eng.Shutdown(); err != nil {
		panic(err)
	}

}

// singleMhIterator is an iterator that yields exactly one multihash and then
// reports io.EOF.
type singleMhIterator struct {
	// offset counts how many multihashes have been returned so far; it is
	// zero until the first call to Next.
	offset int
	// mh is the single multihash handed out by the iterator.
	mh     multihash.Multihash
}

// Next returns the stored multihash on the first call and io.EOF on every
// call after that.
func (s *singleMhIterator) Next() (multihash.Multihash, error) {
	// Anything other than the initial offset means the multihash was already
	// handed out.
	if s.offset != 0 {
		return nil, io.EOF
	}
	s.offset++
	return s.mh, nil
}
Output:

Preparing to advertise content: 'Hello World!'
✓ Generated content multihash: QmWvQxTqbG2Z9HPJgG57jjwR154cKhbtJenbyYTWkjgF3e
✓ Instantiated new libp2p host with peer ID: 12D3...
✓ Instantiated provider engine
✓ Registered lister for context ID: Say hello
✓ Provider engine started.
✓ Published advertisement for content with CID: bag...
✓ Published ExtendedProviders advertisement for content with CID: bag...

Index

Examples

Constants

This section is empty.

Variables

View Source
var (

	// ErrEntriesLinkMismatch signals that the link generated from chunking the multihashes returned by provider.MultihashLister does not match the previously generated link. This error is most likely caused by the lister returning inconsistent multihashes for the same key.
	ErrEntriesLinkMismatch = errors.New("regenerated link from multihash lister did not match the original link; multihashes returned by the lister for the same key are not consistent")
)

Functions

This section is empty.

Types

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is an implementation of the core reference provider interface.

func New

func New(o ...Option) (*Engine, error)

New creates a new index provider Engine as the default implementation of provider.Interface. It provides the ability to advertise the availability of a list of multihashes associated to a context ID as a chain of linked advertisements as defined by the indexer node protocol implemented by "go-libipni".

Engine internally uses "go-libipni/dagsync", a protocol for propagating and synchronizing changes to an IPLD DAG, to publish advertisements. See: https://github.com/ipni/go-libipni/tree/main/dagsync

Published advertisements are signed using the given private key. The retAddrs corresponds to the endpoints at which the data block associated to the advertised multihashes can be retrieved. If no retAddrs are specified, then use the listen addresses of the given libp2p host.

The engine also provides the ability to generate advertisements via Engine.NotifyPut and Engine.NotifyRemove as long as a provider.MultihashLister is registered. See: provider.MultihashLister, Engine.RegisterMultihashLister.

The engine must be started via Engine.Start before use and discarded via Engine.Shutdown when no longer needed.

func (*Engine) GetAdv

func (e *Engine) GetAdv(_ context.Context, adCid cid.Cid) (*schema.Advertisement, error)

GetAdv gets the advertisement associated to the given CID adCid. The context is not used.

func (*Engine) GetLatestAdv

func (e *Engine) GetLatestAdv(ctx context.Context) (cid.Cid, *schema.Advertisement, error)

GetLatestAdv gets the latest advertisement by the provider. If there are no previously published advertisements, then cid.Undef is returned as the advertisement CID.

func (*Engine) GetPublisherHttpFunc added in v0.13.5

func (e *Engine) GetPublisherHttpFunc() (http.HandlerFunc, error)

GetPublisherHttpFunc gets the http.HandlerFunc that can be used to serve advertisements over HTTP. The returned handler is only valid if the PublisherKind is HttpPublisher and the HttpPublisherWithoutServer option is set.

func (*Engine) LinkSystem added in v0.11.1

func (e *Engine) LinkSystem() *ipld.LinkSystem

LinkSystem gets the link system used by the engine to store and retrieve advertisement data.

func (*Engine) NotifyPut

func (e *Engine) NotifyPut(ctx context.Context, provider *peer.AddrInfo, contextID []byte, md metadata.Metadata) (cid.Cid, error)

NotifyPut publishes an advertisement that signals the list of multihashes associated to the given contextID is available by this provider with the given metadata. A provider.MultihashLister is required, and is used to look up the list of multihashes associated to a context ID.

Note that prior to calling this function a provider.MultihashLister must be registered.

See: Engine.RegisterMultihashLister, Engine.Publish.

func (*Engine) NotifyRemove

func (e *Engine) NotifyRemove(ctx context.Context, provider peer.ID, contextID []byte) (cid.Cid, error)

NotifyRemove publishes an advertisement that signals the list of multihashes associated to the given contextID is no longer available by this provider.

Note that prior to calling this function a provider.MultihashLister must be registered.

See: Engine.RegisterMultihashLister, Engine.Publish.

func (*Engine) Publish

func (e *Engine) Publish(ctx context.Context, adv schema.Advertisement) (cid.Cid, error)

Publish stores the given advertisement locally via Engine.PublishLocal first. It then announces the availability of the new advertisement by sending an announcement message via HTTP and/or gossipsub to indexers, depending on configuration.

The publication mechanism uses dagsync.Publisher internally. See: https://github.com/ipni/go-libipni/tree/main/dagsync

func (*Engine) PublishLatest

func (e *Engine) PublishLatest(ctx context.Context) (cid.Cid, error)

PublishLatest re-publishes the latest existing advertisement and sends announcements using the engine's configured senders.

func (*Engine) PublishLatestHTTP

func (e *Engine) PublishLatestHTTP(ctx context.Context, announceURLs ...*url.URL) (cid.Cid, error)

PublishLatestHTTP publishes the latest existing advertisement and sends direct HTTP announcements to the specified URLs.

func (*Engine) PublishLocal

func (e *Engine) PublishLocal(ctx context.Context, adv schema.Advertisement) (cid.Cid, error)

PublishLocal stores the advertisement in the local link system and marks it locally as the latest advertisement.

The context is used for storing internal mapping information onto the datastore.

See: Engine.Publish.

func (*Engine) RegisterMultihashLister

func (e *Engine) RegisterMultihashLister(mhl provider.MultihashLister)

RegisterMultihashLister registers a provider.MultihashLister that is used to look up the list of multihashes associated to a context ID. A lister must be registered before calls to Engine.NotifyPut and Engine.NotifyRemove.

Note that successive calls to this function will replace the previous registration. Only a single registration is supported.

See: provider.Interface

func (*Engine) Shutdown

func (e *Engine) Shutdown() error

Shutdown shuts down the engine and discards all resources opened by the engine. The engine is no longer usable after the call to this function.

func (*Engine) Start

func (e *Engine) Start(ctx context.Context) error

Start starts the engine by instantiating the internal storage and joining the configured gossipsub topic used for publishing advertisements.

The context is used to instantiate the internal LRU cache storage. See: Engine.Shutdown, chunker.NewCachedEntriesChunker.

type Option

type Option func(*options) error

Option sets a configuration parameter for the provider engine.

func WithChainedEntries

func WithChainedEntries(chunkSize int) Option

WithChainedEntries sets format of advertisement entries to chained Entry Chunk with the given chunkSize as the maximum number of multihashes per chunk.

If unset, advertisement entries are formatted as chained Entry Chunk with default maximum of 16384 multihashes per chunk.

To use HAMT as the advertisement entries format, see: WithHamtEntries. For caching configuration: WithEntriesCacheCapacity, chunker.CachedEntriesChunker

func WithDatastore

func WithDatastore(ds datastore.Batching) Option

WithDatastore sets the datastore that is used by the engine to store advertisements. If unspecified, an ephemeral in-memory datastore is used. See: datastore.NewMapDatastore.

func WithDirectAnnounce

func WithDirectAnnounce(announceURLs ...string) Option

WithDirectAnnounce sets indexer URLs to send direct HTTP announcements to.

func WithEntriesCacheCapacity

func WithEntriesCacheCapacity(s int) Option

WithEntriesCacheCapacity sets the maximum number of advertisement entries DAG to cache. The cached DAG may be in chained Entry Chunk or HAMT format. See WithChainedEntries and WithHamtEntries to select the ad entries DAG format.

If unset, the default capacity of 1024 is used. This means at most 1024 DAGs will be cached.

The cache is evicted using LRU policy. Note that the capacity dictates the number of complete chains that are cached, not individual entry chunks. This means, the maximum storage used by the cache is a factor of capacity, chunk size and the length of multihashes in each chunk.

As an example, for 128-bit long multihashes the cache with default capacity of 1024, and default chunk size of 16384 can grow up to 256MiB when full.

func WithExtraGossipData

func WithExtraGossipData(extraData []byte) Option

WithExtraGossipData supplies extra data to include in the pubsub announcement. Note that this option only takes effect if pubsub announcements are enabled.

func WithHamtEntries

func WithHamtEntries(hashAlg multicodec.Code, bitWidth, bucketSize int) Option

WithHamtEntries sets format of advertisement entries to HAMT with the given hash algorithm, bit-width and bucket size.

If unset, advertisement entries are formatted as chained Entry Chunk with default maximum of 16384 multihashes per chunk.

Only multicodec.Identity, multicodec.Sha2_256 and multicodec.Murmur3X64_64 are supported as hash algorithm. The bit-width and bucket size must be at least 3 and 1 respectively. For more information on HAMT data structure, see:

For caching configuration: WithEntriesCacheCapacity, chunker.CachedEntriesChunker

func WithHost

func WithHost(h host.Host) Option

WithHost specifies the host to which the provider engine belongs. If unspecified, a host is created automatically. See: libp2p.New.

func WithHttpPublisherAnnounceAddr

func WithHttpPublisherAnnounceAddr(addr string) Option

WithHttpPublisherAnnounceAddr sets the address to be supplied in announce messages to tell indexers where to retrieve advertisements.

This option is not used if PublisherKind is set to DataTransferPublisher.

func WithHttpPublisherHandlerPath added in v0.13.5

func WithHttpPublisherHandlerPath(handlerPath string) Option

WithHttpPublisherHandlerPath should only be used with WithHttpPublisherWithoutServer

func WithHttpPublisherListenAddr

func WithHttpPublisherListenAddr(addr string) Option

WithHttpPublisherListenAddr sets the net listen address for the HTTP publisher. If unset, the default net listen address of '0.0.0.0:3104' is used. To disable plain HTTP and only serve libp2phttp, explicitly set this to "".

This option only takes effect if the PublisherKind is set to HttpPublisher. See: WithPublisherKind.

func WithHttpPublisherWithoutServer added in v0.13.5

func WithHttpPublisherWithoutServer() Option

WithHttpPublisherWithoutServer sets the HTTP publisher to not start a server. Setting up the handler is left to the user.

func WithPrivateKey added in v0.13.5

func WithPrivateKey(key crypto.PrivKey) Option

func WithProvider

func WithProvider(provider peer.AddrInfo) Option

WithProvider sets the peer and addresses for the provider to put in indexing advertisements. This value overrides `WithRetrievalAddrs`

func WithPublisherKind

func WithPublisherKind(k PublisherKind) Option

WithPublisherKind sets the kind of publisher used to serve advertisements. If unset, advertisements are only stored locally and no announcements are made. This does not affect the methods used to send announcements of new advertisements, which are configured independent of this.

See: PublisherKind.

func WithPubsubAnnounce added in v0.14.3

func WithPubsubAnnounce(enable bool) Option

WithPubsubAnnounce configures whether or not announcements are sent via gossip pubsub. Default is true if this option is not specified.

func WithPurgeCacheOnStart

func WithPurgeCacheOnStart(p bool) Option

WithPurgeCacheOnStart sets whether to clear any cached entries chunks when the provider engine starts. If unset, cache is rehydrated from previously cached entries stored in datastore if present. See: WithDatastore.

func WithRetrievalAddrs

func WithRetrievalAddrs(addrs ...string) Option

WithRetrievalAddrs sets the addresses that specify where to get the content corresponding to an indexing advertisement. If unspecified, the libp2p host listen addresses are used. See: WithHost.

func WithStorageReadOpenerErrorHook added in v0.15.1

func WithStorageReadOpenerErrorHook(hook func(ipld.LinkContext, ipld.Link, error) error) Option

WithStorageReadOpenerErrorHook allows the calling application to invoke a custom piece of logic whenever a storage read opener error occurs. For example, the calling application can delete the corrupted advertisement or create a new one if the datastore was corrupted for some reason. The calling application can return ipld.ErrNotFound{} to indicate to IPNI that this advertisement should be skipped without halting processing of the rest of the chain.

func WithSyncPolicy

func WithSyncPolicy(syncPolicy *policy.Policy) Option

func WithTopic

func WithTopic(t *pubsub.Topic) Option

WithTopic sets the pubsub topic on which new advertisements are announced. To use the default pubsub configuration with a specific topic name, use WithTopicName. If both options are specified, WithTopic takes precedence.

Note that this option only takes effect if the PublisherKind is set to DataTransferPublisher. See: WithPublisherKind.

func WithTopicName

func WithTopicName(t string) Option

WithTopicName sets the topic name on which pubsub announcements are published. To override the default pubsub configuration, use WithTopic.

Note that this option only takes effect if the PublisherKind is set to DataTransferPublisher. See: WithPublisherKind.

type PublisherKind

type PublisherKind string

PublisherKind represents the kind of publisher to use in order to announce a new advertisement to the network. See: WithPublisherKind

const (
	// NoPublisher indicates that no announcements are made to the network and
	// all advertisements are only stored locally.
	NoPublisher PublisherKind = ""

	// HttpPublisher serves advertisements using an HTTP server exposed by the
	// engine.
	HttpPublisher PublisherKind = "http"

	// Libp2pPublisher serves advertisements using the engine's libp2p host.
	Libp2pPublisher PublisherKind = "libp2p"

	// Libp2pHttpPublisher serves advertisements using both an HTTP server and
	// engine's libp2p host. This is just the combination of HttpPublisher and
	// Libp2pPublisher configurable as a single option.
	Libp2pHttpPublisher PublisherKind = "libp2phttp"

	// DataTransferPublisher is a legacy publisher kind.
	//
	// Deprecated: Use Libp2pPublisher.
	DataTransferPublisher PublisherKind = "dtsync"
)

Directories

Path Synopsis
Package chunker provides functionality for chunking ad entries generated from provider.MultihashIterator into an IPLD DAG.
Package chunker provides functionality for chunking ad entries generated from provider.MultihashIterator into an IPLD DAG.
Package peerutil provides utilities around peer ID values.
Package peerutil provides utilities around peer ID values.
Package xproviders provides convenience classes for building and signing advertisements with ExtendedProviders, that implement extended providers specification.
Package xproviders provides convenience classes for building and signing advertisements with ExtendedProviders, that implement extended providers specification.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL