Documentation
Overview ¶
Package crdt provides a replicated go-datastore (key-value store) implementation using Merkle-CRDTs built with IPLD nodes.
This Datastore is agnostic to how new MerkleDAG roots are broadcast to the rest of the replicas (`Broadcaster` component) and to how the IPLD nodes are made discoverable and retrievable by other replicas (`DAGSyncer` component).
The implementation is based on the "Merkle-CRDTs: Merkle-DAGs meet CRDTs" paper by Héctor Sanjuán, Samuli Pöyhtäri and Pedro Teixeira.
Note that, in the absence of compaction (which must be performed manually), a crdt.Datastore will only grow in size even when keys are deleted.
The time a new Datastore replica needs to become fully synced depends on how fast it can retrieve the DAGs announced by the other replicas, but newer values will be available before older ones.
Index ¶
- Variables
- type Broadcaster
- type Datastore
- func (store *Datastore) Batch(ctx context.Context) (ds.Batch, error)
- func (store *Datastore) Close() error
- func (store *Datastore) Delete(ctx context.Context, key ds.Key) error
- func (store *Datastore) DotDAG(ctx context.Context, w io.Writer) error
- func (store *Datastore) Get(ctx context.Context, key ds.Key) (value []byte, err error)
- func (store *Datastore) GetSize(ctx context.Context, key ds.Key) (size int, err error)
- func (store *Datastore) Has(ctx context.Context, key ds.Key) (exists bool, err error)
- func (store *Datastore) InternalStats(ctx context.Context) Stats
- func (store *Datastore) IsDirty(ctx context.Context) bool
- func (store *Datastore) MarkClean(ctx context.Context)
- func (store *Datastore) MarkDirty(ctx context.Context)
- func (store *Datastore) PrintDAG(ctx context.Context) error
- func (store *Datastore) Put(ctx context.Context, key ds.Key, value []byte) error
- func (store *Datastore) Query(ctx context.Context, q query.Query) (query.Results, error)
- func (store *Datastore) Repair(ctx context.Context) error
- func (store *Datastore) Sync(ctx context.Context, prefix ds.Key) error
- type Options
- type PubSubBroadcaster
- type SessionDAGService
- type Stats
Constants ¶
This section is empty.
Variables ¶
var (
ErrNoMoreBroadcast = errors.New("receiving blocks aborted since no new blocks will be broadcasted")
)
Common errors.
Functions ¶
This section is empty.
Types ¶
type Broadcaster ¶
type Broadcaster interface {
	// Send payload to other replicas.
	Broadcast(context.Context, []byte) error
	// Obtain the next payload received from the network.
	Next(context.Context) ([]byte, error)
}
A Broadcaster provides a way to send (notify) an opaque payload to all replicas and to retrieve the payloads broadcast by them.
type Datastore ¶
type Datastore struct {
// contains filtered or unexported fields
}
Datastore makes a go-datastore a distributed Key-Value store using Merkle-CRDTs and IPLD.
func New ¶
func New(
	store ds.Datastore,
	namespace ds.Key,
	dagSyncer ipld.DAGService,
	bcast Broadcaster,
	opts *Options,
) (*Datastore, error)
New returns a Merkle-CRDT-based Datastore using the given one to persist all the necessary data under the given namespace. It needs a DAG-Service component for IPLD nodes and a Broadcaster component to exchange information with the rest of the replicas. Implementations of both must be provided by the user; this normally means using ipfs-lite (https://github.com/hsanjuan/ipfs-lite) as the DAG Service and the included libp2p PubSubBroadcaster as the Broadcaster.
The given Datastore is used to back all CRDT-datastore contents and accounting information. When using an asynchronous datastore, the user is in charge of calling Sync() regularly. Sync() will persist paths related to the given prefix, but note that if other replicas are modifying the datastore, the prefixes that will need syncing are not only those modified by the local replica. Therefore the user should consider calling Sync("/"), with an empty prefix, in that case, or use a synchronous underlying datastore that persists things directly on write.
Close() should be called on the CRDT-Datastore before the given store is closed.
func (*Datastore) Batch ¶
Batch implements batching for writes by accumulating Put and Delete in the same CRDT-delta and only applying it and broadcasting it on Commit().
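The following toy model sketches the batching behaviour described above: operations accumulate in one delta and are published only on Commit. It also folds in a size threshold in the spirit of the MaxBatchDeltaSize option. The `op` and `batch` types, the `broadcast` callback and the way size is counted are assumptions made for illustration, not the package's internals.

```go
package main

import "fmt"

// op is a hypothetical record of one Put or Delete inside a delta.
type op struct {
	key    string
	value  []byte
	delete bool
}

// batch is a hypothetical accumulator that publishes one delta per commit.
type batch struct {
	ops       []op
	size      int        // assumed size metric: key bytes plus value bytes
	maxSize   int        // 0 disables automatic commits in this sketch
	broadcast func([]op) // called once for every committed delta
}

func (b *batch) Put(key string, value []byte) { b.add(op{key: key, value: value}) }

func (b *batch) Delete(key string) { b.add(op{key: key, delete: true}) }

// add records the operation and commits early once the delta grows too big.
func (b *batch) add(o op) {
	b.ops = append(b.ops, o)
	b.size += len(o.key) + len(o.value)
	if b.maxSize > 0 && b.size >= b.maxSize {
		b.Commit()
	}
}

// Commit publishes the accumulated operations as one delta and resets.
func (b *batch) Commit() {
	if len(b.ops) == 0 {
		return
	}
	b.broadcast(b.ops)
	b.ops, b.size = nil, 0
}

func main() {
	b := &batch{maxSize: 10, broadcast: func(d []op) {
		fmt.Printf("delta with %d operation(s)\n", len(d))
	}}
	b.Put("a", []byte("1234"))  // size 5
	b.Put("b", []byte("12345")) // size 11, triggers an automatic commit
	b.Delete("a")
	b.Commit() // publishes the remaining delete
}
```

Grouping writes this way means one DAG node and one broadcast per commit instead of one per key.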
func (*Datastore) DotDAG ¶ added in v0.2.4
DotDAG writes a dot-format representation of the CRDT DAG to the given writer. It can be converted to image format and visualized with graphviz tooling.
func (*Datastore) Get ¶
Get retrieves the object `value` named by `key`. Get will return ErrNotFound if the key is not mapped to a value.
func (*Datastore) GetSize ¶
GetSize returns the size of the `value` named by `key`. In some contexts, it may be much cheaper to only get the size of the value rather than retrieving the value itself.
func (*Datastore) Has ¶
Has returns whether the `key` is mapped to a `value`. In some contexts, it may be much cheaper only to check for existence of a value, rather than retrieving the value itself. (e.g. HTTP HEAD). The default implementation is found in `GetBackedHas`.
func (*Datastore) InternalStats ¶ added in v0.3.9
InternalStats returns internal datastore information like the current heads and max height.
func (*Datastore) PrintDAG ¶
PrintDAG pretty-prints the current Merkle-DAG to stdout. Only use for small DAGs. DotDAG is an alternative for larger DAGs.
func (*Datastore) Query ¶
Query searches the datastore and returns a query result. This function may return before the query actually runs. To wait for the query:
result, _ := ds.Query(ctx, q)
// use the channel interface; result may come in at different times
for entry := range result.Next() { ... }
// or wait for the query to be completely done
entries, _ := result.Rest()
for _, entry := range entries { ... }
func (*Datastore) Repair ¶ added in v0.3.0
Repair triggers a DAG-repair, which tries to re-walk the CRDT-DAG from the current heads until the roots, processing currently unprocessed branches.
Calling Repair will walk the full DAG even if the dirty bit is unset, but will mark the store as clean upon successful completion.
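The walk described above can be pictured with a small model: start at the heads, follow links towards the roots, process whatever was skipped before, and clear the dirty flag only if the whole walk succeeds. The `node` and `store` types and the breadth-first order are assumptions for this sketch; the real traversal strategy is not documented here.

```go
package main

import "fmt"

// node is a hypothetical DAG node; parents are links towards the roots.
type node struct {
	parents []string
}

// store is a hypothetical replica state.
type store struct {
	nodes     map[string]node // nodes that can be retrieved
	heads     []string        // current heads of the DAG
	processed map[string]bool // nodes whose deltas were already applied
	dirty     bool
}

// repair walks every node reachable from the heads, processes those not yet
// processed, and marks the store clean only when the walk completes.
func (s *store) repair() (int, error) {
	visited := map[string]bool{}
	queue := append([]string(nil), s.heads...)
	repaired := 0
	for len(queue) > 0 {
		id := queue[0]
		queue = queue[1:]
		if visited[id] {
			continue
		}
		visited[id] = true
		n, ok := s.nodes[id]
		if !ok {
			// The dirty flag is left untouched on failure.
			return repaired, fmt.Errorf("node %q is not available", id)
		}
		if !s.processed[id] {
			s.processed[id] = true
			repaired++
		}
		queue = append(queue, n.parents...)
	}
	s.dirty = false
	return repaired, nil
}

func main() {
	s := &store{
		nodes: map[string]node{
			"root": {},
			"a":    {parents: []string{"root"}},
			"b":    {parents: []string{"root"}},
			"head": {parents: []string{"a", "b"}},
		},
		heads:     []string{"head"},
		processed: map[string]bool{"head": true, "a": true, "root": true},
		dirty:     true,
	}
	n, err := s.repair()
	fmt.Println(n, err, s.dirty)
}
```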
type Options ¶
type Options struct {
	Logger logging.StandardLogger
	RebroadcastInterval time.Duration
	// The PutHook function is triggered whenever an element
	// is successfully added to the datastore (either by a local
	// or remote update), and only when that addition is considered the
	// prevalent value.
	PutHook func(k ds.Key, v []byte)
	// The DeleteHook function is triggered whenever a version of an
	// element is successfully removed from the datastore (either by a
	// local or remote update). Unordered and concurrent updates may
	// result in the DeleteHook being triggered even though the element is
	// still present in the datastore because it was re-added or not fully
	// tombstoned. If that is relevant, use Has() to check if the removed
	// element is still part of the datastore.
	DeleteHook func(k ds.Key)
	// NumWorkers specifies the number of workers ready to walk DAGs
	NumWorkers int
	// DAGSyncerTimeout specifies how long to wait for a DAGSyncer.
	// Set to 0 to disable.
	DAGSyncerTimeout time.Duration
	// MaxBatchDeltaSize will automatically commit any batches whose
	// delta size gets too big. This helps keep DAG nodes small
	// enough that they will be transferred by the network.
	MaxBatchDeltaSize int
	// RepairInterval specifies how often to walk the full DAG until
	// the root(s) if it has been marked dirty. 0 to disable.
	RepairInterval time.Duration
	// MultiHeadProcessing lets several new heads be processed in
	// parallel. This results in more branching in general. More
	// branching is not necessarily a bad thing and may improve
	// throughput, but everything depends on usage.
	MultiHeadProcessing bool
}
Options holds configurable values for Datastore.
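The DeleteHook caveat is easier to see with a toy add-wins set. In the sketch below, every key maps to the version IDs added for it, and a delete tombstones only the versions the deleting replica had seen. The `orSet` type, its methods and the string version IDs are hypothetical and heavily simplified; they only illustrate why the hook can fire while the key is still present, and why checking Has() inside the hook helps.

```go
package main

import "fmt"

// orSet is a hypothetical, much simplified add-wins set.
type orSet struct {
	alive      map[string]map[string]bool // key -> version ID -> not tombstoned
	deleteHook func(key string)
}

func newORSet(hook func(string)) *orSet {
	return &orSet{alive: map[string]map[string]bool{}, deleteHook: hook}
}

// add records a new version of key.
func (s *orSet) add(key, id string) {
	if s.alive[key] == nil {
		s.alive[key] = map[string]bool{}
	}
	s.alive[key][id] = true
}

// remove tombstones the given versions and fires the hook if any was alive.
func (s *orSet) remove(key string, ids ...string) {
	removed := false
	for _, id := range ids {
		if s.alive[key][id] {
			delete(s.alive[key], id)
			removed = true
		}
	}
	if removed && s.deleteHook != nil {
		s.deleteHook(key)
	}
}

// has reports whether at least one version of key is not tombstoned.
func (s *orSet) has(key string) bool { return len(s.alive[key]) > 0 }

func main() {
	var s *orSet
	s = newORSet(func(key string) {
		// Mirrors the advice above: check presence before reacting.
		fmt.Printf("delete hook for %s, still present: %v\n", key, s.has(key))
	})
	s.add("/k", "v1")    // written by replica A
	s.add("/k", "v2")    // concurrent write by replica B
	s.remove("/k", "v1") // replica A deletes the only version it had seen
}
```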
func DefaultOptions ¶
func DefaultOptions() *Options
DefaultOptions initializes an Options object with sensible defaults.
type PubSubBroadcaster ¶ added in v0.0.3
type PubSubBroadcaster struct {
// contains filtered or unexported fields
}
PubSubBroadcaster implements a Broadcaster using libp2p PubSub.
func NewPubSubBroadcaster ¶ added in v0.0.3
func NewPubSubBroadcaster(ctx context.Context, psub *pubsub.PubSub, topic string) (*PubSubBroadcaster, error)
NewPubSubBroadcaster returns a new broadcaster using the given PubSub and a topic to subscribe/broadcast to. Please register any topic validators before creating the Broadcaster.
The broadcaster can be shut down by cancelling the given context. This must be done before closing the crdt.Datastore, otherwise things may hang.
type SessionDAGService ¶ added in v0.3.0
type SessionDAGService interface {
	ipld.DAGService
	Session(context.Context) ipld.NodeGetter
}
A SessionDAGService is a Sessions-enabled DAGService. This type of DAG-Service provides an optimized NodeGetter to make multiple related requests. The same session-enabled NodeGetter is used to download DAG branches when the DAGSyncer supports it.