Documentation ¶
Overview ¶
Package dsync implements point-to-point merkle-DAG syncing between a local instance and a remote source. It's like rsync, but specific to merkle-DAGs. dsync operates over HTTP and (soon) libp2p connections.
By default, dsync can push DAGs to, and fetch DAGs from, another dsync instance called the "remote". Dsync instances that want to accept merkle-DAGs must opt into operating as a remote by configuring their dsync.Dsync instance to do so.
Dsync is structured as bring-your-own DAG vetting. All push requests are run through two "check" functions, called at the beginning and end of the push process. Each check function supplies details about the push being requested or completed. The default initial check function rejects all requests and must be overridden to accept data.
Index ¶
- Constants
- Variables
- func AddAllFromCARReader(ctx context.Context, bapi coreiface.BlockAPI, r io.Reader, progCh chan cid.Cid) (int, error)
- func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc
- func NewLocalNodeGetter(api coreiface.CoreAPI) (ipld.NodeGetter, error)
- func NewManifestCARReader(ctx context.Context, ng ipld.NodeGetter, mfst *dag.Manifest, ...) (io.Reader, error)
- func OptLibp2pHost(host host.Host) func(cfg *Config)
- type Config
- type DagStreamable
- type DagSyncable
- type Dsync
- func (ds *Dsync) GetBlock(ctx context.Context, hash string) ([]byte, error)
- func (ds *Dsync) GetDagInfo(ctx context.Context, hash string, meta map[string]string) (info *dag.Info, err error)
- func (ds *Dsync) NewPull(cidStr, remoteAddr string, meta map[string]string) (*Pull, error)
- func (ds *Dsync) NewPush(cidStr, remoteAddr string, pinOnComplete bool) (*Push, error)
- func (ds *Dsync) NewPushInfo(info *dag.Info, remoteAddr string, pinOnComplete bool) (*Push, error)
- func (ds *Dsync) NewReceiveSession(info *dag.Info, pinOnComplete bool, meta map[string]string) (sid string, diff *dag.Manifest, err error)
- func (ds *Dsync) OpenBlockStream(ctx context.Context, info *dag.Info, meta map[string]string) (io.ReadCloser, error)
- func (ds *Dsync) ProtocolVersion() (protocol.ID, error)
- func (ds *Dsync) ReceiveBlock(sid, hash string, data []byte) ReceiveResponse
- func (ds *Dsync) ReceiveBlocks(ctx context.Context, sid string, r io.Reader) error
- func (ds *Dsync) RemoveCID(ctx context.Context, cidStr string, meta map[string]string) error
- func (ds *Dsync) StartRemote(ctx context.Context) error
- type HTTPClient
- func (rem *HTTPClient) GetBlock(ctx context.Context, id string) (data []byte, err error)
- func (rem *HTTPClient) GetDagInfo(ctx context.Context, id string, meta map[string]string) (info *dag.Info, err error)
- func (rem *HTTPClient) NewReceiveSession(info *dag.Info, pinOnComplete bool, meta map[string]string) (sid string, diff *dag.Manifest, err error)
- func (rem *HTTPClient) OpenBlockStream(ctx context.Context, info *dag.Info, meta map[string]string) (io.ReadCloser, error)
- func (rem *HTTPClient) ProtocolVersion() (protocol.ID, error)
- func (rem *HTTPClient) ReceiveBlock(sid, hash string, data []byte) ReceiveResponse
- func (rem *HTTPClient) ReceiveBlocks(ctx context.Context, sid string, r io.Reader) error
- func (rem *HTTPClient) RemoveCID(ctx context.Context, id string, meta map[string]string) (err error)
- type Hook
- type Pull
- type Push
- type ReceiveResponse
- type ReceiveResponseStatus
Constants ¶
const (
	// DsyncProtocolID is the dsync p2p Protocol Identifier & version tag
	DsyncProtocolID = protocol.ID("/dsync/0.2.0")
)
Variables ¶
var (
	// ErrRemoveNotSupported is the error value returned by remotes that don't
	// support delete operations
	ErrRemoveNotSupported = fmt.Errorf("remove is not supported")
	// ErrUnknownProtocolVersion is the error for when the version of the remote
	// protocol is unknown, usually because the handshake with the remote
	// hasn't happened yet
	ErrUnknownProtocolVersion = fmt.Errorf("unknown protocol version")
)
var DefaultDagFinalCheck = func(context.Context, dag.Info, map[string]string) error { return nil }
DefaultDagFinalCheck by default performs no check
var DefaultDagPrecheck = func(context.Context, dag.Info, map[string]string) error { return fmt.Errorf("remote is not configured to accept DAGs") }
DefaultDagPrecheck rejects all requests. Dsync users are required to override this hook to make dsync work, and are expected to supply a trust model in this hook. An example trust model is a peerID and contentID accept/reject list supplied by the application.
Precheck could also reject based on size limitations by examining data provided by the requester, but be advised that the Info provided is only gossip at this point in the sync process.
If the Precheck hook returns an error the remote will deny the request to push any blocks, no session will be created and the error message will be returned in the response status.
Functions ¶
func AddAllFromCARReader ¶ added in v0.2.2
func AddAllFromCARReader(ctx context.Context, bapi coreiface.BlockAPI, r io.Reader, progCh chan cid.Cid) (int, error)
AddAllFromCARReader consumes a CAR reader stream, placing all blocks in the given blockstore
func HTTPRemoteHandler ¶ added in v0.2.0
func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc
HTTPRemoteHandler exposes a Dsync remote over HTTP by providing an HTTP handler that interlocks with the methods exposed by HTTPClient
func NewLocalNodeGetter ¶ added in v0.2.0
func NewLocalNodeGetter(api coreiface.CoreAPI) (ipld.NodeGetter, error)
NewLocalNodeGetter creates a local NodeGetter from an IPFS CoreAPI instance. "Local" NodeGetters don't fetch over the dweb.
It's important to pass Dsync a NodeGetter instance that doesn't perform any network operations when trying to resolve blocks. If we don't, dsync will ask IPFS for blocks it doesn't have, and IPFS will try to *fetch* those blocks, which defeats the point of syncing blocks by other means.
func NewManifestCARReader ¶ added in v0.2.2
func NewManifestCARReader(ctx context.Context, ng ipld.NodeGetter, mfst *dag.Manifest, progCh chan cid.Cid) (io.Reader, error)
NewManifestCARReader creates a Content-addressed ARchive (CAR) on the fly from a manifest and a node getter. It fetches blocks in order from the list of CIDs in the manifest and writes them to a buffer as the reader is consumed. The roots specified in the archive header match the value returned by the manifest's RootCID method.
If an incomplete manifest graph is passed to NewManifestCARReader, the resulting archive will not be a complete graph. This is permitted by the spec, and dsync uses it to create an archive of only the missing blocks. For more on CAR files, see: https://github.com/ipld/specs/blob/master/block-layer/content-addressable-archives.md
If supplied a non-nil progress channel, the stream sends each CID on that channel as the CID is buffered to the read stream.
func OptLibp2pHost ¶ added in v0.2.0
func OptLibp2pHost(host host.Host) func(cfg *Config)
OptLibp2pHost is a convenience function for supplying a libp2p.Host to dsync.New
Types ¶
type Config ¶ added in v0.2.0
type Config struct {
	// InfoStore is an optional caching layer for dag.Info objects
	InfoStore dag.InfoStore
	// provide a listening address to have Dsync spin up an HTTP server when
	// StartRemote(ctx) is called
	HTTPRemoteAddress string
	// to push & pull over libp2p connections, provide a libp2p host
	Libp2pHost host.Host
	// PinAPI is required for remotes to accept pinning requests
	PinAPI coreiface.PinAPI
	// RequireAllBlocks will skip checking for blocks already present on the
	// remote, requiring push requests to send all blocks each time.
	// This is a helpful override if the receiving node can't distinguish between
	// local and network block access, as with the ipfs-http-api interface
	RequireAllBlocks bool
	// AllowRemoves lets dsync opt into remove requests. Removes are
	// disabled by default
	AllowRemoves bool
	// required check function for a remote accepting DAGs. This hook will be
	// called before a push is allowed to begin
	PushPreCheck Hook
	// optional check function for screening a receive before potentially pinning
	PushFinalCheck Hook
	// optional check function called after successful transfer
	PushComplete Hook
	// optional check to run on dagInfo requests before sending an info back
	GetDagInfoCheck Hook
	// optional hook to run before allowing a stream of blocks
	OpenBlockStreamCheck Hook
	// optional check to run before executing a remove operation.
	// The dag.Info given to this check will only contain the root CID being
	// removed
	RemoveCheck Hook
}
Config encapsulates optional Dsync configuration
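`New` accepts options of type `func(cfg *Config)`, and `OptLibp2pHost` returns one. The sketch below shows that functional-options pattern on a trimmed, hypothetical copy of `Config`; `optHTTPAddress` and `newConfig` are illustrative names, and how `New` applies options internally is assumed.

```go
package main

import "fmt"

// Config is a trimmed, hypothetical mirror of dsync.Config with only a few
// of its fields.
type Config struct {
	HTTPRemoteAddress string
	RequireAllBlocks  bool
	AllowRemoves      bool
}

// optHTTPAddress is a hypothetical option in the same shape as
// OptLibp2pHost: it returns a func(cfg *Config) that sets one field.
func optHTTPAddress(addr string) func(cfg *Config) {
	return func(cfg *Config) { cfg.HTTPRemoteAddress = addr }
}

// newConfig starts from zero-value defaults (removes disabled) and applies
// options in order, so later options win.
func newConfig(opts ...func(cfg *Config)) Config {
	cfg := Config{}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := newConfig(
		optHTTPAddress(":9595"),
		func(cfg *Config) { cfg.AllowRemoves = true }, // an inline option
	)
	fmt.Printf("%+v\n", cfg)
}
```

The pattern keeps the constructor signature stable as fields are added, and the example at `New` uses the same inline-closure style to configure a remote.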
type DagStreamable ¶ added in v0.2.2
type DagStreamable interface {
	// ReceiveBlocks asks a remote to accept a stream of blocks from a local
	// client. This can only happen within a push session
	ReceiveBlocks(ctx context.Context, sessionID string, r io.Reader) error
	// OpenBlockStream asks a remote to generate a block stream
	OpenBlockStream(ctx context.Context, info *dag.Info, meta map[string]string) (io.ReadCloser, error)
}
DagStreamable is an interface for sending and fetching all blocks in a given manifest in one trip
type DagSyncable ¶ added in v0.2.0
type DagSyncable interface {
	// NewReceiveSession starts a push session from local to a remote.
	// The remote will return a delta manifest of blocks the remote needs
	// and a session id that must be sent with each block
	NewReceiveSession(info *dag.Info, pinOnComplete bool, meta map[string]string) (sid string, diff *dag.Manifest, err error)
	// ProtocolVersion indicates the version of dsync the remote speaks, only
	// available after a handshake is established. Calling this method before a
	// handshake must return ErrUnknownProtocolVersion
	ProtocolVersion() (protocol.ID, error)
	// ReceiveBlock places a block on the remote
	ReceiveBlock(sid, hash string, data []byte) ReceiveResponse
	// GetDagInfo asks the remote for info specified by the root identifier
	// string of a DAG
	GetDagInfo(ctx context.Context, cidStr string, meta map[string]string) (info *dag.Info, err error)
	// GetBlock gets a block of data from the remote
	GetBlock(ctx context.Context, hash string) (rawdata []byte, err error)
	// RemoveCID asks the remote to remove a cid. Supporting removes is optional.
	// DagSyncables that don't support RemoveCID must return
	// ErrRemoveNotSupported
	RemoveCID(ctx context.Context, cidStr string, meta map[string]string) (err error)
}
DagSyncable is a source that can be synced to & from. dsync requests automate calls to this interface with higher-order functions like Push and Pull
In order to coordinate between a local and a remote, you need something that will satisfy the DagSyncable interface on both ends of the wire, one to act as the requester and the other to act as the remote
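The push side of this interface can be sketched as: open a session, then send only the blocks in the returned diff, honoring `StatusRetry`. The `receiver` interface, `pushBlocks`, and `flakyRemote` are hypothetical simplifications (manifests reduced to hash slices, retries capped by an assumed `maxAttempts`); only the status constants and `ReceiveResponse` fields come from this package's docs.

```go
package main

import "fmt"

// ReceiveResponseStatus and ReceiveResponse mirror the documented types.
type ReceiveResponseStatus int

const (
	StatusErrored ReceiveResponseStatus = -1
	StatusOk      ReceiveResponseStatus = 0
	StatusRetry   ReceiveResponseStatus = 1
)

type ReceiveResponse struct {
	Hash   string
	Status ReceiveResponseStatus
	Err    error
}

// receiver is a hypothetical, trimmed-down DagSyncable: manifests are reduced
// to slices of block hashes and only the two push-side methods are kept.
type receiver interface {
	NewReceiveSession(nodes []string, pinOnComplete bool, meta map[string]string) (sid string, diff []string, err error)
	ReceiveBlock(sid, hash string, data []byte) ReceiveResponse
}

// pushBlocks opens a session, then sends only the blocks listed in the diff,
// retrying a block on StatusRetry up to maxAttempts times.
func pushBlocks(rem receiver, nodes []string, blocks map[string][]byte, maxAttempts int) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	sid, diff, err := rem.NewReceiveSession(nodes, true, nil)
	if err != nil {
		return err
	}
	for _, hash := range diff {
		var res ReceiveResponse
		for attempt := 0; attempt < maxAttempts; attempt++ {
			if res = rem.ReceiveBlock(sid, hash, blocks[hash]); res.Status != StatusRetry {
				break
			}
		}
		switch res.Status {
		case StatusOk:
			// delivered
		case StatusRetry:
			return fmt.Errorf("block %s: still retrying after %d attempts", hash, maxAttempts)
		default:
			return fmt.Errorf("block %s: status %d: %v", hash, res.Status, res.Err)
		}
	}
	return nil
}

// flakyRemote reports blocks it already has as not needed, and answers its
// very first ReceiveBlock call with StatusRetry.
type flakyRemote struct {
	have    map[string]bool
	got     map[string][]byte
	retried bool
}

func (f *flakyRemote) NewReceiveSession(nodes []string, _ bool, _ map[string]string) (string, []string, error) {
	var diff []string
	for _, n := range nodes {
		if !f.have[n] {
			diff = append(diff, n)
		}
	}
	return "session-1", diff, nil
}

func (f *flakyRemote) ReceiveBlock(sid, hash string, data []byte) ReceiveResponse {
	if !f.retried {
		f.retried = true
		return ReceiveResponse{Hash: hash, Status: StatusRetry}
	}
	f.got[hash] = data
	return ReceiveResponse{Hash: hash, Status: StatusOk}
}

func main() {
	rem := &flakyRemote{have: map[string]bool{"QmA": true}, got: map[string][]byte{}}
	blocks := map[string][]byte{"QmA": []byte("a"), "QmB": []byte("b")}
	err := pushBlocks(rem, []string{"QmA", "QmB"}, blocks, 3)
	fmt.Println(err, len(rem.got))
}
```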
type Dsync ¶ added in v0.2.0
type Dsync struct {
// contains filtered or unexported fields
}
Dsync is a service for synchronizing a DAG of blocks between a local & remote source
func New ¶ added in v0.2.0
func New(localNodes ipld.NodeGetter, blockStore coreiface.BlockAPI, opts ...func(cfg *Config)) (*Dsync, error)
New creates a local Dsync service. By default Dsync can push and pull to remotes. It can be configured to act as a remote for other Dsync instances.
It's crucial that the NodeGetter passed to New be an offline-only getter. If using IPFS, this package defines a helper function, NewLocalNodeGetter, to get an offline-only node getter from an IPFS CoreAPI interface.
Example ¶
package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"strings"

	"github.com/ipfs/go-cid"
	files "github.com/ipfs/go-ipfs-files"
	coreiface "github.com/ipfs/interface-go-ipfs-core"
	"github.com/ipfs/interface-go-ipfs-core/path"
	"github.com/qri-io/dag"
	"github.com/qri-io/dag/dsync"
)

func main() {
	// First, some boilerplate setup. This example uses "full" IPFS nodes,
	// but all that's required is a blockstore and dag core api implementation.
	// It also uses a single context, which doesn't make much sense in
	// production: at a minimum the contexts for nodeA & nodeB would be separate.
	ctx, done := context.WithCancel(context.Background())
	defer done()

	// nodeA is an ipfs instance that will create a DAG
	// nodeB is another ipfs instance nodeA will push that DAG to
	nodeA, nodeB := mustNewLocalRemoteIPFSAPI(ctx)

	// Local setup:
	// add a single block graph to nodeA, getting back a content identifier
	id := mustAddOneBlockDAG(nodeA)

	// make a local node getter; when performing dsync we don't want to fetch
	// blocks from the dweb
	aLocalDS, err := dsync.NewLocalNodeGetter(nodeA)
	if err != nil {
		panic(err) // don't panic. real programs handle errors.
	}

	// create nodeA's Dsync instance
	aDsync, err := dsync.New(aLocalDS, nodeA.Block())
	if err != nil {
		panic(err)
	}

	// Remote setup:
	// set up the remote we're going to push to, starting by creating a local
	// node getter
	bng, err := dsync.NewLocalNodeGetter(nodeB)
	if err != nil {
		panic(err)
	}

	// we're going to set up our remote to accept pushes over HTTP
	bAddr := ":9595"

	// create the remote instance, configuring it to accept DAGs
	bDsync, err := dsync.New(bng, nodeB.Block(), func(cfg *dsync.Config) {
		// configure the remote listening address:
		cfg.HTTPRemoteAddress = bAddr

		// we MUST override the PreCheck function. In this example we're making
		// sure no one sends us a bad hash:
		cfg.PushPreCheck = func(ctx context.Context, info dag.Info, _ map[string]string) error {
			if info.Manifest.Nodes[0] == "BadHash" {
				return fmt.Errorf("rejected for secret reasons")
			}
			return nil
		}

		// in order for remotes to allow pinning, they must be provided a PinAPI:
		cfg.PinAPI = nodeB.Pin()
	})
	if err != nil {
		panic(err)
	}

	// start listening for remote pushes & pulls. This should be long running,
	// like a server. Cancel the provided context to close
	if err = bDsync.StartRemote(ctx); err != nil {
		panic(err)
	}

	// Create a Push:
	push, err := aDsync.NewPush(id.String(), fmt.Sprintf("http://localhost%s/dsync", bAddr), true)
	if err != nil {
		panic(err)
	}

	// We want to see progress, so we spin up a goroutine to listen for updates
	waitForFmt := make(chan struct{})
	go func() {
		updates := push.Updates()
		for {
			select {
			case update := <-updates:
				fmt.Printf("%d/%d blocks transferred\n", update.CompletedBlocks(), len(update))
				if update.Complete() {
					fmt.Println("done!")
					waitForFmt <- struct{}{}
					return
				}
			case <-ctx.Done():
				// don't leak goroutines
				waitForFmt <- struct{}{}
				return
			}
		}
	}()

	// Do the push
	if err := push.Do(ctx); err != nil {
		panic(err)
	}

	// at this point we know the push is finished
	// prove the block is now in nodeB:
	if _, err = nodeB.Block().Get(ctx, path.New(id.String())); err != nil {
		panic(err)
	}

	// block until updates has had a chance to print
	<-waitForFmt
}

// mustNewLocalRemoteIPFSAPI creates two IPFS nodes. makeAPI is defined
// elsewhere in the package's tests and isn't shown in this example.
func mustNewLocalRemoteIPFSAPI(ctx context.Context) (local, remote coreiface.CoreAPI) {
	_, a, err := makeAPI(ctx)
	if err != nil {
		panic(err)
	}
	_, b, err := makeAPI(ctx)
	if err != nil {
		panic(err)
	}
	return a, b
}

// mustAddOneBlockDAG adds a file small enough to fit in a single block to
// node and returns its CID
func mustAddOneBlockDAG(node coreiface.CoreAPI) cid.Cid {
	ctx := context.Background()
	f := files.NewReaderFile(ioutil.NopCloser(strings.NewReader("y" + strings.Repeat("o", 350))))
	p, err := node.Unixfs().Add(ctx, f)
	if err != nil {
		panic(err)
	}
	return p.Cid()
}
Output:
0/1 blocks transferred
1/1 blocks transferred
done!
func NewTestDsync ¶ added in v0.2.0
func NewTestDsync() *Dsync
NewTestDsync returns a Dsync pointer suitable for testing
func (*Dsync) GetDagInfo ¶ added in v0.2.0
func (ds *Dsync) GetDagInfo(ctx context.Context, hash string, meta map[string]string) (info *dag.Info, err error)
GetDagInfo gets the manifest for a DAG rooted at hash, checking any configured cache before falling back to generating a new manifest
func (*Dsync) NewPull ¶ added in v0.2.0
func (ds *Dsync) NewPull(cidStr, remoteAddr string, meta map[string]string) (*Pull, error)
NewPull creates a pull. A pull fetches an entire DAG from a remote, placing it in the local block store
func (*Dsync) NewPushInfo ¶ added in v0.2.0
func (ds *Dsync) NewPushInfo(info *dag.Info, remoteAddr string, pinOnComplete bool) (*Push, error)
NewPushInfo creates a push from an existing dag.Info. All blocks in the info manifest must be accessible from the local Dsync block repository
func (*Dsync) NewReceiveSession ¶ added in v0.2.0
func (ds *Dsync) NewReceiveSession(info *dag.Info, pinOnComplete bool, meta map[string]string) (sid string, diff *dag.Manifest, err error)
NewReceiveSession takes a manifest sent by a remote and initiates a transfer session. It returns a manifest/diff of the blocks the receiver needs to have a complete DAG. New sessions are created with a deadline for completion
func (*Dsync) OpenBlockStream ¶ added in v0.2.2
func (ds *Dsync) OpenBlockStream(ctx context.Context, info *dag.Info, meta map[string]string) (io.ReadCloser, error)
OpenBlockStream creates a block stream of the contents of the dag.Info
func (*Dsync) ProtocolVersion ¶ added in v0.2.2
func (ds *Dsync) ProtocolVersion() (protocol.ID, error)
ProtocolVersion reports the current protocol version for dsync
func (*Dsync) ReceiveBlock ¶ added in v0.2.0
func (ds *Dsync) ReceiveBlock(sid, hash string, data []byte) ReceiveResponse
ReceiveBlock adds one block, sent by the remote node, to the local node. It records in the receive session which blocks have been added. When the DAG is complete, it puts the manifest into a DAG info and the DAG info into an InfoStore
func (*Dsync) ReceiveBlocks ¶ added in v0.2.2
func (ds *Dsync) ReceiveBlocks(ctx context.Context, sid string, r io.Reader) error
ReceiveBlocks ingests blocks being pushed into the local store
func (*Dsync) RemoveCID ¶ added in v0.2.0
func (ds *Dsync) RemoveCID(ctx context.Context, cidStr string, meta map[string]string) error
RemoveCID unpins a CID if removes are enabled. It does not immediately remove unpinned content
func (*Dsync) StartRemote ¶ added in v0.2.0
func (ds *Dsync) StartRemote(ctx context.Context) error
StartRemote makes dsync available for remote requests, starting an HTTP server if a listening address is specified. StartRemote returns immediately. Stop remote service by cancelling the passed-in context.
type HTTPClient ¶ added in v0.2.0
type HTTPClient struct {
	URL        string
	NodeGetter format.NodeGetter
	BlockAPI   coreiface.BlockAPI
	// contains filtered or unexported fields
}
HTTPClient is the request side of doing dsync over HTTP
func (*HTTPClient) GetBlock ¶ added in v0.2.0
func (rem *HTTPClient) GetBlock(ctx context.Context, id string) (data []byte, err error)
GetBlock fetches a block from a remote source over HTTP
func (*HTTPClient) GetDagInfo ¶ added in v0.2.0
func (rem *HTTPClient) GetDagInfo(ctx context.Context, id string, meta map[string]string) (info *dag.Info, err error)
GetDagInfo fetches a manifest from a remote source over HTTP
func (*HTTPClient) NewReceiveSession ¶ added in v0.2.0
func (rem *HTTPClient) NewReceiveSession(info *dag.Info, pinOnComplete bool, meta map[string]string) (sid string, diff *dag.Manifest, err error)
NewReceiveSession initiates a session for pushing blocks to a remote. It sends a Manifest to a remote source over HTTP
func (*HTTPClient) OpenBlockStream ¶ added in v0.2.2
func (rem *HTTPClient) OpenBlockStream(ctx context.Context, info *dag.Info, meta map[string]string) (io.ReadCloser, error)
OpenBlockStream sends a dag.Info to the remote and asks it to return a stream of the blocks in the info's manifest
func (*HTTPClient) ProtocolVersion ¶ added in v0.2.2
func (rem *HTTPClient) ProtocolVersion() (protocol.ID, error)
ProtocolVersion indicates the version of dsync the remote speaks, only available after a handshake is established
func (*HTTPClient) ReceiveBlock ¶ added in v0.2.0
func (rem *HTTPClient) ReceiveBlock(sid, hash string, data []byte) ReceiveResponse
ReceiveBlock asks a remote to receive a block over HTTP
func (*HTTPClient) ReceiveBlocks ¶ added in v0.2.2
func (rem *HTTPClient) ReceiveBlocks(ctx context.Context, sid string, r io.Reader) error
ReceiveBlocks writes a block stream as an HTTP PUT request to the remote
type Hook ¶ added in v0.2.0
Hook is a function that a dsync instance will call at specified points in the sync lifecycle
type Pull ¶ added in v0.2.0
type Pull struct {
// contains filtered or unexported fields
}
Pull coordinates the transfer of missing blocks in a DAG from a remote to a block store
func NewPull ¶ added in v0.2.0
func NewPull(cidStr string, lng ipld.NodeGetter, bapi coreiface.BlockAPI, rem DagSyncable, meta map[string]string) (pull *Pull, err error)
NewPull sets up fetching a DAG at an id from a remote
func NewPullWithInfo ¶ added in v0.2.0
func NewPullWithInfo(info *dag.Info, lng ipld.NodeGetter, bapi coreiface.BlockAPI, rem DagSyncable, meta map[string]string) (pull *Pull, err error)
NewPullWithInfo creates a pull when we already have a dag.Info
func (*Pull) Updates ¶ added in v0.2.0
func (f *Pull) Updates() <-chan dag.Completion
Updates returns a read-only channel of pull completion changes
type Push ¶ added in v0.2.0
type Push struct {
// contains filtered or unexported fields
}
Push coordinates sending a manifest to a remote, tracking progress and state
func NewPush ¶ added in v0.2.0
func NewPush(lng ipld.NodeGetter, info *dag.Info, remote DagSyncable, pinOnComplete bool) (*Push, error)
NewPush initiates a send for a DAG at an id from a local to a remote. Push is initiated by the local node
func (*Push) SetMeta ¶ added in v0.2.0
SetMeta associates metadata with a push before it's sent. These details may be leveraged by applications built on top of dsync; dsync itself ignores them. Meta must be set before starting the push
func (*Push) Updates ¶ added in v0.2.0
func (snd *Push) Updates() <-chan dag.Completion
Updates returns a read-only channel of Completion objects that depict transfer state
type ReceiveResponse ¶ added in v0.2.0
type ReceiveResponse struct {
	Hash   string
	Status ReceiveResponseStatus
	Err    error
}
ReceiveResponse defines the result of sending a block, or attempting to send a block.
type ReceiveResponseStatus ¶ added in v0.2.0
type ReceiveResponseStatus int
ReceiveResponseStatus defines types of results for a request
const (
	// StatusErrored indicates the request failed and cannot be retried
	StatusErrored ReceiveResponseStatus = -1
	// StatusOk indicates the request completed successfully
	StatusOk ReceiveResponseStatus = 0
	// StatusRetry indicates the request can be attempted again
	StatusRetry ReceiveResponseStatus = 1
)
func (ReceiveResponseStatus) String ¶ added in v0.2.0
func (s ReceiveResponseStatus) String() string
String returns a string representation of the status
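A typical `String` implementation for an enum like this is a switch over the constants, with a fallback for unknown values. The labels below are assumptions; this page doesn't document the exact strings dsync returns.

```go
package main

import "fmt"

type ReceiveResponseStatus int

const (
	StatusErrored ReceiveResponseStatus = -1
	StatusOk      ReceiveResponseStatus = 0
	StatusRetry   ReceiveResponseStatus = 1
)

// String maps each status to a label; the labels are illustrative only.
func (s ReceiveResponseStatus) String() string {
	switch s {
	case StatusErrored:
		return "errored"
	case StatusOk:
		return "ok"
	case StatusRetry:
		return "retry"
	default:
		// convert to int so formatting doesn't recurse into String
		return fmt.Sprintf("ReceiveResponseStatus(%d)", int(s))
	}
}

func main() {
	fmt.Println(StatusOk, StatusRetry, ReceiveResponseStatus(7))
}
```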
Directories ¶
Path | Synopsis |
---|---|
| DsyncPlugin is an IPFS daemon plugin for embedding dsync functionality directly into IPFS https://github.com/ipfs/go-ipfs-example-plugin |