legs

package module
v0.3.9 (Latest)
Warning

This package is not in the latest version of its module.

Published: Mar 10, 2022 License: Apache-2.0, MIT Imports: 29 Imported by: 12

README

legs 🦵

Legs is an interface for go-data-transfer that provides a 1:1 mechanism for keeping a subscriber's copy of an IPLD DAG synchronized with the current state of that DAG on a publisher.

Usage

Typically an application will be either a publisher or a subscriber, but it may be both.

Publisher

Create a legs publisher. Update its root to cause it to publish.

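// As in the Publisher example below, NewPublisher comes from the dtsync package.
pub, err := dtsync.NewPublisher(host, dsstore, lsys, "/legs/topic")
if err != nil {
	panic(err)
}
defer pub.Close()
// ...
// Publish the updated root.
err = pub.UpdateRoot(ctx, lnk.(cidlink.Link).Cid)
if err != nil {
	panic(err)
}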

Subscriber

The Subscriber handles subscribing to a topic, reading messages from the topic and tracking the state of each publisher.

Create a Subscriber:

sub, err := legs.NewSubscriber(dstHost, dstStore, dstLnkS, "/legs/topic", nil)
if err != nil {
	panic(err)
}

Optionally, request notification of updates:

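watcher, cancelWatcher := sub.OnSyncFinished()
defer cancelWatcher()
go watch(watcher)

func watch(notifications <-chan legs.SyncFinished) {
	// The loop ends when cancelWatcher closes the channel.
	for syncFinished := range notifications {
		// syncFinished.Cid, the new head, is now available in the local datastore.
		log.Println("synced", syncFinished.Cid, "from", syncFinished.PeerID)
	}
}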

To shut down a Subscriber, call its Close() method.

A Subscriber can be created with a function that determines if the Subscriber accepts or rejects messages from a publisher. Use the AllowPeer option to specify the function.

sub, err := legs.NewSubscriber(dstHost, dstStore, dstLnkS, "/legs/topic", nil, legs.AllowPeer(allowPeer))

The Subscriber keeps track of the latest head it has synced for each publisher. This avoids exchanging the whole DAG from scratch on every update; only the part that has not yet been synced is downloaded. The library does not persist this value. To start a Subscriber that has already partially synced with a publisher, use the SetLatestSync method:

sub, err := legs.NewSubscriber(dstHost, dstStore, dstLnkS, "/legs/topic", nil)
if err != nil {
	panic(err)
}
// Set up partially synced publishers.
if err = sub.SetLatestSync(peerID1, lastSync1); err != nil {
	panic(err)
}
if err = sub.SetLatestSync(peerID2, lastSync2); err != nil {
	panic(err)
}
if err = sub.SetLatestSync(peerID3, lastSync3); err != nil {
	panic(err)
}

License

Legs is dual-licensed under Apache 2.0 and MIT terms:

Apache License, Version 2.0, (LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExploreRecursiveWithStop

func ExploreRecursiveWithStop(limit selector.RecursionLimit, sequence selectorbuilder.SelectorSpec, stopLnk ipld.Link) ipld.Node

ExploreRecursiveWithStop builds a selector that recursively syncs a DAG until the link stopLnk is seen. This avoids having to sync the DAG from scratch on every update.

func ExploreRecursiveWithStopNode

func ExploreRecursiveWithStopNode(limit selector.RecursionLimit, sequence ipld.Node, stopLnk ipld.Link) ipld.Node

ExploreRecursiveWithStopNode builds a selector that recursively syncs a DAG until the link stopLnk is seen. This avoids having to sync the DAG from scratch on every update.

func LegSelector

func LegSelector(limit selector.RecursionLimit, stopLnk ipld.Link) ipld.Node

LegSelector is a convenience function that returns the selector used by legs subscribers.

LegSelector is a "recurse all" selector that provides conditions to stop the traversal at a specific link (stopLnk).
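The real selectors are IPLD selector nodes evaluated during a go-ipld-prime traversal. The stdlib-only sketch below models only the stopping rule, and only on a linear chain of updates as a simplification; the `Node` type, the map used as a block store, and the string CIDs are hypothetical stand-ins. It returns the newly fetched CIDs from latest to oldest, the same order documented for SyncFinished.SyncedCids.

```go
package main

import "fmt"

// Node is a hypothetical stand-in for a stored block in a chain of updates:
// each block links to the previous one, and an empty Prev ends the chain.
type Node struct {
	Prev string
}

// syncUntil walks backwards from head and stops when it reaches stop, the
// last link that was already synced (pass "" on a first sync). It returns
// the CIDs that still need fetching, ordered from latest to oldest.
func syncUntil(store map[string]Node, head, stop string) ([]string, error) {
	var fetched []string
	for c := head; c != "" && c != stop; {
		n, ok := store[c]
		if !ok {
			return nil, fmt.Errorf("block %q not found", c)
		}
		fetched = append(fetched, c)
		c = n.Prev
	}
	return fetched, nil
}

func main() {
	store := map[string]Node{
		"c3": {Prev: "c2"},
		"c2": {Prev: "c1"},
		"c1": {},
	}
	// First sync: nothing synced yet, so the whole chain is walked.
	first, _ := syncUntil(store, "c3", "")
	fmt.Println(first) // [c3 c2 c1]
	// Later sync: c1 is already local, so the walk stops there.
	next, _ := syncUntil(store, "c3", "c1")
	fmt.Println(next) // [c3 c2]
}
```

Passing the previously synced head as the stop link is what keeps each update incremental.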

Types

type AllowPeerFunc added in v0.2.0

type AllowPeerFunc func(peer.ID) (bool, error)

AllowPeerFunc is the signature of a function given to Subscriber that determines whether to allow or reject messages originating from the peer passed into the function. Returning true or false indicates that messages from that peer are allowed or rejected, respectively. Returning an error indicates that there was a problem evaluating the function, and results in the messages being rejected.
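To illustrate these rules, here is a minimal sketch of an allow-list function and of how a caller might apply it. `PeerID` is a hypothetical string stand-in for libp2p's `peer.ID`, `allowList` and `accept` are illustrative helpers rather than library functions, and treating a nil function as "allow all" follows the SetAllowPeer documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// PeerID is a hypothetical stand-in for libp2p's peer.ID.
type PeerID string

// AllowPeerFunc mirrors the documented signature, with PeerID substituted.
type AllowPeerFunc func(PeerID) (bool, error)

// allowList returns an AllowPeerFunc that admits only the listed peers.
func allowList(peers ...PeerID) AllowPeerFunc {
	set := make(map[PeerID]struct{}, len(peers))
	for _, p := range peers {
		set[p] = struct{}{}
	}
	return func(p PeerID) (bool, error) {
		if p == "" {
			return false, errors.New("empty peer ID")
		}
		_, ok := set[p]
		return ok, nil
	}
}

// accept applies the documented rules: a nil function allows every peer,
// and an evaluation error rejects the message.
func accept(allow AllowPeerFunc, p PeerID) bool {
	if allow == nil {
		return true
	}
	ok, err := allow(p)
	if err != nil {
		return false
	}
	return ok
}

func main() {
	allow := allowList("peerA")
	fmt.Println(accept(allow, "peerA")) // true
	fmt.Println(accept(allow, "peerB")) // false
	fmt.Println(accept(allow, ""))      // false: the function returned an error
	fmt.Println(accept(nil, "peerB"))   // true: no filter configured
}
```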

type BlockHookFunc added in v0.2.0

type BlockHookFunc func(peer.ID, cid.Cid)

BlockHookFunc is the signature of a function that is called when a block is received.

func WrapBlockHookWithSyncedCidTracker added in v0.3.3

func WrapBlockHookWithSyncedCidTracker(cidsSeenSoFar *[]cid.Cid, blockHook BlockHookFunc) BlockHookFunc

type DefaultLatestSyncHandler added in v0.3.9

type DefaultLatestSyncHandler struct {
	// contains filtered or unexported fields
}

func (*DefaultLatestSyncHandler) GetLatestSync added in v0.3.9

func (h *DefaultLatestSyncHandler) GetLatestSync(p peer.ID) (cid.Cid, bool)

func (*DefaultLatestSyncHandler) SetLatestSync added in v0.3.9

func (h *DefaultLatestSyncHandler) SetLatestSync(p peer.ID, c cid.Cid)

type LatestSyncHandler added in v0.3.9

type LatestSyncHandler interface {
	SetLatestSync(peer peer.ID, cid cid.Cid)
	GetLatestSync(peer peer.ID) (cid.Cid, bool)
}

LatestSyncHandler defines how to store the latest synced CID for a given peer and how to fetch it. Legs guarantees that it will not be called concurrently for the same peer, but it may be called concurrently for different peers.
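A minimal in-memory implementation could look like the sketch below, assuming plain strings in place of `peer.ID` and `cid.Cid`. A single read/write mutex is a simple way to tolerate concurrent calls for different peers; a persistent handler would write to a datastore instead. DefaultLatestSyncHandler's internals are unexported and are not reproduced here.

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical stand-ins for peer.ID and cid.Cid.
type PeerID string
type Cid string

// LatestSyncHandler mirrors the documented interface with the stand-in types.
type LatestSyncHandler interface {
	SetLatestSync(peer PeerID, cid Cid)
	GetLatestSync(peer PeerID) (Cid, bool)
}

// memHandler keeps the latest synced CID per peer in memory. The mutex is
// needed because calls for different peers may run concurrently.
type memHandler struct {
	mu     sync.RWMutex
	latest map[PeerID]Cid
}

func newMemHandler() *memHandler {
	return &memHandler{latest: make(map[PeerID]Cid)}
}

func (h *memHandler) SetLatestSync(p PeerID, c Cid) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.latest[p] = c
}

func (h *memHandler) GetLatestSync(p PeerID) (Cid, bool) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	c, ok := h.latest[p]
	return c, ok
}

// Compile-time check that memHandler satisfies the interface.
var _ LatestSyncHandler = (*memHandler)(nil)

func main() {
	h := newMemHandler()
	var wg sync.WaitGroup
	for _, p := range []PeerID{"peerA", "peerB", "peerC"} {
		wg.Add(1)
		go func(p PeerID) { // different peers are updated concurrently
			defer wg.Done()
			h.SetLatestSync(p, Cid("head-of-"+p))
		}(p)
	}
	wg.Wait()
	c, ok := h.GetLatestSync("peerB")
	fmt.Println(c, ok) // head-of-peerB true
}
```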

type Option added in v0.2.0

type Option func(*config) error

func AddrTTL added in v0.2.0

func AddrTTL(addrTTL time.Duration) Option

AddrTTL sets the peerstore address time-to-live for addresses discovered from pubsub messages.

func AllowPeer added in v0.2.0

func AllowPeer(allowPeer AllowPeerFunc) Option

AllowPeer sets the function that determines whether to allow or reject messages from a peer.

func BlockHook added in v0.2.0

func BlockHook(blockHook BlockHookFunc) Option

BlockHook adds a hook that runs when a block is received.

func DtManager added in v0.2.0

func DtManager(dtManager dt.Manager, gs graphsync.GraphExchange) Option

DtManager provides an existing datatransfer manager.

func HttpClient added in v0.2.0

func HttpClient(client *http.Client) Option

HttpClient provides Subscriber with an existing HTTP client.

func SyncRecursionLimit added in v0.2.4

func SyncRecursionLimit(limit selector.RecursionLimit) Option

SyncRecursionLimit sets the recursion limit of the background syncing process. Defaults to selector.RecursionLimitNone if not specified.

func Topic added in v0.2.0

func Topic(topic *pubsub.Topic) Option

Topic provides an existing pubsub topic.

func UseLatestSyncHandler added in v0.3.9

func UseLatestSyncHandler(h LatestSyncHandler) Option

UseLatestSyncHandler sets the latest sync handler to use.
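Option follows Go's functional-options pattern. The sketch below re-creates two of the options against a hypothetical config struct to show how options are applied in order and how an error from one option aborts construction. The field names, the negative-TTL check, and the use of `int` for the recursion limit are assumptions of this sketch, not the library's code.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// config is a hypothetical stand-in for the library's unexported config;
// the fields are illustrative only.
type config struct {
	addrTTL        time.Duration
	recursionLimit int // 0 stands in for selector.RecursionLimitNone
}

// Option has the same shape as the documented legs.Option.
type Option func(*config) error

// AddrTTL is an illustrative re-creation; the negative check is assumed.
func AddrTTL(ttl time.Duration) Option {
	return func(c *config) error {
		if ttl < 0 {
			return errors.New("address TTL must not be negative")
		}
		c.addrTTL = ttl
		return nil
	}
}

// SyncRecursionLimit is an illustrative re-creation using an int limit.
func SyncRecursionLimit(limit int) Option {
	return func(c *config) error {
		c.recursionLimit = limit
		return nil
	}
}

// getOpts applies options in order on top of zero-value defaults and stops
// at the first error.
func getOpts(opts ...Option) (config, error) {
	var cfg config
	for i, opt := range opts {
		if err := opt(&cfg); err != nil {
			return config{}, fmt.Errorf("option %d failed: %w", i, err)
		}
	}
	return cfg, nil
}

func main() {
	cfg, err := getOpts(AddrTTL(10*time.Minute), SyncRecursionLimit(100))
	fmt.Println(cfg.addrTTL, cfg.recursionLimit, err) // 10m0s 100 <nil>
	_, err = getOpts(AddrTTL(-time.Second))
	fmt.Println(err) // option 0 failed: address TTL must not be negative
}
```

Returning an error from the option itself lets invalid settings surface from the constructor instead of failing later.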

type Publisher added in v0.2.0

type Publisher interface {
	// SetRoot sets the root CID without publishing it.
	SetRoot(context.Context, cid.Cid) error
	// UpdateRoot sets the root CID and publishes its update in the pubsub channel.
	UpdateRoot(context.Context, cid.Cid) error
	// UpdateRootWithAddrs publishes an update for the DAG in the pubsub channel using custom multiaddrs.
	UpdateRootWithAddrs(context.Context, cid.Cid, []ma.Multiaddr) error
	// Close closes the publisher.
	Close() error
}

Publisher is an interface for updating the published DAG.

Example
// Initialize the legs publisher.
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcHost, _ = libp2p.New()
srcLnkS := makeLinkSystem(srcStore)

pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic)
if err != nil {
	panic(err)
}
defer pub.Close()

// Store the first item and publish its link as the new root.
itm1 := basicnode.NewString("hello world")
lnk1, err := store(srcStore, itm1)
if err != nil {
	panic(err)
}
if err = pub.UpdateRoot(context.Background(), lnk1.(cidlink.Link).Cid); err != nil {
	panic(err)
}
log.Println("Publish 1:", lnk1.(cidlink.Link).Cid)

// Store a second item and publish its link as the new root.
itm2 := basicnode.NewString("hello world 2")
lnk2, err := store(srcStore, itm2)
if err != nil {
	panic(err)
}
if err = pub.UpdateRoot(context.Background(), lnk2.(cidlink.Link).Cid); err != nil {
	panic(err)
}
log.Println("Publish 2:", lnk2.(cidlink.Link).Cid)

type Subscriber added in v0.2.0

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber creates a single pubsub subscriber that receives messages from a gossip pubsub topic, and creates a stateful message handler for each message source peer. An optional externally-defined AllowPeerFunc determines whether to allow or deny messages from specific peers.

Messages from separate peers are handled concurrently, and multiple messages from the same peer are handled serially. If a handler is busy handling a message, and more messages arrive from the same peer, then the last message replaces the previous unhandled message to avoid having to maintain queues of messages. Handlers do not have persistent goroutines, but start a new goroutine to handle a single message.

Example
dstHost, _ := libp2p.New()

dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstLnkSys := makeLinkSystem(dstStore)

srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)

sub, err := legs.NewSubscriber(dstHost, dstStore, dstLnkSys, "/indexer/ingest/testnet", nil)
if err != nil {
	panic(err)
}
defer sub.Close()

// Connections must be made after Subscriber is created, because the
// gossip pubsub must be created before connections are made.  Otherwise,
// the connecting hosts will not see the destination host has pubsub and
// messages will not get published.
dstPeerInfo := dstHost.Peerstore().PeerInfo(dstHost.ID())
if err = srcHost.Connect(context.Background(), dstPeerInfo); err != nil {
	panic(err)
}

watcher, cancelWatcher := sub.OnSyncFinished()
defer cancelWatcher()

for syncFin := range watcher {
	fmt.Println("Finished sync to", syncFin.Cid, "with peer:", syncFin.PeerID)
}

func NewSubscriber

func NewSubscriber(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, topic string, dss ipld.Node, options ...Option) (*Subscriber, error)

NewSubscriber creates a new Subscriber that processes pubsub messages.

func (*Subscriber) Close added in v0.2.0

func (s *Subscriber) Close() error

Close shuts down the Subscriber.

func (*Subscriber) GetLatestSync added in v0.2.0

func (s *Subscriber) GetLatestSync(peerID peer.ID) ipld.Link

GetLatestSync returns the latest synced CID for the specified peer. If there is no handler for the peer, then nil is returned. This does not mean that no data has been synced with that peer; it means that the Subscriber does not know about it. Calling Sync() first may be necessary.

func (*Subscriber) OnSyncFinished added in v0.2.0

func (s *Subscriber) OnSyncFinished() (<-chan SyncFinished, context.CancelFunc)

OnSyncFinished creates a channel that receives change notifications, and adds that channel to the list of notification channels.

Calling the returned cancel function removes the notification channel from the list of channels to be notified on changes, and it closes the channel to allow any reading goroutines to stop waiting on the channel.
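The register-and-cancel contract of OnSyncFinished can be modeled as follows. This is a sketch, not the library's implementation: `SyncFinished` is trimmed to two string fields, and dropping an event when a reader's buffer is full is a design choice of this sketch, made so that a slow reader cannot block the notifier.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// SyncFinished is a trimmed, hypothetical stand-in for legs.SyncFinished.
type SyncFinished struct {
	Cid    string
	PeerID string
}

// notifier keeps the set of registered listener channels.
type notifier struct {
	mu        sync.Mutex
	listeners map[chan SyncFinished]struct{}
}

func newNotifier() *notifier {
	return &notifier{listeners: make(map[chan SyncFinished]struct{})}
}

// OnSyncFinished registers a channel and returns a cancel function that
// unregisters and closes it, so a range over the channel terminates.
func (n *notifier) OnSyncFinished() (<-chan SyncFinished, context.CancelFunc) {
	ch := make(chan SyncFinished, 1)
	n.mu.Lock()
	n.listeners[ch] = struct{}{}
	n.mu.Unlock()
	var once sync.Once
	cancel := func() {
		once.Do(func() {
			n.mu.Lock()
			delete(n.listeners, ch)
			close(ch) // closed under the lock, so notify never sends on it
			n.mu.Unlock()
		})
	}
	return ch, cancel
}

// notify delivers ev to every registered channel without blocking.
func (n *notifier) notify(ev SyncFinished) {
	n.mu.Lock()
	defer n.mu.Unlock()
	for ch := range n.listeners {
		select {
		case ch <- ev:
		default: // drop the event if this reader is not keeping up
		}
	}
}

func main() {
	n := newNotifier()
	events, cancel := n.OnSyncFinished()
	done := make(chan struct{})
	go func() {
		for ev := range events { // ends once cancel closes the channel
			fmt.Println("synced", ev.Cid, "from", ev.PeerID)
		}
		close(done)
	}()
	n.notify(SyncFinished{Cid: "cid-1", PeerID: "peerA"})
	cancel()
	<-done
}
```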

func (*Subscriber) SetAllowPeer added in v0.2.0

func (s *Subscriber) SetAllowPeer(allowPeer AllowPeerFunc)

SetAllowPeer configures Subscriber with a function to evaluate whether to allow or reject messages from a peer. Setting nil removes any filtering and allows messages from all peers. Calling SetAllowPeer replaces any previously configured AllowPeerFunc.

func (*Subscriber) SetLatestSync added in v0.2.0

func (s *Subscriber) SetLatestSync(peerID peer.ID, latestSync cid.Cid) error

SetLatestSync sets the latest synced CID for a specified peer. If there is no handler for the peer, then one is created without consulting any AllowPeerFunc.

func (*Subscriber) Sync added in v0.2.0

func (s *Subscriber) Sync(ctx context.Context, peerID peer.ID, nextCid cid.Cid, sel ipld.Node, peerAddr multiaddr.Multiaddr, opts ...SyncOption) (cid.Cid, error)

Sync performs a one-off explicit sync with the given peer for a specific CID and updates the latest synced link to it. Completing sync may take a significant amount of time, so Sync should generally be run in its own goroutine.

If given cid.Undef, the latest root CID is queried from the peer directly and used instead. If the peer has no latest root, that is, if querying the latest CID returns cid.Undef, this function returns cid.Undef with a nil error.

The latest synced CID is returned when this sync is complete. Any OnSyncFinished readers will also get a SyncFinished when the sync succeeds, but only if syncing to the latest, using `cid.Undef`, and using the default selector. This is because when specifying a CID, it is usually for an entries sync, not an advertisements sync.

It is the responsibility of the caller to make sure the given CID appears after the latest sync, in order to avoid re-syncing content that may have been synced previously.

The selector sequence, sel, can optionally be specified to customize the selection sequence during traversal. If unspecified, the default selector sequence is used.

Note that the selector sequence is wrapped with a selector logic that will stop traversal when the latest synced link is reached. Therefore, it must only specify the selection sequence itself.

See: ExploreRecursiveWithStopNode.

type SyncFinished added in v0.2.0

type SyncFinished struct {
	// Cid is the CID identifying the link that finished and is now the latest
	// sync for a specific peer.
	Cid cid.Cid
	// PeerID identifies the peer this SyncFinished event pertains to.
	PeerID peer.ID
	// A list of cids that this sync acquired. In order from latest to oldest. The latest cid will always be at the beginning.
	SyncedCids []cid.Cid
}

SyncFinished notifies an OnSyncFinished reader that a specified peer completed a sync. The channel receives events from providers that are manually synced to the latest, as well as those auto-discovered.

type SyncOption added in v0.3.4

type SyncOption func(*syncCfg)

func AlwaysUpdateLatest added in v0.3.4

func AlwaysUpdateLatest() SyncOption

func ScopedBlockHook added in v0.3.4

func ScopedBlockHook(hook BlockHookFunc) SyncOption

type Syncer added in v0.2.0

type Syncer interface {
	GetHead(context.Context) (cid.Cid, error)
	Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error
}

Syncer is the interface used to sync with a data source.

Directories

gen
p2p
