Documentation ¶
Overview ¶
Package crdt implements the Cloudstate CRDT state model support.
Index ¶
- Variables
- type CRDT
- type CancelFunc
- type ChangeFunc
- type Clock
- type CommandContext
- func (c *CommandContext) CancelFunc(f CancelFunc)
- func (c *CommandContext) ChangeFunc(f ChangeFunc)
- func (c *CommandContext) Command() *protocol.Command
- func (c *CommandContext) EndStream()
- func (c *CommandContext) Forward(forward *protocol.Forward)
- func (c *CommandContext) SideEffect(effect *protocol.SideEffect)
- func (c *CommandContext) Streamed() bool
- type CommandID
- type Context
- type Entity
- type EntityHandler
- type EntityID
- type Flag
- type GCounter
- type GSet
- type LWWRegister
- type ORMap
- func (m *ORMap) Clear()
- func (m *ORMap) Delete(key *any.Any)
- func (m *ORMap) Delta() *entity.CrdtDelta
- func (m *ORMap) Flag(key *any.Any) (*Flag, error)
- func (m *ORMap) GCounter(key *any.Any) (*GCounter, error)
- func (m *ORMap) GSet(key *any.Any) (*GSet, error)
- func (m *ORMap) Get(key *any.Any) CRDT
- func (m *ORMap) HasDelta() bool
- func (m *ORMap) HasKey(x *any.Any) (hasKey bool)
- func (m *ORMap) Keys() []*any.Any
- func (m *ORMap) LWWRegister(key *any.Any) (*LWWRegister, error)
- func (m *ORMap) ORMap(key *any.Any) (*ORMap, error)
- func (m *ORMap) ORSet(key *any.Any) (*ORSet, error)
- func (m *ORMap) PNCounter(key *any.Any) (*PNCounter, error)
- func (m *ORMap) Set(key *any.Any, value CRDT)
- func (m *ORMap) Size() int
- func (m *ORMap) State() *entity.CrdtState
- func (m *ORMap) Values() []*entity.CrdtState
- func (m *ORMap) Vote(key *any.Any) (*Vote, error)
- type ORSet
- func (s *ORSet) Add(a *any.Any)
- func (s ORSet) Added() []*any.Any
- func (s *ORSet) Clear()
- func (s *ORSet) Delta() *entity.CrdtDelta
- func (s *ORSet) HasDelta() bool
- func (s *ORSet) Remove(a *any.Any)
- func (s ORSet) Removed() []*any.Any
- func (s *ORSet) Size() int
- func (s *ORSet) State() *entity.CrdtState
- func (s ORSet) Value() []*any.Any
- type PNCounter
- type Server
- type ServiceName
- type Vote
- func (v *Vote) All() bool
- func (v *Vote) AtLeastOne() bool
- func (v *Vote) Delta() *entity.CrdtDelta
- func (v *Vote) HasDelta() bool
- func (v *Vote) Majority() bool
- func (v *Vote) SelfVote() bool
- func (v *Vote) State() *entity.CrdtState
- func (v *Vote) Vote(vote bool)
- func (v *Vote) Voters() uint32
- func (v *Vote) VotesFor() uint32
Constants ¶
This section is empty.
Variables ¶
var ErrCtxFailCalled = errors.New("context failed")
var ErrStateChanged = errors.New("CRDT change not allowed")
Functions ¶
This section is empty.
Types ¶
type CancelFunc ¶
type CancelFunc func(c *CommandContext) error
type ChangeFunc ¶
type ChangeFunc func(c *CommandContext) (*any.Any, error)
type Clock ¶
type Clock uint64
const ( // The Default clock, uses the current system time as the clock value. Default Clock = iota // A Reverse clock, based on the system clock. Using this effectively // achieves First-Write-Wins semantics. This is susceptible to the // same clock skew problems as the default clock. Reverse // A custom clock. // The custom clock value is passed by using the customClockValue parameter on // the `SetWithClock` method. The value should be a domain specific monotonically // increasing value. For example, if the source of the value for this register // is a single device, that device may attach a sequence number to each update, // that sequence number can be used to guarantee that the register will converge // to the last update emitted by that device. Custom // CustomAutoIncrement is a custom clock, that automatically increments the // custom value if the local clock value is greater than it. // // This is like `Custom`, however if when performing the update in the proxy, // it's found that the clock value of the register is greater than the specified // clock value for the update, the proxy will instead use the current clock // value of the register plus one. // // This can guarantee that updates done on the same node will be causally // ordered (addressing problems caused by the system clock being adjusted), // but will not guarantee causal ordering for updates on different nodes, // since it's possible that an update on a different node has not yet been // replicated to this node. CustomAutoIncrement )
type CommandContext ¶
type CommandContext struct { *Context CommandID CommandID // contains filtered or unexported fields }
A CommandContext carries change and cancel function handlers and other values to handle a command over different phases of a commands lifecycle.
func (*CommandContext) CancelFunc ¶
func (c *CommandContext) CancelFunc(f CancelFunc)
CancelFunc registers an on cancel handler for this command. The registered function will be invoked if the client initiates a stream cancel. It will not be invoked if the entity cancels the stream itself. The CancelFunc may update the CRDT, and may emit side effects.
func (*CommandContext) ChangeFunc ¶
func (c *CommandContext) ChangeFunc(f ChangeFunc)
ChangeFunc sets the function to be called whenever the CRDT is changed. For non-streamed contexts this is a `no operation`.
func (*CommandContext) Command ¶
func (c *CommandContext) Command() *protocol.Command
Command returns the protobuf message the context is handling as a command.
func (*CommandContext) EndStream ¶
func (c *CommandContext) EndStream()
EndStream marks a command stream to be ended.
func (*CommandContext) Forward ¶
func (c *CommandContext) Forward(forward *protocol.Forward)
Forward forwards this command to another service. The protocol.Forward provided has to ensure it references a valid service and command.
func (*CommandContext) SideEffect ¶
func (c *CommandContext) SideEffect(effect *protocol.SideEffect)
SideEffect adds a side effect to being emitted after the current command successfully has completed.
func (*CommandContext) Streamed ¶
func (c *CommandContext) Streamed() bool
Streamed returns whether the command handled by the context is streamed.
type Context ¶
type Context struct { // EntityID is the ID of the entity. EntityID EntityID // Entity describes the instance that is used as an entity. Entity *Entity // Instance is the instance of the entity this context is for. Instance EntityHandler // contains filtered or unexported fields }
Context holds the context of a running entity.
type Entity ¶
type Entity struct { // ServiceName is the fully qualified name of the service that implements // this entities interface. Setting it is mandatory. ServiceName ServiceName // EntityFunc creates a new entity. EntityFunc func(id EntityID) EntityHandler }
Entity captures an Entity with its ServiceName. It is used to be registered as an CRDT entity on a Cloudstate instance.
type EntityHandler ¶
type EntityHandler interface { HandleCommand(ctx *CommandContext, name string, msg proto.Message) (*any.Any, error) Default(ctx *Context) (CRDT, error) Set(ctx *Context, state CRDT) error }
EntityHandler has to be implemented by any type that wants to get registered as a crdt.Entity tag::entity-handler[]
type Flag ¶
type Flag struct {
// contains filtered or unexported fields
}
A Flag is a boolean value that starts as false, and can be set to true. Once set to true, it cannot be set back to false. A flag is a very simple CRDT, the merge function is simply a boolean or over the two flag values being merged.
type GCounter ¶
type GCounter struct {
// contains filtered or unexported fields
}
GCounter, or Grow-only Counter, is a counter that can only be incremented. It works by tracking a separate counter value for each node, and taking the sum of the values for all the nodes to get the current counter value. Since each node only updates its own counter value, each node can coordinate those updates to ensure they are consistent. Then the merge function, if it sees two different values for the same node, simply takes the highest value, because that has to be the most recent value that the node published.
func NewGCounter ¶
func NewGCounter() *GCounter
type GSet ¶
type GSet struct {
// contains filtered or unexported fields
}
GSet, or Grow-only Set, is a set that can only have items added to it. A GSet is a very simple CRDT, its merge function is defined by taking the union of the two GSets being merged.
type LWWRegister ¶
type LWWRegister struct {
// contains filtered or unexported fields
}
LWWRegister, or Last-Write-Wins Register, is a CRDT that can hold any value, along with a clock value and node id to indicate when it was updated by which node. If two nodes have two different versions of the value, the one with the highest clock value wins. If the clock values are equal, then a stable function on the nodes is used to determine it (eg, the node with the lowest address). Note that LWWRegisters do not support partial updates of their values. If the register holds a person object, and one node updates the age property, while another concurrently updates the name property, only one of those updates will eventually win. By default, LWWRegister’s are vulnerable to clock skew between nodes. Cloudstate supports optionally providing a custom clock value should a more trustworthy ordering for updates be available.
func NewLWWRegister ¶
func NewLWWRegister(x *any.Any) *LWWRegister
func NewLWWRegisterWithClock ¶
func NewLWWRegisterWithClock(x *any.Any, c Clock, customClockValue int64) *LWWRegister
NewLWWRegisterWithClock uses the custom clock value if the clock selected is a custom clock. This is ignored if the clock is not a custom clock.
func (*LWWRegister) Delta ¶
func (r *LWWRegister) Delta() *entity.CrdtDelta
func (*LWWRegister) HasDelta ¶
func (r *LWWRegister) HasDelta() bool
func (*LWWRegister) Set ¶
func (r *LWWRegister) Set(x *any.Any)
func (*LWWRegister) SetWithClock ¶
func (r *LWWRegister) SetWithClock(x *any.Any, c Clock, customClockValue int64)
SetWithClock uses the custom clock value to use if the clock selected is a custom clock. This is ignored if the clock is not a custom clock.
func (*LWWRegister) State ¶
func (r *LWWRegister) State() *entity.CrdtState
func (*LWWRegister) Value ¶
func (r *LWWRegister) Value() *any.Any
type ORMap ¶
type ORMap struct {
// contains filtered or unexported fields
}
ORMap, or Observed-Removed Map, is similar to an ORSet, with the addition that the values of the set serve as keys for a map, and the values of the map are themselves, CRDTs. When a value for the same key in an ORMap is modified concurrently on two different nodes, the values from the two nodes are merged together.
func (*ORMap) LWWRegister ¶
func (m *ORMap) LWWRegister(key *any.Any) (*LWWRegister, error)
type ORSet ¶
type ORSet struct {
// contains filtered or unexported fields
}
ORSet, or Observed-Removed Set, is a set that can have items both added and removed from it. It is implemented by maintaining a set of unique tags for each element which are generated on addition into the set. When an element is removed, all the tags that that node currently observes are added to the removal set, so as long as there haven’t been any new additions that the node hasn’t seen when it removed the element, the element will be removed.
type PNCounter ¶
type PNCounter struct {
// contains filtered or unexported fields
}
PNCounter, or Positive-Negative Counter, is a counter that can both be incremented and decremented. It works by combining two GCounters, a positive one, that tracks increments, and a negative one, that tracks decrements. The final counter value is computed by subtracting the negative GCounter from the positive GCounter.
func NewPNCounter ¶
func NewPNCounter() *PNCounter
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the implementation of the Server server API for the CRDT service.
func (*Server) Handle ¶
func (s *Server) Handle(stream entity.Crdt_HandleServer) error
After invoking handle, the first message sent will always be a CrdtInit message, containing the entity ID, and, if it exists or is available, the current value of the entity. After that, one or more commands may be sent, as well as deltas as they arrive, and the entire value if either the entity is created, or the proxy wishes the user function to replace its entire value. The user function must respond with one reply per command in. They do not necessarily have to be sent in the same order that the commands were sent, the command ID is used to correlate commands to replies.
func (*Server) Register ¶
CrdtEntities can be registered to a server that handles crdt entities by a ServiceName. Whenever a internalCRDT.Server receives an CrdInit for an instance of a crdt entity identified by its EntityID and a ServiceName, the internalCRDT.Server handles such entities through their lifecycle. The handled entities value are captured by a context that is held fo each of them.
type ServiceName ¶
type ServiceName string
func (ServiceName) String ¶
func (sn ServiceName) String() string
type Vote ¶
type Vote struct {
// contains filtered or unexported fields
}
A Vote is a CRDT which allows nodes to vote on a condition. It’s similar to a GCounter, each node has its own counter, and an odd value is considered a vote for the condition, while an even value is considered a vote against. The result of the vote is decided by taking the votes of all nodes that are currently members of the cluster (when a node leave, its vote is discarded). Multiple decision strategies can be used to decide the result of the vote, such as at least one, majority and all.
func (*Vote) AtLeastOne ¶
AtLeastOne returns true if there is at least one voter for the condition.
func (*Vote) Majority ¶
Majority returns true if the number of votes for is more than half the number of voters.
func (*Vote) SelfVote ¶
SelfVote is the vote of the current node, which is included in Voters and VotesFor.