Documentation ¶
Overview ¶
Package streams defines convergent streams of changes
A stream is like an event emitter or source: it tracks a sequence of changes on a value. It is an immutable value that logically maps to a "Git commit". Appending a change to an event is equivalent to creating a new commit based on the previous stream.
Streams differ from event emitters or immutable values in a fundamental way: they are convergent. All streams from the same family converge to the same value
For example, consider two changes on the same initial stream value.
s := ...stream... s1 := s.Append(change1) s2 := s.Append(change2)
The two output streams converge in the following sense:
s1Next, c1Next := s1.Next() s2Next, c2Next := s2.Next() initial.Apply(nil, c1).Apply(nil, c1Next) == initial.Apply(nil, c2).Apply(nil, c2Next)
Basically, just chasing the sequence of changes from a particular stream instance is guaranteed to end with the same value as any other stream in that family.
A "family" is any stream derived from another in by means of any number of "Append" calls.
Branching ¶
Streams support Git-like branching with local changes not automatically appearing on the parent until a call to Push.
Substream ¶
It is possible to create sub-streams for elements rooted below the current element. For example, one can take a stream of elements and only focus on the substream of changes to the 5th element. In this case, if the parent stream has a change which splices in a few elements before 5, the sub-stream should correspondingly refer to the new indices. And any changes on the sub-stream should refer to the correct index on the parent. The Substream() method provides the implementation of this concept.
Value Streams ¶
Streams inherently only track the actual changes and not the underlying values but most applications also need to track the current value. See Int, Bool, S16 or S8 for an example stream that tracks an underlying value backed by a Stream.
Custom stream implementations ¶
The dotc package (https://godoc.org/github.com/dotchain/dot/x/dotc) defines a mechanism to automatically generate the Stream related types for structs, slices and unions.
Example (NewStream) ¶
package main import ( "fmt" "github.com/dotchain/dot/changes" "github.com/dotchain/dot/changes/types" "github.com/dotchain/dot/streams" ) func main() { s := streams.New() s.Append(changes.Splice{ Offset: 0, Before: types.S8(""), After: types.S8("OK "), }) _, c := streams.Latest(s) fmt.Println("Changed:", types.S8("Hello World").Apply(nil, c)) }
Output: Changed: OK Hello World
Example (StreamBranching) ¶
package main import ( "fmt" "github.com/dotchain/dot/changes" "github.com/dotchain/dot/changes/types" "github.com/dotchain/dot/streams" ) func main() { val := changes.Value(types.S8("Hello World")) s := streams.New() child := streams.Branch(s) // update child, the changes won't be reflected on latest child.Append(changes.Splice{ Offset: 0, Before: types.S8(""), After: types.S8("OK "), }) _, c := streams.Latest(s) fmt.Println("Latest:", val.Apply(nil, c)) // merge child and parent, change will get reflected if err := child.Push(); err != nil { fmt.Println("Error", err) } _, c = streams.Latest(s) fmt.Println("Latest:", val.Apply(nil, c)) }
Output: Latest: Hello World Latest: OK Hello World
Example (StreamMerge) ¶
package main import ( "fmt" "github.com/dotchain/dot/changes" "github.com/dotchain/dot/changes/types" "github.com/dotchain/dot/streams" ) func main() { s := streams.New() s1 := s.Append(changes.Splice{ Offset: 0, Before: types.S8(""), After: types.S8("OK "), }) _, c := streams.Latest(s) fmt.Println("Changed:", types.S8("Hello World").Apply(nil, c)) // note that this works on s, so the offset location is based // off "Hello World", rather than "OK Hello World" _ = s.Append(changes.Splice{ Offset: len("Hello World"), Before: types.S8(""), After: types.S8("!"), }) _, c = streams.Latest(s) fmt.Println("Changed:", types.S8("Hello World").Apply(nil, c)) // now modify s1 again which is based off of "OK Hello World" s1.Append(changes.Splice{ Offset: len("OK Hello World"), Before: types.S8(""), After: types.S8("*"), }) _, c = streams.Latest(s) fmt.Println("Changed:", types.S8("Hello World").Apply(nil, c)) }
Output: Changed: OK Hello World Changed: OK Hello World! Changed: OK Hello World!*
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Bool ¶
Bool implements a bool stream.
type Counter ¶
Counter implements a counter stream.
type Int ¶
Int implements an int stream.
type S16 ¶
S16 implements an UFT16 string stream
func (*S16) Move ¶
Move moves[offset:offset+count] by the provided distance to the right (or if distance is negative, to the left)
type S8 ¶
S8 implements an UFT8 string stream
func (*S8) Move ¶
Move moves[offset:offset+count] by the provided distance to the right (or if distance is negative, to the left)
type Stream ¶
type Stream interface { // Append adds a change on top of the current change. If // the current change has a Next, this is merged with the next // and applied to the Next instead. That way, the change is // propagated all the way and applied at the end of the // stream. // // A listener on the stream can expect to get a change that is // safe to apply on top of the last change emitted. Append(changes.Change) Stream // ReverseAppend is just like Append except ReverseMerge is // used instead of Merge. ReverseAppend is used to when a // remote change is being appended -- with the newly appended // change actually taking precedence over all other changes // that have been applied on top of the current instance. ReverseAppend(changes.Change) Stream // Next returns the change and the next stream. If no further // changes exist, it returns nil for both. All related stream // instances are guaranateed to converge -- i.e. irrespective // of which instance one holds, iterating over all the Next // values and applying them will get them all to converge to // the same value. Next() (Stream, changes.Change) // Push pushes all local change up to any remote stream. // Does nothing if not connected to a remote stream Push() error // Pull pulls all changes from a remote stream. // Does nothing if not connected to a remote stream Pull() error // Undo undoes the last change on this branch // Does nothing if not connected to a undo stack Undo() // Redo redoes the last change on this branch. // Does nothing if not connected to a undo stack Redo() }
Stream is an immutable type to track a sequence of changes.
A change can be "applied" to a stream instance via the Append method. This results in a new stream instance. The old and the new stream instances can both be used for further changes but they represent different states: a change applied on an earlier version of the stream will be transformed onto the latest when it is actually applied.
Logically, every stream is a change made on top of another and so forms a tree. But each stream instance is careful to not store any references to previous changes as this would cause the memory to constantly grow. Instead, each stream instance maintains a forward list -- a list of changes that will effectively get it to the same converged state as any other related stream instance.
This list can be traversed via the Next() method.
Branching ¶
All changes made on a stream are propagated to the source. It is possible to create git-like branches using the Branch type, where the changes are cached until an explicit call to Pull or Push to move the changes between two branches.
func Branch ¶
Branch returns a new stream based on the provided stream. All changes made on the branch are only merged upstream when Push is called explicitly and all changes made upstream are only brought into the local branch when Pull is called explicitly.
func Latest ¶
Latest returns the latest stream instance and the set of changes that have taken place until then
func Substream ¶
Substream creates a child stream
The sequence of keys are used as paths into the logical value and work for both array indices and object-like keys.
For instance: `streams.Substream(parent, 5, "count")` refers to the "count" field of the 5th element and any changes to it.
Note that the path provided will be kept up-to-date. In the previous example, if 10 items were inserted into the root at index 0, the path would be internally updated to [15, "count"] at that point. This guarantees that any updates to the substream get reflected at the right index of the parent stream.
Substreams may "terminate" if a parent or some higher node is simply deleted. Note that deleting the element referred to by the path itself does not cause the stream to dry up -- some element higher up needs to be replaced. Dried up streams do not hold references to anything and so will not cause garbage collection issues. The only operation that such streams still permit would be the deletion of callbacks.