Documentation ¶
Index ¶
- func DagsyncSelector(limit selector.RecursionLimit, stopLnk ipld.Link) ipld.Node
- func ExploreRecursiveWithStop(limit selector.RecursionLimit, sequence selectorbuilder.SelectorSpec, ...) ipld.Node
- func ExploreRecursiveWithStopNode(limit selector.RecursionLimit, sequence ipld.Node, stopLnk ipld.Link) ipld.Node
- type BlockHookFunc
- type LastKnownSyncFunc
- type Option
- func AddrTTL(addrTTL time.Duration) Option
- func AdsDepthLimit(limit int64) Option
- func BlockHook(blockHook BlockHookFunc) Option
- func EntriesDepthLimit(depth int64) Option
- func FirstSyncDepth(depth int64) Option
- func HttpTimeout(to time.Duration) Option
- func IdleHandlerTTL(ttl time.Duration) Option
- func MaxAsyncConcurrency(n int) Option
- func RecvAnnounce(topic string, opts ...announce.Option) Option
- func RetryableHTTPClient(retryMax int, waitMin, waitMax time.Duration) Option
- func SegmentDepthLimit(depth int64) Option
- func StrictAdsSelector(strict bool) Option
- func Topic(topic *pubsub.Topic) Option
- func WithLastKnownSync(f LastKnownSyncFunc) Option
- type Publisher
- type SegmentBlockHookFunc
- type SegmentSyncActions
- type Subscriber
- func (s *Subscriber) Announce(ctx context.Context, nextCid cid.Cid, peerInfo peer.AddrInfo) error
- func (s *Subscriber) Close() error
- func (s *Subscriber) GetLatestSync(peerID peer.ID) ipld.Link
- func (s *Subscriber) HttpPeerStore() peerstore.Peerstore
- func (s *Subscriber) OnSyncFinished() (<-chan SyncFinished, context.CancelFunc)
- func (s *Subscriber) RemoveHandler(peerID peer.ID) bool
- func (s *Subscriber) SetLatestSync(peerID peer.ID, latestSync cid.Cid) error
- func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, options ...SyncOption) (cid.Cid, error)
- func (s *Subscriber) SyncEntries(ctx context.Context, peerInfo peer.AddrInfo, entCid cid.Cid, ...) error
- func (s *Subscriber) SyncHAMTEntries(ctx context.Context, peerInfo peer.AddrInfo, entCid cid.Cid, ...) error
- func (s *Subscriber) SyncOneEntry(ctx context.Context, peerInfo peer.AddrInfo, entCid cid.Cid) error
- type SyncFinished
- type SyncOption
- type Syncer
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DagsyncSelector ¶
func DagsyncSelector(limit selector.RecursionLimit, stopLnk ipld.Link) ipld.Node
DagsyncSelector is a convenient function that returns the selector used by dagsync subscribers
DagsyncSelector is a "recurse all" selector that provides conditions to stop the traversal at a specific link (stopAt).
func ExploreRecursiveWithStop ¶
func ExploreRecursiveWithStop(limit selector.RecursionLimit, sequence selectorbuilder.SelectorSpec, stopLnk ipld.Link) ipld.Node
ExploreRecursiveWithStop builds a selector that recursively syncs a DAG until the link stopLnk is seen. It prevents from having to sync DAGs from scratch with every update.
func ExploreRecursiveWithStopNode ¶
func ExploreRecursiveWithStopNode(limit selector.RecursionLimit, sequence ipld.Node, stopLnk ipld.Link) ipld.Node
ExploreRecursiveWithStopNode builds a selector that recursively syncs a DAG until the link stopLnk is seen. It prevents from having to sync DAGs from scratch with every update.
Types ¶
type BlockHookFunc ¶
type BlockHookFunc func(peer.ID, cid.Cid, SegmentSyncActions)
BlockHookFunc is the signature of a function that is called when a received.
func MakeGeneralBlockHook ¶ added in v0.5.9
func MakeGeneralBlockHook(prevAdCid func(adCid cid.Cid) (cid.Cid, error)) BlockHookFunc
MakeGeneralBlockHook creates a block hook function that sets the next sync action based on whether the specified advertisement has a previous advertisement in the chain..
Use this when segmented sync is enabled and no other blockhook is defined.
The supplied prevAdCid function takes the CID of the current advertisement and returns the CID of the previous advertisement in the chain. This would typically be done my loading the specified advertisement from the ipld.LinkSystem and getting the previous CID.
type LastKnownSyncFunc ¶ added in v0.2.4
type Option ¶
type Option func(*config) error
Option is a function that sets a value in a config.
func AddrTTL ¶
AddrTTL sets the peerstore address time-to-live for addresses discovered from pubsub messages.
func AdsDepthLimit ¶ added in v0.4.0
AdsDepthLimit sets the maximum number of advertisements in a chain to sync. Defaults to unlimited if not specified or set < 1.
func BlockHook ¶
func BlockHook(blockHook BlockHookFunc) Option
BlockHook adds a hook that is run when a block is received via Subscriber.Sync along with a SegmentSyncActions to control the sync flow if segmented sync is enabled. Note that if segmented sync is disabled, calls on SegmentSyncActions will have no effect. See: SegmentSyncActions, SegmentDepthLimit, ScopedBlockHook.
func EntriesDepthLimit ¶ added in v0.4.0
EntriesDepthLimit sets the maximum number of multihash entries blocks to sync per advertisement. Defaults to unlimited if not set or set to < 1.
func FirstSyncDepth ¶ added in v0.5.11
FirstSyncDepth sets the advertisement chain depth to sync on the first sync with a new provider. A value of 0, the default, means unlimited depth.
func HttpTimeout ¶ added in v0.5.0
HttpTimeout specifies a time limit for HTTP requests made by the sync HTTP client. A value of zero means no timeout.
func IdleHandlerTTL ¶
IdleHandlerTTL configures the time after which idle handlers are removed.
func MaxAsyncConcurrency ¶ added in v0.4.0
MaxAsyncConcurrency sets the maximum number of concurrent asynchrouous syncs (started by announce messages). This only takes effect if there is an announcement receiver configured by the RecvAnnounce option.
func RecvAnnounce ¶ added in v0.3.0
RecvAnnounce enables an announcement message receiver.
func RetryableHTTPClient ¶ added in v0.5.0
RetryableHTTPClient configures a retriable HTTP client. Setting retryMax to zero, the default, disables the retriable client.
func SegmentDepthLimit ¶
SegmentDepthLimit sets the maximum recursion depth limit for a segmented sync. Setting the depth to a value less than zero disables segmented sync completely. Disabled by default.
For segmented sync to function at least one of BlockHook or ScopedBlockHook must be set.
func StrictAdsSelector ¶ added in v0.4.0
Checks that advertisement blocks contain a "PreviousID" field. This can be set to false to not do the check if there is no reason to do so.
func WithLastKnownSync ¶ added in v0.2.4
func WithLastKnownSync(f LastKnownSyncFunc) Option
WithLastKnownSync sets a function that returns the last known sync, when it is not already known to dagsync. This will generally be some CID that is known to have already been seen, so that there is no need to fetch portions of the dag before this.
type Publisher ¶
type Publisher interface { // Addrs returns the addresses that the publisher is listening on. Addrs() []multiaddr.Multiaddr // ID returns the peer ID associated with the publisher. ID() peer.ID // Protocol returns multiaddr protocol code (P_P2P or P_HTTP). Protocol() int // SetRoot sets the root CID without publishing it. SetRoot(cid.Cid) // Close publisher. Close() error }
Publisher is an interface for updating the published dag.
Example ¶
// Init dagsync publisher and subscriber. srcPrivKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) srcHost, _ = libp2p.New(libp2p.Identity(srcPrivKey)) defer srcHost.Close() srcStore := dssync.MutexWrap(datastore.NewMapDatastore()) srcLnkS := makeLinkSystem(srcStore) pub, err := ipnisync.NewPublisher(srcLnkS, srcPrivKey, ipnisync.WithStreamHost(srcHost), ipnisync.WithHeadTopic("/indexer/ingest/testnet")) if err != nil { panic(err) } defer pub.Close() // Update root on publisher one with item itm1 := basicnode.NewString("hello world") lnk1, err := store(srcStore, itm1) if err != nil { panic(err) } pub.SetRoot(lnk1.(cidlink.Link).Cid) log.Print("Publish 1:", lnk1.(cidlink.Link).Cid) // Update root on publisher one with item itm2 := basicnode.NewString("hello world 2") lnk2, err := store(srcStore, itm2) if err != nil { panic(err) } pub.SetRoot(lnk2.(cidlink.Link).Cid) log.Print("Publish 2:", lnk2.(cidlink.Link).Cid)
Output:
type SegmentBlockHookFunc ¶
type SegmentBlockHookFunc func(peer.ID, cid.Cid, SegmentSyncActions)
SegmentBlockHookFunc is called for each synced block, similarly to BlockHookFunc. Except that it provides SegmentSyncActions to the hook allowing the user to control the flow of segmented sync by determining which CID should be used in the next segmented sync cycle by decoding the synced block.
SegmentSyncActions also allows the user to signal any errors that may occur during the hook execution to terminate the sync and mark it as failed.
type SegmentSyncActions ¶
type SegmentSyncActions interface { // SetNextSyncCid sets the cid that will be synced in the next // segmented sync. Note that the last call to this function during a // segmented sync cycle dictates which CID will be synced in the next // cycle. // // At least one call to this function must be made for the segmented // sync cycles to continue. Because, otherwise the CID that should be // used in the next segmented sync cycle cannot be known. // // If no calls are made to this function or next CID is set to // cid.Undef, the sync will terminate and any CIDs that are synced so // far will be included in a SyncFinished event. SetNextSyncCid(cid.Cid) // FailSync fails the sync and returns the given error as soon as the // current segment sync finishes. The last call to this function during // a segmented sync cycle dictates the error value. Passing nil as // error will cancel sync failure. FailSync(error) }
SegmentSyncActions allows the user to control the flow of segmented sync by either choosing which CID should be synced in the next sync cycle or setting the error that should mark the sync as failed.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber reads chains of advertisements from index-providers (publishers) and keeps track of the latest advertisement read from each publisher. Advertisements are read when explicitly requested, or in response to announcement messages if an announcement receiver is configured.
An announcement receiver can receive announcements that are broadcast over libp2p gossip pubsub, and sent directly over HTTP. A receiver can be given an optional externally-defined function to determines whether to allow or deny messages from specific peers.
A stateful message handler is maintained for each advertisement source peer. Messages from separate peers are handled concurrently, and multiple messages from the same peer are handled serially. If a handler is busy handling a message, and more messages arrive from the same peer, then the last message replaces the previous unhandled message to avoid having to maintain queues of messages. Handlers do not have persistent goroutines, but start a new goroutine to handle a single message.
Example ¶
dstHost, _ := libp2p.New() defer dstHost.Close() dstStore := dssync.MutexWrap(datastore.NewMapDatastore()) dstLnkSys := makeLinkSystem(dstStore) srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour) dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour) sub, err := dagsync.NewSubscriber(dstHost, dstLnkSys, dagsync.RecvAnnounce("/indexer/ingest/testnet")) if err != nil { panic(err) } defer sub.Close() // Connections are made after Subscriber is created, so that the connecting // host sees that the destination host has pubsub. dstPeerInfo := dstHost.Peerstore().PeerInfo(dstHost.ID()) if err = srcHost.Connect(context.Background(), dstPeerInfo); err != nil { panic(err) } watcher, cancelWatcher := sub.OnSyncFinished() defer cancelWatcher() for syncFin := range watcher { fmt.Println("Finished sync to", syncFin.Cid, "with peer:", syncFin.PeerID) }
Output:
func NewSubscriber ¶
func NewSubscriber(host host.Host, lsys ipld.LinkSystem, options ...Option) (*Subscriber, error)
NewSubscriber creates a new Subscriber that processes pubsub messages and syncs dags advertised using the specified selector.
func (*Subscriber) Announce ¶
Announce handles a direct announce message, that was not received over pubsub. The message is resent over pubsub, if the Receiver is configured to do so. The peerID and addrs are those of the advertisement publisher, since an announce message announces the availability of an advertisement and where to retrieve it from.
func (*Subscriber) GetLatestSync ¶
func (s *Subscriber) GetLatestSync(peerID peer.ID) ipld.Link
GetLatestSync returns the latest synced CID for the specified peer.
func (*Subscriber) HttpPeerStore ¶
func (s *Subscriber) HttpPeerStore() peerstore.Peerstore
HttpPeerStore returns the subscriber's HTTP peer store.
func (*Subscriber) OnSyncFinished ¶
func (s *Subscriber) OnSyncFinished() (<-chan SyncFinished, context.CancelFunc)
OnSyncFinished creates a channel that receives change notifications, and adds that channel to the list of notification channels.
Calling the returned cancel function removes the notification channel from the list of channels to be notified on changes, and it closes the channel to allow any reading goroutines to stop waiting on the channel.
func (*Subscriber) RemoveHandler ¶
func (s *Subscriber) RemoveHandler(peerID peer.ID) bool
RemoveHandler removes a handler for a publisher.
func (*Subscriber) SetLatestSync ¶
func (s *Subscriber) SetLatestSync(peerID peer.ID, latestSync cid.Cid) error
SetLatestSync sets the latest synced CID for a specified peer.
func (*Subscriber) SyncAdChain ¶ added in v0.4.0
func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, options ...SyncOption) (cid.Cid, error)
SyncAdChain performs a one-off explicit sync with the given peer (publisher) for an advertisement chain, and updates the latest synced link to it.
The latest root CID is queried from the peer directly. In the event where there is no latest root, i.e. querying the latest CID returns cid.Undef, this function returns cid.Undef with nil error.
The latest synced CID is returned when this sync is complete. Any OnSyncFinished readers will also get a SyncFinished when the sync succeeds.
It is the responsibility of the caller to make sure the given CID appears after the latest sync in order to avid re-syncing of content that may have previously been synced.
func (*Subscriber) SyncEntries ¶ added in v0.4.0
func (s *Subscriber) SyncEntries(ctx context.Context, peerInfo peer.AddrInfo, entCid cid.Cid, options ...SyncOption) error
SyncEntries syncs the entries chain starting with block identified by entCid.
func (*Subscriber) SyncHAMTEntries ¶ added in v0.4.0
func (s *Subscriber) SyncHAMTEntries(ctx context.Context, peerInfo peer.AddrInfo, entCid cid.Cid, options ...SyncOption) error
func (*Subscriber) SyncOneEntry ¶ added in v0.4.0
type SyncFinished ¶
type SyncFinished struct { // Cid is the CID identifying the link that finished and is now the latest // sync for a specific peer. Cid cid.Cid // PeerID identifies the peer this SyncFinished event pertains to. This is // the publisher of the advertisement chain. PeerID peer.ID // Count is the number of CIDs synced. Count int // Err is used to return a failure to complete an asynchronous sync in // response to an announcement. Err error }
SyncFinished notifies an OnSyncFinished reader that a specified peer completed a sync. The channel receives events from providers that are manually synced to the latest, as well as those auto-discovered.
type SyncOption ¶
type SyncOption func(*syncCfg)
func ScopedBlockHook ¶
func ScopedBlockHook(hook BlockHookFunc) SyncOption
ScopedBlockHook is the equivalent of BlockHook option but only applied to a single sync. If not specified, the Subscriber BlockHook option is used instead. Specifying the ScopedBlockHook will override the Subscriber level BlockHook for the current sync.
Calls to SegmentSyncActions from bloc hook will have no impact if segmented sync is disabled. See: BlockHook, SegmentDepthLimit, ScopedSegmentDepthLimit.
func ScopedDepthLimit ¶ added in v0.4.0
func ScopedDepthLimit(limit int64) SyncOption
ScopedDepthLimit provides a sync depth limit for the current sync. This applies to both advertisement and entries chains. If zero or not specified, the Subscriber ads or entries depth limit is used. Set to -1 for no limits.
func ScopedSegmentDepthLimit ¶
func ScopedSegmentDepthLimit(depth int64) SyncOption
ScopedSegmentDepthLimit is the equivalent of SegmentDepthLimit option but only applied to a single sync. If zero or not specified, the Subscriber SegmentDepthLimit option is used instead. Set to -1 for no limits.
For segmented sync to function at least one of BlockHook or ScopedBlockHook must be set. See: SegmentDepthLimit.
func WithAdsResync ¶ added in v0.4.0
func WithAdsResync(resync bool) SyncOption
WithResyncAds causes the current sync to ignore advertisements that have been previously synced. When true, sync does not record the latest synced CID or send sync finished notification.
func WithHeadAdCid ¶ added in v0.4.0
func WithHeadAdCid(headAd cid.Cid) SyncOption
WithHeadAdCid explicitly specifies an advertisement CID to sync to, instead of getting this by querying the publisher.
func WithStopAdCid ¶ added in v0.4.0
func WithStopAdCid(stopAd cid.Cid) SyncOption
WithStopAdCid explicitly specifies an advertisement CID to stop at, instead of using the latest synced advertisement CID..