Documentation
¶
Overview ¶
Package dot implements data synchronization of user defined types using operational transformation/OT.
Please see https://github.com/dotchain/dot for a tutorial on how to use DOT.
The core functionality is spread out between dot/changes, dot/streams, dot/refs and dot/ops but this package exposes simple client and server implementations for common use cases:
Server example
import "encoding/gob" import "net/http" import "github.com/dotchain/dot" ... gob.Register(..) // register any user-standard OT types used http.Handle("/api/", dot.BoltServer("file.bolt")) http.ListenAndServe(":8080", nil)
Client example
import "encoding/gob" import "net/http" import "github.com/dotchain/dot" ... gob.Register(..) // register any user-standard OT types used session, stream := dot.Connect("http://localhost:8080/api/")
Immutable values ¶
DOT uses immutable values. Every Value must implement the change.Value interface which is a single Apply method that returns the result of applying a mutation (while leaving the original value effectively unchanged).
If the underlying type behaves like a collection (such as with Slices), the type must also implement some collection specific methods specified in the changes.Collection interface.
Most actual types are likely to be structs or slices with boilerplate implementaations of the interfaces. The x/dotc package has a code generator which can emit such boilerplate implementations simplifying this task.
Changes ¶
The changes package implements a set of simple changes (Replace, Splice and Move). Richer changes are expected to be built up by composition via changes.ChangeSet (which is a sequence of changes) and changes.PathChange (which modifies a value at a path).
Changes are immutable too and generally are meant to not maintain any reference to the value they apply on. While custom changes are possible (they have to implement the changes.Custom interface), they are expected to be rare as the default set of chnange types cover a vast variety of scenarios.
The core logic of DOT is in the Merge methods of changes: they guaranteee that if two independent changes are done to a value, the deviation in the values can be converged. The basic property of any two changes (on the same value) is that:
leftx, rightx := left.Merge(right) initial.Apply(nil, left).Apply(nil, leftx) == initial.Apply(nil, right).Apply(nil, rightx)
Care must be taken with custom changes to ensure that this property is preserved.
Streams ¶
Streams represent the sequence of changes associated with a single value. Stream instances behave like they are immutable: when a change happens, a new stream instance captures the change. Streams also support multiple-writers: it is possible for two independent changes to the same stream instance. In this case, the newly-created stream instances only capture the respective changes but these both have a "Next" value that converges to the same value. That is, the two separate streams implicitly have the changes from each other (but after transforming through the Merge) method.
This allows streams to perform quite nicely as convergent data structures without much syntax overhead:
initial := streams.S8{Stream: streams.New(), Value: "hello"} // two changes: append " world" and delete "lo" s1 := initial.Splice(5, 0, " world") s2 := initial.Splice(3, len("lo"), "") // streams automatically merge because they are both // based on initial s1 = s1.Latest() s2 = s2.Latest() fmt.Println(s1.Value, s1.Value == s2.Value) // Output: hel world true
Strongly typed streams ¶
The streams package provides a generic Stream implementation (via the New function) which implements the idea of a sequence of convergent changes. But much of the power of streams is in having strongly type streams where the stream is associated with a strongly typed value. The streams package provides simple text streamss (S8 and S16) as well as Bool and Counter types. Richer types like structs and slices can be converted to their stream equivalent rather mechanically and this is done by the x/dotc package -- using code generation.
Some day, Golang would support generics and then the code generation ugliness of x/dotc will no longer be needed.
Substreams ¶
Substreams are streams that refer into a particular field of a parent stream. For example, if the parent value is a struct with a "Done" field, it is possible to treat the "Done stream" as the changes scoped to this field. This allows code to be written much more cleanly. See the https://github.com/dotchain/dot#toggling-complete section of the documentation for an example.
Other features ¶
Streams support branching (a la Git) and folding. See the examples!
Streams also support references. A typical use case is maintaining the user cursor within a region of text. When remote changes happen to the text, the cursor needs to be updated. In fact, when one takes a substream of an element of an array, the array index needs to be automatically managed (i.e. insertions into the array before the index should automatically update the index etc). This is managed within streams using references.
Server implementations ¶
A particular value can be reconstituted from the sequence of changes to that value. In DOT, only these changes are stored and that too in an append-only log. This make the backend rather simple and generally agnostic of application types to a large extent.
See https://github.com/dotchain/dot#server for example code.
Example (ApplyingChanges) ¶
package main import ( "fmt" "github.com/dotchain/dot/changes" "github.com/dotchain/dot/changes/types" ) func main() { // import fmt // import github.com/dotchain/dot/changes // import github.com/dotchain/dot/changes/types // S8 is DOT-compatible string type with UTF8 string indices initial := types.S8("hello") append := changes.Splice{ Offset: len("hello"), // end of "hello" Before: types.S8(""), // nothing to remove After: types.S8(" world"), // insert " world" } // apply the change updated := initial.Apply(nil, append) fmt.Println(updated) }
Output: hello world
Example (ApplyingChangesUsingStreams) ¶
package main import ( "fmt" "github.com/dotchain/dot/streams" ) func main() { // import fmt // import github.com/dotchain/dot/streams initial := &streams.S8{Stream: streams.New(), Value: "hello"} updated := initial.Splice(5, 0, " world") fmt.Println(updated.Value) }
Output: hello world
Example (Branching) ¶
package main import ( "fmt" "github.com/dotchain/dot/streams" ) func main() { // import fmt // import github.com/dotchain/dot/streams // import github.com/dotchain/dot/changes // import github.com/dotchain/dot/changes/types // local is a branch of master master := &streams.S16{Stream: streams.New(), Value: "hello"} local := &streams.S16{Stream: streams.Branch(master.Stream), Value: master.Value} // edit locally: hello => hallo local.Splice(len("h"), len("e"), "a") // changes will not be reflected on master yet fmt.Println(master.Latest().Value) // push local changes up to master now local.Stream.Push() // now master = hallo fmt.Println(master.Latest().Value) }
Output: hello hallo
Example (ChangesetComposition) ¶
package main import ( "fmt" "github.com/dotchain/dot/changes" "github.com/dotchain/dot/changes/types" ) func main() { // import fmt // import github.com/dotchain/dot/changes // import github.com/dotchain/dot/changes/types initial := types.S8("hello") // append " world" => "hello world" append1 := changes.Splice{ Offset: len("hello"), Before: types.S8(""), After: types.S8(" world"), } // append "." => "hello world." append2 := changes.Splice{ Offset: len("hello world"), Before: types.S8(""), After: types.S8("."), } // now combine the two appends and apply both := changes.ChangeSet{append1, append2} updated := initial.Apply(nil, both) fmt.Println(updated) }
Output: hello world.
Example (ClientServerUsingBoltDB) ¶
package main import ( "fmt" "log" "net/http/httptest" "os" "github.com/dotchain/dot" "github.com/dotchain/dot/changes" "github.com/dotchain/dot/changes/types" ) func main() { defer remove("file.bolt")() logger := log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile) srv := dot.WithLogger(dot.BoltServer("file.bolt"), logger) defer dot.CloseServer(srv) httpSrv := httptest.NewServer(srv) defer httpSrv.Close() stream1, store1 := dot.NewSession().NonBlockingStream(httpSrv.URL, nil) stream2, store2 := dot.NewSession().Stream(httpSrv.URL, nil) defer store1.Close() defer store2.Close() stream1.Append(changes.Replace{Before: changes.Nil, After: types.S8("hello")}) fmt.Println("push", stream1.Push()) fmt.Println("pull", stream2.Pull()) } func remove(fname string) func() { if err := os.Remove(fname); err != nil { log.Println("Couldnt remove file", fname) } return func() { if err := os.Remove(fname); err != nil { log.Println("Couldnt remove file", fname) } } }
Output: push <nil> pull <nil>
Example (ClientServerUsingPostgresDB) ¶
package main import ( "database/sql" "fmt" "log" "net/http/httptest" "os" "time" "github.com/dotchain/dot" "github.com/dotchain/dot/changes" "github.com/dotchain/dot/changes/types" "github.com/dotchain/dot/ops/pg" ) func main() { sourceName := "user=postgres dbname=dot_test sslmode=disable" maxPoll := pg.MaxPoll defer func() { pg.MaxPoll = maxPoll db, err := sql.Open("postgres", sourceName) must(err) _, err = db.Exec("DROP TABLE operations") must(err) must(db.Close()) }() pg.MaxPoll = time.Second logger := log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile) srv := dot.WithLogger(dot.PostgresServer(sourceName), logger) defer dot.CloseServer(srv) httpSrv := httptest.NewServer(srv) defer httpSrv.Close() stream1, store1 := dot.NewSession().Stream(httpSrv.URL, logger) stream2, store2 := dot.NewSession().Stream(httpSrv.URL, logger) defer store1.Close() defer store2.Close() stream1.Append(changes.Replace{Before: changes.Nil, After: types.S8("hello")}) fmt.Println("push", stream1.Push()) fmt.Println("pull", stream2.Pull()) } func must(err error) { if err != nil { panic(err) } }
Output: push <nil> pull <nil>
Example (Convergence) ¶
package main import ( "fmt" "github.com/dotchain/dot/changes" "github.com/dotchain/dot/changes/types" ) func main() { // import fmt // import github.com/dotchain/dot/changes // import github.com/dotchain/dot/changes/types initial := types.S8("hello") // two changes: append " world" and delete "lo" insert := changes.Splice{Offset: 5, Before: types.S8(""), After: types.S8(" world")} remove := changes.Splice{Offset: 3, Before: types.S8("lo"), After: types.S8("")} // two versions derived from initial inserted := initial.Apply(nil, insert) removed := initial.Apply(nil, remove) // merge the changes removex, insertx := insert.Merge(remove) // converge by applying the above final1 := inserted.Apply(nil, removex) final2 := removed.Apply(nil, insertx) fmt.Println(final1, final1 == final2) }
Output: hel world true
Example (ConvergenceUsingStreams) ¶
package main import ( "fmt" "github.com/dotchain/dot/streams" ) func main() { // import fmt // import github.com/dotchain/dot/streams initial := streams.S8{Stream: streams.New(), Value: "hello"} // two changes: append " world" and delete "lo" s1 := initial.Splice(5, 0, " world") s2 := initial.Splice(3, len("lo"), "") // streams automatically merge because they are both // based on initial s1 = s1.Latest() s2 = s2.Latest() fmt.Println(s1.Value, s1.Value == s2.Value) }
Output: hel world true
Example (Folding) ¶
package main import ( "fmt" "github.com/dotchain/dot/changes" "github.com/dotchain/dot/changes/types" "github.com/dotchain/dot/streams" "github.com/dotchain/dot/x/fold" ) func main() { // import fmt // import github.com/dotchain/dot/streams // import github.com/dotchain/dot/changes // import github.com/dotchain/dot/changes/types // import github.com/dotchain/dot/x/fold // create master, folded child and the folding itself master := &streams.S16{Stream: streams.New(), Value: "hello world!"} foldChange := changes.Splice{ Offset: len("hello"), Before: types.S16(" world"), After: types.S16("..."), } foldedStream := fold.New(foldChange, master.Stream) folded := &streams.S16{Stream: foldedStream, Value: "hello...!"} // folded: hello...! => Hello...!!! folded = folded.Splice(0, len("h"), "H") folded = folded.Splice(len("Hello...!"), 0, "!!") fmt.Println(folded.Value) // master: hello world => hullo world master = master.Splice(len("h"), len("e"), "u") fmt.Println(master.Value) // now folded = Hullo...!!! fmt.Println(folded.Latest().Value) // master = Hullo world!!! fmt.Println(master.Latest().Value) }
Output: Hello...!!! hullo world! Hullo...!!! Hullo world!!!
Example (PathComposition) ¶
package main import ( "fmt" "github.com/dotchain/dot/changes" "github.com/dotchain/dot/changes/types" ) func main() { // import fmt // import github.com/dotchain/dot/changes // import github.com/dotchain/dot/changes/types // types.A is a generic array type and types.M is a map type initial := types.A{types.M{"hello": types.S8("world")}} // replace "world" with "world!" replace := changes.Replace{Before: types.S8("world"), After: types.S8("world!")} // replace "world" with "world!" of initial[0]["hello"] path := []interface{}{0, "hello"} c := changes.PathChange{Path: path, Change: replace} updated := initial.Apply(nil, c) fmt.Println(updated) }
Output: [map[hello:world!]]
Example (UndoStreams) ¶
package main import ( "fmt" "github.com/dotchain/dot/streams" "github.com/dotchain/dot/streams/undo" ) func main() { // import fmt // import github.com/dotchain/dot/streams // import github.com/dotchain/dot/changes // import github.com/dotchain/dot/changes/types // import github.com/dotchain/dot/streams/undo // create master, undoable child and the undo stack itself master := &streams.S16{Stream: streams.New(), Value: "hello"} s := undo.New(master.Stream) undoableChild := &streams.S16{Stream: s, Value: master.Value} // change hello => Hello undoableChild = undoableChild.Splice(0, len("h"), "H") fmt.Println(undoableChild.Value) // for kicks, update master hello => hello$ as if it came // from the server master.Splice(len("hello"), 0, "$") // now undo this via the stack s.Undo() // now undoableChild should be hello$ undoableChild = undoableChild.Latest() fmt.Println(undoableChild.Value) // now redo the last operation to get Hello$ s.Redo() undoableChild = undoableChild.Latest() fmt.Println(undoableChild.Value) }
Output: Hello hello$ Hello$
Index ¶
- func BoltServer(fileName string) http.Handler
- func CloseServer(h http.Handler)
- func PostgresServer(sourceName string) http.Handler
- func WithLogger(h http.Handler, l log.Log) http.Handler
- type Session
- func (s *Session) Load(ver int) (ops.Op, []ops.Op)
- func (s *Session) NonBlockingStream(url string, logger dotlog.Log) (streams.Stream, ops.Store)
- func (s *Session) Store(ver int, op ops.Op, merge []ops.Op)
- func (s *Session) Stream(url string, logger dotlog.Log) (streams.Stream, ops.Store)
- func (s *Session) UpdateVersion(version int, pending, merge []ops.Op)
Examples ¶
- Package (ApplyingChanges)
- Package (ApplyingChangesUsingStreams)
- Package (Branching)
- Package (ChangesetComposition)
- Package (ClientServerUsingBoltDB)
- Package (ClientServerUsingPostgresDB)
- Package (Convergence)
- Package (ConvergenceUsingStreams)
- Package (Folding)
- Package (PathComposition)
- Package (UndoStreams)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BoltServer ¶
BoltServer returns a http.Handler serving DOT requests backed by the db
func CloseServer ¶
CloseServer closes the http.Handler returned by this package
func PostgresServer ¶
PostgresServer returns a http.Handler serving DOT requests backed by the db
Types ¶
type Session ¶
type Session struct { Version int Pending, Merge []ops.Op OpCache map[int]ops.Op MergeCache map[int][]ops.Op }
Session represents a client session
func (*Session) NonBlockingStream ¶
NonBlockingStream returns the stream of changes for this session
The returned store can be used to *close* the stream when needed
Actual syncing of messages happens when Push and Pull are called on the stream. Pull() does the server-fetch asynchronously, returning immediately if there is no server data available.
Directories
¶
Path | Synopsis |
---|---|
Package changes implements the core mutation types for OT.
|
Package changes implements the core mutation types for OT. |
crdt
Package crdt implements CRDT types and associated changes The main CRDT types are Dict and Seq which implement map-like and list-like container types.
|
Package crdt implements CRDT types and associated changes The main CRDT types are Dict and Seq which implement map-like and list-like container types. |
diff
Package diff compares two values and returns the changes
|
Package diff compares two values and returns the changes |
run
Package run implements a custom change that applies to a sequence of array elements.
|
Package run implements a custom change that applies to a sequence of array elements. |
table
Package table implements a loose 2d collection of values
|
Package table implements a loose 2d collection of values |
types
Package types implements OT-compatible immutable values.
|
Package types implements OT-compatible immutable values. |
Generated.
|
Generated. |
Package log defines the interface for loging within the DOT project.
|
Package log defines the interface for loging within the DOT project. |
Package ops implements network and storage for DOT This builds on top of the https://godoc.org/github.com/dotchain/dot/changes package
|
Package ops implements network and storage for DOT This builds on top of the https://godoc.org/github.com/dotchain/dot/changes package |
bolt
Package bolt implements the dot storage for files using boltdb A http server can be implemented like so: import "github.com/dotchain/dot/ops/bolt" import "github.com/dotchain/dot/ops/nw" store, _ := bolt.New("file.bolt", "instance", nil) defer store.Close() handler := &nw.Handler{Store: store} h := func(w http.ResponseWriter, req *http.Request) { // Enable CORS w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Headers", "Content-Type") if req.Method == "OPTIONS" { return } handler.ServeHTTP(w, req) } http.HandleFunc("/api/", h) http.ListenAndServe() Concurrency A single store instance is safe for concurrent access but the provided file is locked until the store is closed.
|
Package bolt implements the dot storage for files using boltdb A http server can be implemented like so: import "github.com/dotchain/dot/ops/bolt" import "github.com/dotchain/dot/ops/nw" store, _ := bolt.New("file.bolt", "instance", nil) defer store.Close() handler := &nw.Handler{Store: store} h := func(w http.ResponseWriter, req *http.Request) { // Enable CORS w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Headers", "Content-Type") if req.Method == "OPTIONS" { return } handler.ServeHTTP(w, req) } http.HandleFunc("/api/", h) http.ListenAndServe() Concurrency A single store instance is safe for concurrent access but the provided file is locked until the store is closed. |
pg
Package pg implements the dot storage for postgres 9.5+ A http server can be implemented like so: import "github.com/dotchain/dot/ops/pg" import "github.com/dotchain/dot/ops/nw" dataSource := "dbname=mydb user=xyz" store, _ := sql.New(dataSource, "instance", nil) defer store.Close() handler := &nw.Handler{Store: store} h := func(w http.ResponseWriter, req *http.Request) { // Enable CORS w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Headers", "Content-Type") if req.Method == "OPTIONS" { return } handler.ServeHTTP(w, req) } http.HandleFunc("/api/", h) http.ListenAndServe()
|
Package pg implements the dot storage for postgres 9.5+ A http server can be implemented like so: import "github.com/dotchain/dot/ops/pg" import "github.com/dotchain/dot/ops/nw" dataSource := "dbname=mydb user=xyz" store, _ := sql.New(dataSource, "instance", nil) defer store.Close() handler := &nw.Handler{Store: store} h := func(w http.ResponseWriter, req *http.Request) { // Enable CORS w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Headers", "Content-Type") if req.Method == "OPTIONS" { return } handler.ServeHTTP(w, req) } http.HandleFunc("/api/", h) http.ListenAndServe() |
sjson
Package sjson implements a portable strongly-typed json-like codec.
|
Package sjson implements a portable strongly-typed json-like codec. |
Package refs implements reference paths, carets and selections.
|
Package refs implements reference paths, carets and selections. |
Package streams defines convergent streams of changes A stream is like an event emitter or source: it tracks a sequence of changes on a value.
|
Package streams defines convergent streams of changes A stream is like an event emitter or source: it tracks a sequence of changes on a value. |
test
|
|
seqtest
Package seqtest implements a standard suite of validations.
|
Package seqtest implements a standard suite of validations. |
x
|
|
cmd/dotls
Command dotls lists the operations The argument can be a file name or a url
|
Command dotls lists the operations The argument can be a file name or a url |
dotc
Package dotc implements code-generation tools for dot.changes
|
Package dotc implements code-generation tools for dot.changes |
fold
Package fold implements a simple scheme for folding.
|
Package fold implements a simple scheme for folding. |
heap
Package heap implements a heap value type
|
Package heap implements a heap value type |
rich
Package rich implements rich text data types Package rich implements rich text data types
|
Package rich implements rich text data types Package rich implements rich text data types |
rich/data
Package data impleements data structures for use with rich text
|
Package data impleements data structures for use with rich text |
rich/eval
Package eval implements evaluated objects Package eval implements evaluated objects Package eval implements expression values that can be evaluated Expression syntax The language used by eval is a very simple infix expression.
|
Package eval implements evaluated objects Package eval implements evaluated objects Package eval implements expression values that can be evaluated Expression syntax The language used by eval is a very simple infix expression. |
rich/html
Package html implements rich text to HTML conversion
|
Package html implements rich text to HTML conversion |
snapshot
Package snapshot manages session storage Snapshots are session meta data (version, pending), the actual app value and the transformed/merged ops cache.
|
Package snapshot manages session storage Snapshots are session meta data (version, pending), the actual app value and the transformed/merged ops cache. |