cluster

package
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 4, 2022 License: Apache-2.0 Imports: 28 Imported by: 0

README

Proto.Actor Cluster - Virtual Actors (Alpha)

Massively distributed actors for GO

Proto.Actor supports the classic actor model also found in Erlang and Akka.
Our cluster support however uses a different approach, Virtual Actor Model.

This is a model where each actor appears to always exist. There is no lifecycle as in the classic actor model. You get a reference to the actor by asking for it's ID.

e.g.

hello := shared.GetHelloGrain("abc")
res := hello.SayHello(&shared.HelloRequest{Name: "Proto.Actor"})

This will ask the cluster where the 'abc' actor is located. If it does not yet exist, it will be created for you.

See Microsoft Orleans for more info about the Virtual Actor Model: http://dotnet.github.io/orleans/

How to

Protobuf IDL Definition

Start by defining your messages and grain contracts. You do this by using Protobuf IDL files.

Here is the definition from the /examples/cluster/shared example

syntax = "proto3";
package shared;

message HelloRequest {
  string name = 1;
}

message HelloResponse {
  string message = 1;
}

message AddRequest {
  double a = 1;
  double b = 2;
}

message AddResponse {
  double result = 1;
}

service Hello {
  rpc SayHello (HelloRequest) returns (HelloResponse) {} 
  rpc Add(AddRequest) returns (AddResponse) {}
}

Once you have this, you can generate your code using the protobuf protoc compiler.

Windows

#generate messages
protoc -I=. -I=%GOPATH%\src --gogoslick_out=. protos.proto
#generate grains 
protoc -I=. -I=%GOPATH%\src --gorleans_out=. protos.proto 

Implementing

Once the messages and contracts have been generated, you can start implementing your own business logic. This is essentially a type which is powered by a Proto.Actor actor behind the scenes.

package shared

// a Go struct implementing the Hello interface
type hello struct {
}

func (*hello) SayHello(r *HelloRequest) *HelloResponse {
	return &HelloResponse{Message: "hello " + r.Name}
}

func (*hello) Add(r *AddRequest) *AddResponse {
	return &AddResponse{Result: r.A + r.B}
}

// Register what implementation Proto.Actor should use when 
// creating actors for a certain grain type.
func init() {
	// apply DI and setup logic
	HelloFactory(func() Hello { return &hello{} })
}

Seed nodes

func main() {
    cluster.Start("127.0.0.1:7711")
    console.ReadLine()
}

Member nodes

func main() {
	cluster.Start("127.0.0.1:0", "127.0.0.1:7711")

    // get a reference to the virtual actor called "abc" of type Hello
	hello := shared.GetHelloGrain("abc")
	res := hello.SayHello(&shared.HelloRequest{Name: "Proto.Actor"})
	log.Printf("Message from grain %v", res.Message)
}

FAQ

Can I use Proto.Actor Cluster in production?

The Proto.Actor Cluster support is in alpha version, thus not production ready.

What about performance?

Proto.Actor Remoting is able to pass 1 million+ messages per second on a standard dev machine. This is the same infrastructure used in Proto.Actor cluster. Proto.Actor Cluster however uses an RPC API, meaning it is Request/Response in nature. If you wait for a response for each call, the throughput will ofcourse be a lot less. Async Fire and forget for performance, Request/Response for simplicity.

Documentation

Index

Constants

View Source
const (
	TopologyKey   string = "topology"
	HearthbeatKey string = "heathbeat"
)
View Source
const DefaultGossipActorName string = "gossip"

Variables

View Source
var (
	IdentityHandoverAck_State_name = map[int32]string{
		0: "processed",
		1: "incorrect_topology",
	}
	IdentityHandoverAck_State_value = map[string]int32{
		"processed":          0,
		"incorrect_topology": 1,
	}
)

Enum value maps for IdentityHandoverAck_State.

View Source
var File_cluster_proto protoreflect.FileDescriptor
View Source
var File_gossip_proto protoreflect.FileDescriptor
View Source
var File_grain_proto protoreflect.FileDescriptor

Functions

func MembersToMap

func MembersToMap(members Members) map[string]*Member

func NewGossipConsensusHandler

func NewGossipConsensusHandler() *gossipConsensusHandler

func RemotePlacementActor

func RemotePlacementActor(address string) *actor.PID

RemotePlacementActor returns the PID of the remote placement actor

func SetClusterIdentity

func SetClusterIdentity(ctx actor.ExtensionContext, ci *ClusterIdentity)

func SetLogLevel

func SetLogLevel(level log.Level)

SetLogLevel sets the log level for the logger.

SetLogLevel is safe to call concurrently

func SortMembers

func SortMembers(members Members)

func TopologyHash

func TopologyHash(members Members) uint64

func WithClusterIdentity

func WithClusterIdentity(props *actor.Props, ci *ClusterIdentity) *actor.Props

Types

type ActivatedKind

type ActivatedKind struct {
	Kind     string
	Props    *actor.Props
	Strategy MemberStrategy
	// contains filtered or unexported fields
}

func (*ActivatedKind) Dev

func (ak *ActivatedKind) Dev()

func (*ActivatedKind) Inc

func (ak *ActivatedKind) Inc()

type Activation

type Activation struct {
	Pid             *actor.PID       `protobuf:"bytes,1,opt,name=pid,proto3" json:"pid,omitempty"`
	ClusterIdentity *ClusterIdentity `protobuf:"bytes,2,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"`
	// contains filtered or unexported fields
}

func (*Activation) Descriptor deprecated

func (*Activation) Descriptor() ([]byte, []int)

Deprecated: Use Activation.ProtoReflect.Descriptor instead.

func (*Activation) GetClusterIdentity

func (x *Activation) GetClusterIdentity() *ClusterIdentity

func (*Activation) GetPid

func (x *Activation) GetPid() *actor.PID

func (*Activation) ProtoMessage

func (*Activation) ProtoMessage()

func (*Activation) ProtoReflect

func (x *Activation) ProtoReflect() protoreflect.Message

func (*Activation) Reset

func (x *Activation) Reset()

func (*Activation) String

func (x *Activation) String() string

type ActivationRequest

type ActivationRequest struct {
	ClusterIdentity *ClusterIdentity `protobuf:"bytes,1,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"`
	RequestId       string           `protobuf:"bytes,2,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"`
	TopologyHash    uint64           `protobuf:"varint,3,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	// contains filtered or unexported fields
}

func (*ActivationRequest) Descriptor deprecated

func (*ActivationRequest) Descriptor() ([]byte, []int)

Deprecated: Use ActivationRequest.ProtoReflect.Descriptor instead.

func (*ActivationRequest) GetClusterIdentity

func (x *ActivationRequest) GetClusterIdentity() *ClusterIdentity

func (*ActivationRequest) GetRequestId

func (x *ActivationRequest) GetRequestId() string

func (*ActivationRequest) GetTopologyHash

func (x *ActivationRequest) GetTopologyHash() uint64

func (*ActivationRequest) ProtoMessage

func (*ActivationRequest) ProtoMessage()

func (*ActivationRequest) ProtoReflect

func (x *ActivationRequest) ProtoReflect() protoreflect.Message

func (*ActivationRequest) Reset

func (x *ActivationRequest) Reset()

func (*ActivationRequest) String

func (x *ActivationRequest) String() string

type ActivationResponse

type ActivationResponse struct {
	Pid          *actor.PID `protobuf:"bytes,1,opt,name=pid,proto3" json:"pid,omitempty"`
	Failed       bool       `protobuf:"varint,2,opt,name=failed,proto3" json:"failed,omitempty"`
	TopologyHash uint64     `protobuf:"varint,3,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	// contains filtered or unexported fields
}

func (*ActivationResponse) Descriptor deprecated

func (*ActivationResponse) Descriptor() ([]byte, []int)

Deprecated: Use ActivationResponse.ProtoReflect.Descriptor instead.

func (*ActivationResponse) GetFailed

func (x *ActivationResponse) GetFailed() bool

func (*ActivationResponse) GetPid

func (x *ActivationResponse) GetPid() *actor.PID

func (*ActivationResponse) GetTopologyHash

func (x *ActivationResponse) GetTopologyHash() uint64

func (*ActivationResponse) ProtoMessage

func (*ActivationResponse) ProtoMessage()

func (*ActivationResponse) ProtoReflect

func (x *ActivationResponse) ProtoReflect() protoreflect.Message

func (*ActivationResponse) Reset

func (x *ActivationResponse) Reset()

func (*ActivationResponse) String

func (x *ActivationResponse) String() string

type ActivationTerminated

type ActivationTerminated struct {
	Pid             *actor.PID       `protobuf:"bytes,1,opt,name=pid,proto3" json:"pid,omitempty"`
	ClusterIdentity *ClusterIdentity `protobuf:"bytes,2,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"`
	// contains filtered or unexported fields
}

Terminated, removed from lookup

func (*ActivationTerminated) Descriptor deprecated

func (*ActivationTerminated) Descriptor() ([]byte, []int)

Deprecated: Use ActivationTerminated.ProtoReflect.Descriptor instead.

func (*ActivationTerminated) GetClusterIdentity

func (x *ActivationTerminated) GetClusterIdentity() *ClusterIdentity

func (*ActivationTerminated) GetPid

func (x *ActivationTerminated) GetPid() *actor.PID

func (*ActivationTerminated) ProtoMessage

func (*ActivationTerminated) ProtoMessage()

func (*ActivationTerminated) ProtoReflect

func (x *ActivationTerminated) ProtoReflect() protoreflect.Message

func (*ActivationTerminated) Reset

func (x *ActivationTerminated) Reset()

func (*ActivationTerminated) String

func (x *ActivationTerminated) String() string

type ActivationTerminating

type ActivationTerminating struct {
	Pid             *actor.PID       `protobuf:"bytes,1,opt,name=pid,proto3" json:"pid,omitempty"`
	ClusterIdentity *ClusterIdentity `protobuf:"bytes,2,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"`
	// contains filtered or unexported fields
}

Started terminating, not yet removed from IIdentityLookup

func (*ActivationTerminating) Descriptor deprecated

func (*ActivationTerminating) Descriptor() ([]byte, []int)

Deprecated: Use ActivationTerminating.ProtoReflect.Descriptor instead.

func (*ActivationTerminating) GetClusterIdentity

func (x *ActivationTerminating) GetClusterIdentity() *ClusterIdentity

func (*ActivationTerminating) GetPid

func (x *ActivationTerminating) GetPid() *actor.PID

func (*ActivationTerminating) ProtoMessage

func (*ActivationTerminating) ProtoMessage()

func (*ActivationTerminating) ProtoReflect

func (x *ActivationTerminating) ProtoReflect() protoreflect.Message

func (*ActivationTerminating) Reset

func (x *ActivationTerminating) Reset()

func (*ActivationTerminating) String

func (x *ActivationTerminating) String() string

type ActorStatistics

type ActorStatistics struct {
	ActorCount map[string]int64 `` /* 180-byte string literal not displayed */
	// contains filtered or unexported fields
}

func (*ActorStatistics) Descriptor deprecated

func (*ActorStatistics) Descriptor() ([]byte, []int)

Deprecated: Use ActorStatistics.ProtoReflect.Descriptor instead.

func (*ActorStatistics) GetActorCount

func (x *ActorStatistics) GetActorCount() map[string]int64

func (*ActorStatistics) ProtoMessage

func (*ActorStatistics) ProtoMessage()

func (*ActorStatistics) ProtoReflect

func (x *ActorStatistics) ProtoReflect() protoreflect.Message

func (*ActorStatistics) Reset

func (x *ActorStatistics) Reset()

func (*ActorStatistics) String

func (x *ActorStatistics) String() string

type AddConsensusCheck

type AddConsensusCheck struct {
	ID    string
	Check *ConsensusCheck
}

func NewAddConsensusCheck

func NewAddConsensusCheck(id string, check *ConsensusCheck) AddConsensusCheck

type Cluster

type Cluster struct {
	ActorSystem    *actor.ActorSystem
	Config         *Config
	Gossip         Gossiper
	Remote         *remote.Remote
	PidCache       *PidCacheValue
	MemberList     *MemberList
	IdentityLookup IdentityLookup
	// contains filtered or unexported fields
}

func GetCluster

func GetCluster(actorSystem *actor.ActorSystem) *Cluster

func New

func New(actorSystem *actor.ActorSystem, config *Config) *Cluster

func (*Cluster) Call

func (c *Cluster) Call(name string, kind string, msg interface{}, opts ...GrainCallOption) (interface{}, error)

Call is a wrap of context.RequestFuture with retries.

func (*Cluster) ExtensionID

func (c *Cluster) ExtensionID() extensions.ExtensionID

func (*Cluster) Get

func (c *Cluster) Get(identity string, kind string) *actor.PID

func (*Cluster) GetBlockedMembers

func (c *Cluster) GetBlockedMembers() set.Set[string]

func (*Cluster) GetClusterKind

func (c *Cluster) GetClusterKind(kind string) *ActivatedKind

func (*Cluster) GetClusterKinds

func (c *Cluster) GetClusterKinds() []string

func (*Cluster) Request

func (c *Cluster) Request(identity string, kind string, message interface{}) (interface{}, error)

func (*Cluster) Shutdown

func (c *Cluster) Shutdown(graceful bool)

func (*Cluster) StartClient

func (c *Cluster) StartClient()

func (*Cluster) StartMember

func (c *Cluster) StartMember()

func (*Cluster) TryGetClusterKind

func (c *Cluster) TryGetClusterKind(kind string) (*ActivatedKind, bool)

type ClusterContextConfig

type ClusterContextConfig struct {
	ActorRequestTimeout                          time.Duration
	RequestsLogThrottlePeriod                    time.Duration
	MaxNumberOfEventsInRequestLogThrottledPeriod int
	RetryAction                                  func(int) int
	// contains filtered or unexported fields
}

ClusterContextConfig is used to configure cluster context parameters

func NewDefaultClusterContextConfig

func NewDefaultClusterContextConfig() *ClusterContextConfig

NewDefaultClusterContextConfig creates a mew ClusterContextConfig with default values and returns a pointer to its memory address

type ClusterIdentity

type ClusterIdentity struct {
	Identity string `protobuf:"bytes,1,opt,name=identity,proto3" json:"identity,omitempty"`
	Kind     string `protobuf:"bytes,2,opt,name=kind,proto3" json:"kind,omitempty"`
	// contains filtered or unexported fields
}

func GetClusterIdentity

func GetClusterIdentity(ctx actor.ExtensionContext) *ClusterIdentity

func NewClusterIdentity

func NewClusterIdentity(identity string, kind string) *ClusterIdentity

func (*ClusterIdentity) AsKey

func (ci *ClusterIdentity) AsKey() string

func (*ClusterIdentity) Descriptor deprecated

func (*ClusterIdentity) Descriptor() ([]byte, []int)

Deprecated: Use ClusterIdentity.ProtoReflect.Descriptor instead.

func (*ClusterIdentity) ExtensionID

func (ci *ClusterIdentity) ExtensionID() ctxext.ContextExtensionID

func (*ClusterIdentity) GetIdentity

func (x *ClusterIdentity) GetIdentity() string

func (*ClusterIdentity) GetKind

func (x *ClusterIdentity) GetKind() string

func (*ClusterIdentity) ProtoMessage

func (*ClusterIdentity) ProtoMessage()

func (*ClusterIdentity) ProtoReflect

func (x *ClusterIdentity) ProtoReflect() protoreflect.Message

func (*ClusterIdentity) Reset

func (x *ClusterIdentity) Reset()

func (*ClusterIdentity) String

func (x *ClusterIdentity) String() string

func (*ClusterIdentity) ToShortString

func (ci *ClusterIdentity) ToShortString() string

remove

type ClusterInit

type ClusterInit struct {
	Identity *ClusterIdentity
	Cluster  *Cluster
}

type ClusterProvider

type ClusterProvider interface {
	StartMember(cluster *Cluster) error
	StartClient(cluster *Cluster) error
	Shutdown(graceful bool) error
}

type ClusterTopology

type ClusterTopology struct {
	TopologyHash uint64    `protobuf:"varint,1,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	Members      []*Member `protobuf:"bytes,2,rep,name=members,proto3" json:"members,omitempty"`
	Joined       []*Member `protobuf:"bytes,3,rep,name=joined,proto3" json:"joined,omitempty"`
	Left         []*Member `protobuf:"bytes,4,rep,name=left,proto3" json:"left,omitempty"`
	Blocked      []string  `protobuf:"bytes,5,rep,name=blocked,proto3" json:"blocked,omitempty"`
	// contains filtered or unexported fields
}

func (*ClusterTopology) Descriptor deprecated

func (*ClusterTopology) Descriptor() ([]byte, []int)

Deprecated: Use ClusterTopology.ProtoReflect.Descriptor instead.

func (*ClusterTopology) GetBlocked

func (x *ClusterTopology) GetBlocked() []string

func (*ClusterTopology) GetJoined

func (x *ClusterTopology) GetJoined() []*Member

func (*ClusterTopology) GetLeft

func (x *ClusterTopology) GetLeft() []*Member

func (*ClusterTopology) GetMembers

func (x *ClusterTopology) GetMembers() []*Member

func (*ClusterTopology) GetTopologyHash

func (x *ClusterTopology) GetTopologyHash() uint64

func (*ClusterTopology) ProtoMessage

func (*ClusterTopology) ProtoMessage()

func (*ClusterTopology) ProtoReflect

func (x *ClusterTopology) ProtoReflect() protoreflect.Message

func (*ClusterTopology) Reset

func (x *ClusterTopology) Reset()

func (*ClusterTopology) String

func (x *ClusterTopology) String() string

type ClusterTopologyNotification

type ClusterTopologyNotification struct {
	MemberId     string `protobuf:"bytes,1,opt,name=member_id,json=memberId,proto3" json:"member_id,omitempty"`
	TopologyHash uint32 `protobuf:"varint,2,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	LeaderId     string `protobuf:"bytes,3,opt,name=leader_id,json=leaderId,proto3" json:"leader_id,omitempty"`
	// contains filtered or unexported fields
}

func (*ClusterTopologyNotification) Descriptor deprecated

func (*ClusterTopologyNotification) Descriptor() ([]byte, []int)

Deprecated: Use ClusterTopologyNotification.ProtoReflect.Descriptor instead.

func (*ClusterTopologyNotification) GetLeaderId

func (x *ClusterTopologyNotification) GetLeaderId() string

func (*ClusterTopologyNotification) GetMemberId

func (x *ClusterTopologyNotification) GetMemberId() string

func (*ClusterTopologyNotification) GetTopologyHash

func (x *ClusterTopologyNotification) GetTopologyHash() uint32

func (*ClusterTopologyNotification) ProtoMessage

func (*ClusterTopologyNotification) ProtoMessage()

func (*ClusterTopologyNotification) ProtoReflect

func (*ClusterTopologyNotification) Reset

func (x *ClusterTopologyNotification) Reset()

func (*ClusterTopologyNotification) String

func (x *ClusterTopologyNotification) String() string

type Config

type Config struct {
	Name                                         string
	Address                                      string
	ClusterProvider                              ClusterProvider
	IdentityLookup                               IdentityLookup
	RemoteConfig                                 *remote.Config
	RequestTimeoutTime                           time.Duration
	RequestsLogThrottlePeriod                    time.Duration
	MaxNumberOfEventsInRequestLogThrottledPeriod int
	ClusterContextProducer                       ContextProducer
	MemberStrategyBuilder                        func(cluster *Cluster, kind string) MemberStrategy
	Kinds                                        map[string]*Kind
	TimeoutTime                                  time.Duration
	GossipInterval                               time.Duration
	GossipRequestTimeout                         time.Duration
	GossipFanOut                                 int
	GossipMaxSend                                int
}

func Configure

func Configure(clusterName string, clusterProvider ClusterProvider, identityLookup IdentityLookup, remoteConfig *remote.Config, options ...ConfigOption) *Config

func (*Config) ToClusterContextConfig

func (c *Config) ToClusterContextConfig() *ClusterContextConfig

ToClusterContextConfig converts this cluster Config Context parameters into a valid ClusterContextConfig value and returns a pointer to its memory

type ConfigOption

type ConfigOption func(config *Config)

func WithClusterContextProducer

func WithClusterContextProducer(producer ContextProducer) ConfigOption

WithClusterContextProducer sets the cluster context producer.

func WithKinds

func WithKinds(kinds ...*Kind) ConfigOption

func WithMaxNumberOfEventsInRequestLogThrottlePeriod

func WithMaxNumberOfEventsInRequestLogThrottlePeriod(maxNumber int) ConfigOption

WithMaxNumberOfEventsInRequestLogThrottlePeriod sets the max number of events in request log throttled period.

func WithRequestTimeout

func WithRequestTimeout(t time.Duration) ConfigOption

WithRequestTimeout sets the request timeout.

func WithRequestsLogThrottlePeriod

func WithRequestsLogThrottlePeriod(period time.Duration) ConfigOption

WithRequestsLogThrottlePeriod sets the requests log throttle period.

type ConsensusCheck

type ConsensusCheck struct {
	// contains filtered or unexported fields
}

data structure helpful to store consensus check information and behavior.

func NewConsensusCheck

func NewConsensusCheck(affectedKeys []string, check GossipUpdater) ConsensusCheck

creates a new ConsensusCheck value with the given data and return it back.

type ConsensusCheckBuilder

type ConsensusCheckBuilder struct {
	// contains filtered or unexported fields
}

func NewConsensusCheckBuilder

func NewConsensusCheckBuilder(key string, getValue func(*anypb.Any) interface{}) *ConsensusCheckBuilder

func (*ConsensusCheckBuilder) AffectedKeys

func (ccb *ConsensusCheckBuilder) AffectedKeys() []string

func (*ConsensusCheckBuilder) Build

Build builds a new ConsensusHandler and ConsensusCheck values and returns pointers to them

func (*ConsensusCheckBuilder) Check

func (*ConsensusCheckBuilder) HasConsensus

func (ccb *ConsensusCheckBuilder) HasConsensus(memberValues []*consensusMemberValue) (bool, uint64)

func (*ConsensusCheckBuilder) MapToValue

func (ccb *ConsensusCheckBuilder) MapToValue(valueTuple *consensusValue) func(string, *GossipMemberState) (string, string, uint64)

type ConsensusCheckDefinition

type ConsensusCheckDefinition interface {
	Check() *ConsensusCheck
	AffectedKeys() map[string]struct{}
}

type ConsensusChecker

type ConsensusChecker func(*GossipState, map[string]empty) (bool, interface{})

Customary type used to provide consensus check callbacks of any type note: this is equivalent to (for future go v1.18):

type ConsensusChecker[T] func(GossipState, map[string]empty) (bool, T)

type ConsensusChecks

type ConsensusChecks struct {
	// contains filtered or unexported fields
}

acts as a storage of pointers to ConsensusCheck stored by key.

func NewConsensusChecks

func NewConsensusChecks() *ConsensusChecks

creates a new ConsensusChecks value and returns a pointer to it.

func (*ConsensusChecks) Add

func (cc *ConsensusChecks) Add(id string, check *ConsensusCheck)

adds a new pointer to a ConsensusCheck value in the storage and registers its affected by keys index.

func (*ConsensusChecks) GetByUpdatedKey

func (cc *ConsensusChecks) GetByUpdatedKey(key string) []*ConsensusCheck

iterates over all the keys stored in the set (map[string]empty) found in the given key map and populates a slice of pointers to ConsensusCheck values that is returned as a set of ConsensusCheck updated by the given key.

func (*ConsensusChecks) GetByUpdatedKeys

func (cc *ConsensusChecks) GetByUpdatedKeys(keys []string) []*ConsensusCheck

iterate over all the keys stored in the set (map[string]empty) found in the given key maps and populates a slice of pointers to ConsensusCheck values that is returned as a set of ConsensusCheck updated by the given keys with removed duplicates on it (as it is a "set").

func (*ConsensusChecks) Remove

func (cc *ConsensusChecks) Remove(id string)

Remove removes the given ConsensusCheck identity from the storage and removes its affected by keys index if needed after cleaning.

type ConsensusHandler

type ConsensusHandler interface {
	GetID() string
	TryGetConsensus(context.Context) (interface{}, bool)
}

type Context

type Context interface {
	Request(identity string, kind string, message interface{}, timeout ...time.Duration) (interface{}, error)
}

Context is an interface any cluster context needs to implement

type ContextProducer

type ContextProducer func(*Cluster) Context

Defines a type to provide DefaultContext configurations / implementations.

type DefaultContext

type DefaultContext struct {
	// contains filtered or unexported fields
}

Defines a default cluster context hashBytes structure.

func (*DefaultContext) Request

func (dcc *DefaultContext) Request(identity, kind string, message interface{}, timeout ...time.Duration) (interface{}, error)

type GetGossipStateRequest

type GetGossipStateRequest struct {
	Key string
}

Used to query the GossipActor about a given key status

func NewGetGossipStateRequest

func NewGetGossipStateRequest(key string) GetGossipStateRequest

Create a new GetGossipStateRequest value and return it back

type GetGossipStateResponse

type GetGossipStateResponse struct {
	State map[string]*anypb.Any
}

Used by the GossipActor to send back the status value of a given key

func NewGetGossipStateResponse

func NewGetGossipStateResponse(state map[string]*anypb.Any) GetGossipStateResponse

type GetPid

type GetPid struct {
	ClusterIdentity *ClusterIdentity
}

GetPid contains

type Gossip

The Gossip interface must be implemented by any value that pretends to participate with-in the Gossip protocol

type GossipActor

type GossipActor struct {
	// contains filtered or unexported fields
}

Actor used to send gossip messages around

func NewGossipActor

func NewGossipActor(requestTimeout time.Duration, myID string, getBlockedMembers func() set.Set[string], fanOut int, maxSend int) *GossipActor

Creates a new GossipActor and returns a pointer to its location in the heap

func (*GossipActor) Receive

func (ga *GossipActor) Receive(ctx actor.Context)

Receive method.

func (*GossipActor) ReceiveState

func (ga *GossipActor) ReceiveState(remoteState *GossipState, ctx actor.Context)

type GossipConsensusChecker

type GossipConsensusChecker interface {
	AddConsensusCheck(id string, check *ConsensusCheck)
	RemoveConsensusCheck(id string)
}

This interface must be implemented by any value that wants to add or remove consensus checkers

type GossipCore

type GossipCore interface {
	UpdateClusterTopology(topology *ClusterTopology)
	ReceiveState(remoteState *GossipState) []*GossipUpdate
	SendState(sendStateToMember LocalStateSender)
	GetMemberStateDelta(targetMemberID string) *MemberStateDelta
}

This interface must be implemented by any value that wants to react to cluster topology events

type GossipDeltaValue

type GossipDeltaValue struct {
	Entries []*GossipDeltaValue_GossipDeltaEntry `protobuf:"bytes,1,rep,name=entries,proto3" json:"entries,omitempty"`
	// contains filtered or unexported fields
}

represents a value that can be sent in form of a delta change instead of a full value replace

func (*GossipDeltaValue) Descriptor deprecated

func (*GossipDeltaValue) Descriptor() ([]byte, []int)

Deprecated: Use GossipDeltaValue.ProtoReflect.Descriptor instead.

func (*GossipDeltaValue) GetEntries

func (*GossipDeltaValue) ProtoMessage

func (*GossipDeltaValue) ProtoMessage()

func (*GossipDeltaValue) ProtoReflect

func (x *GossipDeltaValue) ProtoReflect() protoreflect.Message

func (*GossipDeltaValue) Reset

func (x *GossipDeltaValue) Reset()

func (*GossipDeltaValue) String

func (x *GossipDeltaValue) String() string

type GossipDeltaValue_GossipDeltaEntry

type GossipDeltaValue_GossipDeltaEntry struct {
	SequenceNumber int64  `protobuf:"varint,1,opt,name=sequence_number,json=sequenceNumber,proto3" json:"sequence_number,omitempty"`
	Data           []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
	// contains filtered or unexported fields
}

these are the entries of a delta value this can be seen as an array with data, where each element in the array is tagged with a sequence number

func (*GossipDeltaValue_GossipDeltaEntry) Descriptor deprecated

func (*GossipDeltaValue_GossipDeltaEntry) Descriptor() ([]byte, []int)

Deprecated: Use GossipDeltaValue_GossipDeltaEntry.ProtoReflect.Descriptor instead.

func (*GossipDeltaValue_GossipDeltaEntry) GetData

func (x *GossipDeltaValue_GossipDeltaEntry) GetData() []byte

func (*GossipDeltaValue_GossipDeltaEntry) GetSequenceNumber

func (x *GossipDeltaValue_GossipDeltaEntry) GetSequenceNumber() int64

func (*GossipDeltaValue_GossipDeltaEntry) ProtoMessage

func (*GossipDeltaValue_GossipDeltaEntry) ProtoMessage()

func (*GossipDeltaValue_GossipDeltaEntry) ProtoReflect

func (*GossipDeltaValue_GossipDeltaEntry) Reset

func (*GossipDeltaValue_GossipDeltaEntry) String

type GossipKeyValue

type GossipKeyValue struct {
	SequenceNumber                 int64      `protobuf:"varint,2,opt,name=sequence_number,json=sequenceNumber,proto3" json:"sequence_number,omitempty"` //version is local to the owner member
	Value                          *anypb.Any `protobuf:"bytes,4,opt,name=value,proto3" json:"value,omitempty"`                                          //value is any format
	LocalTimestampUnixMilliseconds int64      ``                                                                                                         /* 156-byte string literal not displayed */
	// contains filtered or unexported fields
}

a known key might be heartbeat. if we locally tag each entry with a local timestamp this means that we can measure if we have not received a new heartbeat from one member in some time even if we don't know the exact time the heartbeat was issued, due to clock differences. we still know when _we_ as in this node, got this data. and we can measure time from then til now.

if we got a hear-beat from another node, and X seconds pass, we can assume it to be dead

func (*GossipKeyValue) Descriptor deprecated

func (*GossipKeyValue) Descriptor() ([]byte, []int)

Deprecated: Use GossipKeyValue.ProtoReflect.Descriptor instead.

func (*GossipKeyValue) GetLocalTimestampUnixMilliseconds

func (x *GossipKeyValue) GetLocalTimestampUnixMilliseconds() int64

func (*GossipKeyValue) GetSequenceNumber

func (x *GossipKeyValue) GetSequenceNumber() int64

func (*GossipKeyValue) GetValue

func (x *GossipKeyValue) GetValue() *anypb.Any

func (*GossipKeyValue) ProtoMessage

func (*GossipKeyValue) ProtoMessage()

func (*GossipKeyValue) ProtoReflect

func (x *GossipKeyValue) ProtoReflect() protoreflect.Message

func (*GossipKeyValue) Reset

func (x *GossipKeyValue) Reset()

func (*GossipKeyValue) String

func (x *GossipKeyValue) String() string

type GossipKeyValues

type GossipKeyValues = map[string]*GossipKeyValue

type GossipMemberState

type GossipMemberState = GossipState_GossipMemberState

convenience type alias

type GossipMemberStates

type GossipMemberStates = map[string]*GossipMemberState

type GossipRequest

type GossipRequest struct {
	MemberId string       `protobuf:"bytes,2,opt,name=member_id,json=memberId,proto3" json:"member_id,omitempty"`
	State    *GossipState `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"`
	// contains filtered or unexported fields
}

func (*GossipRequest) Descriptor deprecated

func (*GossipRequest) Descriptor() ([]byte, []int)

Deprecated: Use GossipRequest.ProtoReflect.Descriptor instead.

func (*GossipRequest) GetMemberId

func (x *GossipRequest) GetMemberId() string

func (*GossipRequest) GetState

func (x *GossipRequest) GetState() *GossipState

func (*GossipRequest) ProtoMessage

func (*GossipRequest) ProtoMessage()

func (*GossipRequest) ProtoReflect

func (x *GossipRequest) ProtoReflect() protoreflect.Message

func (*GossipRequest) Reset

func (x *GossipRequest) Reset()

func (*GossipRequest) String

func (x *GossipRequest) String() string

type GossipResponse

type GossipResponse struct {
	State *GossipState `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"`
	// contains filtered or unexported fields
}

Ack a gossip request

func (*GossipResponse) Descriptor deprecated

func (*GossipResponse) Descriptor() ([]byte, []int)

Deprecated: Use GossipResponse.ProtoReflect.Descriptor instead.

func (*GossipResponse) GetState

func (x *GossipResponse) GetState() *GossipState

func (*GossipResponse) ProtoMessage

func (*GossipResponse) ProtoMessage()

func (*GossipResponse) ProtoReflect

func (x *GossipResponse) ProtoReflect() protoreflect.Message

func (*GossipResponse) Reset

func (x *GossipResponse) Reset()

func (*GossipResponse) String

func (x *GossipResponse) String() string

type GossipResponseAck

type GossipResponseAck struct {
	// contains filtered or unexported fields
}

func (*GossipResponseAck) Descriptor deprecated

func (*GossipResponseAck) Descriptor() ([]byte, []int)

Deprecated: Use GossipResponseAck.ProtoReflect.Descriptor instead.

func (*GossipResponseAck) ProtoMessage

func (*GossipResponseAck) ProtoMessage()

func (*GossipResponseAck) ProtoReflect

func (x *GossipResponseAck) ProtoReflect() protoreflect.Message

func (*GossipResponseAck) Reset

func (x *GossipResponseAck) Reset()

func (*GossipResponseAck) String

func (x *GossipResponseAck) String() string

type GossipState

type GossipState struct {
	Members map[string]*GossipState_GossipMemberState `` /* 155-byte string literal not displayed */
	// contains filtered or unexported fields
}

two GossipState objects can be merged key + member_id gets it's own entry, if collision, highest version is selected

func (*GossipState) Descriptor deprecated

func (*GossipState) Descriptor() ([]byte, []int)

Deprecated: Use GossipState.ProtoReflect.Descriptor instead.

func (*GossipState) GetMembers

func (x *GossipState) GetMembers() map[string]*GossipState_GossipMemberState

func (*GossipState) ProtoMessage

func (*GossipState) ProtoMessage()

func (*GossipState) ProtoReflect

func (x *GossipState) ProtoReflect() protoreflect.Message

func (*GossipState) Reset

func (x *GossipState) Reset()

func (*GossipState) String

func (x *GossipState) String() string

type GossipStateStorer

type GossipStateStorer interface {
	GetState(key string) map[string]*anypb.Any
	SetState(key string, value proto.Message)
}

This interface must be implemented by any value that. wants to be used as a gossip state storage

type GossipState_GossipMemberState

type GossipState_GossipMemberState struct {
	Values map[string]*GossipKeyValue `` /* 153-byte string literal not displayed */
	// contains filtered or unexported fields
}

func (*GossipState_GossipMemberState) Descriptor deprecated

func (*GossipState_GossipMemberState) Descriptor() ([]byte, []int)

Deprecated: Use GossipState_GossipMemberState.ProtoReflect.Descriptor instead.

func (*GossipState_GossipMemberState) GetValues

func (*GossipState_GossipMemberState) ProtoMessage

func (*GossipState_GossipMemberState) ProtoMessage()

func (*GossipState_GossipMemberState) ProtoReflect

func (*GossipState_GossipMemberState) Reset

func (x *GossipState_GossipMemberState) Reset()

func (*GossipState_GossipMemberState) String

type GossipUpdate

type GossipUpdate struct {
	MemberID, Key string
	Value         *anypb.Any
	SeqNumber     int64
}

Used to update gossip data when a Clustertopology event occurs

type GossipUpdater

type GossipUpdater func(*GossipState, map[string]empty)

type Gossiper

type Gossiper struct {
	// The Gossiper Actor Name, defaults to "gossip"
	GossipActorName string
	// contains filtered or unexported fields
}

The Gossiper data structure manages Gossip

func (*Gossiper) GetState

func (g *Gossiper) GetState(key string) (map[string]*anypb.Any, error)

func (*Gossiper) RegisterConsensusCheck

func (g *Gossiper) RegisterConsensusCheck(key string, getValue func(*anypb.Any) interface{}) ConsensusHandler

Builds a consensus handler and a consensus checker, send the checker to the Gossip actor and returns the handler back to the caller

func (*Gossiper) SendState

func (g *Gossiper) SendState()

func (*Gossiper) SetState

func (g *Gossiper) SetState(key string, value proto.Message)

Sends fire and forget message to update member state

func (*Gossiper) SetStateRequest

func (g *Gossiper) SetStateRequest(key string, value proto.Message) error

Sends a Request (that blocks) to update member state

func (*Gossiper) Shutdown

func (g *Gossiper) Shutdown()

func (*Gossiper) StartGossiping

func (g *Gossiper) StartGossiping() error

type GrainCallConfig

type GrainCallConfig struct {
	RetryCount  int
	Timeout     time.Duration
	RetryAction func(n int)
	Context     actor.SenderContext
}

func DefaultGrainCallConfig

func DefaultGrainCallConfig(cluster *Cluster) *GrainCallConfig

func NewGrainCallOptions

func NewGrainCallOptions(cluster *Cluster) *GrainCallConfig

type GrainCallOption

type GrainCallOption func(config *GrainCallConfig)

func WithContext

func WithContext(ctx actor.SenderContext) GrainCallOption

func WithRetry

func WithRetry(count int) GrainCallOption

func WithRetryAction

func WithRetryAction(act func(i int)) GrainCallOption

func WithTimeout

func WithTimeout(timeout time.Duration) GrainCallOption

type GrainContext

type GrainContext interface {
	actor.Context

	Identity() string
	Kind() string
	Cluster() *Cluster
}

func NewGrainContext

func NewGrainContext(context actor.Context, identity *ClusterIdentity, cluster *Cluster) GrainContext

type GrainErrorResponse

type GrainErrorResponse struct {
	Err string `protobuf:"bytes,1,opt,name=err,proto3" json:"err,omitempty"`
	// contains filtered or unexported fields
}

func (*GrainErrorResponse) Descriptor deprecated

func (*GrainErrorResponse) Descriptor() ([]byte, []int)

Deprecated: Use GrainErrorResponse.ProtoReflect.Descriptor instead.

func (*GrainErrorResponse) GetErr

func (x *GrainErrorResponse) GetErr() string

func (*GrainErrorResponse) ProtoMessage

func (*GrainErrorResponse) ProtoMessage()

func (*GrainErrorResponse) ProtoReflect

func (x *GrainErrorResponse) ProtoReflect() protoreflect.Message

func (*GrainErrorResponse) Reset

func (x *GrainErrorResponse) Reset()

func (*GrainErrorResponse) String

func (x *GrainErrorResponse) String() string

type GrainRequest

type GrainRequest struct {
	MethodIndex     int32  `protobuf:"varint,1,opt,name=method_index,json=methodIndex,proto3" json:"method_index,omitempty"`
	MessageData     []byte `protobuf:"bytes,2,opt,name=message_data,json=messageData,proto3" json:"message_data,omitempty"`
	MessageTypeName string `protobuf:"bytes,3,opt,name=message_type_name,json=messageTypeName,proto3" json:"message_type_name,omitempty"`
	// contains filtered or unexported fields
}

func (*GrainRequest) Descriptor deprecated

func (*GrainRequest) Descriptor() ([]byte, []int)

Deprecated: Use GrainRequest.ProtoReflect.Descriptor instead.

func (*GrainRequest) GetMessageData

func (x *GrainRequest) GetMessageData() []byte

func (*GrainRequest) GetMessageTypeName

func (x *GrainRequest) GetMessageTypeName() string

func (*GrainRequest) GetMethodIndex

func (x *GrainRequest) GetMethodIndex() int32

func (*GrainRequest) ProtoMessage

func (*GrainRequest) ProtoMessage()

func (*GrainRequest) ProtoReflect

func (x *GrainRequest) ProtoReflect() protoreflect.Message

func (*GrainRequest) Reset

func (x *GrainRequest) Reset()

func (*GrainRequest) String

func (x *GrainRequest) String() string

type GrainResponse

type GrainResponse struct {
	MessageData     []byte `protobuf:"bytes,1,opt,name=message_data,json=messageData,proto3" json:"message_data,omitempty"`
	MessageTypeName string `protobuf:"bytes,2,opt,name=message_type_name,json=messageTypeName,proto3" json:"message_type_name,omitempty"`
	// contains filtered or unexported fields
}

func (*GrainResponse) Descriptor deprecated

func (*GrainResponse) Descriptor() ([]byte, []int)

Deprecated: Use GrainResponse.ProtoReflect.Descriptor instead.

func (*GrainResponse) GetMessageData

func (x *GrainResponse) GetMessageData() []byte

func (*GrainResponse) GetMessageTypeName

func (x *GrainResponse) GetMessageTypeName() string

func (*GrainResponse) ProtoMessage

func (*GrainResponse) ProtoMessage()

func (*GrainResponse) ProtoReflect

func (x *GrainResponse) ProtoReflect() protoreflect.Message

func (*GrainResponse) Reset

func (x *GrainResponse) Reset()

func (*GrainResponse) String

func (x *GrainResponse) String() string

type IdentityHandover

type IdentityHandover struct {
	Actors       []*Activation `protobuf:"bytes,1,rep,name=actors,proto3" json:"actors,omitempty"`
	ChunkId      int32         `protobuf:"varint,2,opt,name=chunk_id,json=chunkId,proto3" json:"chunk_id,omitempty"`
	Final        bool          `protobuf:"varint,3,opt,name=final,proto3" json:"final,omitempty"`
	TopologyHash uint64        `protobuf:"varint,4,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	Skipped      int32         `protobuf:"varint,5,opt,name=skipped,proto3" json:"skipped,omitempty"` // Total number of activations skipped
	Sent         int32         `protobuf:"varint,6,opt,name=sent,proto3" json:"sent,omitempty"`       // Total number of activations sent
	// contains filtered or unexported fields
}

func (*IdentityHandover) Descriptor deprecated

func (*IdentityHandover) Descriptor() ([]byte, []int)

Deprecated: Use IdentityHandover.ProtoReflect.Descriptor instead.

func (*IdentityHandover) GetActors

func (x *IdentityHandover) GetActors() []*Activation

func (*IdentityHandover) GetChunkId

func (x *IdentityHandover) GetChunkId() int32

func (*IdentityHandover) GetFinal

func (x *IdentityHandover) GetFinal() bool

func (*IdentityHandover) GetSent

func (x *IdentityHandover) GetSent() int32

func (*IdentityHandover) GetSkipped

func (x *IdentityHandover) GetSkipped() int32

func (*IdentityHandover) GetTopologyHash

func (x *IdentityHandover) GetTopologyHash() uint64

func (*IdentityHandover) ProtoMessage

func (*IdentityHandover) ProtoMessage()

func (*IdentityHandover) ProtoReflect

func (x *IdentityHandover) ProtoReflect() protoreflect.Message

func (*IdentityHandover) Reset

func (x *IdentityHandover) Reset()

func (*IdentityHandover) String

func (x *IdentityHandover) String() string

type IdentityHandoverAck

type IdentityHandoverAck struct {
	ChunkId         int32                     `protobuf:"varint,1,opt,name=chunk_id,json=chunkId,proto3" json:"chunk_id,omitempty"`
	TopologyHash    uint64                    `protobuf:"varint,2,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	ProcessingState IdentityHandoverAck_State `` /* 146-byte string literal not displayed */
	// contains filtered or unexported fields
}

func (*IdentityHandoverAck) Descriptor deprecated

func (*IdentityHandoverAck) Descriptor() ([]byte, []int)

Deprecated: Use IdentityHandoverAck.ProtoReflect.Descriptor instead.

func (*IdentityHandoverAck) GetChunkId

func (x *IdentityHandoverAck) GetChunkId() int32

func (*IdentityHandoverAck) GetProcessingState

func (x *IdentityHandoverAck) GetProcessingState() IdentityHandoverAck_State

func (*IdentityHandoverAck) GetTopologyHash

func (x *IdentityHandoverAck) GetTopologyHash() uint64

func (*IdentityHandoverAck) ProtoMessage

func (*IdentityHandoverAck) ProtoMessage()

func (*IdentityHandoverAck) ProtoReflect

func (x *IdentityHandoverAck) ProtoReflect() protoreflect.Message

func (*IdentityHandoverAck) Reset

func (x *IdentityHandoverAck) Reset()

func (*IdentityHandoverAck) String

func (x *IdentityHandoverAck) String() string

type IdentityHandoverAck_State

type IdentityHandoverAck_State int32
const (
	IdentityHandoverAck_processed          IdentityHandoverAck_State = 0
	IdentityHandoverAck_incorrect_topology IdentityHandoverAck_State = 1
)

func (IdentityHandoverAck_State) Descriptor

func (IdentityHandoverAck_State) Enum

func (IdentityHandoverAck_State) EnumDescriptor deprecated

func (IdentityHandoverAck_State) EnumDescriptor() ([]byte, []int)

Deprecated: Use IdentityHandoverAck_State.Descriptor instead.

func (IdentityHandoverAck_State) Number

func (IdentityHandoverAck_State) String

func (x IdentityHandoverAck_State) String() string

func (IdentityHandoverAck_State) Type

type IdentityHandoverRequest

type IdentityHandoverRequest struct {
	CurrentTopology *IdentityHandoverRequest_Topology `protobuf:"bytes,1,opt,name=current_topology,json=currentTopology,proto3" json:"current_topology,omitempty"`
	Address         string                            `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"`
	// If the requester passes a delta topology, only return activations which would not be assigned to the member
	// in the previous topology.
	DeltaTopology *IdentityHandoverRequest_Topology `protobuf:"bytes,3,opt,name=delta_topology,json=deltaTopology,proto3" json:"delta_topology,omitempty"`
	// contains filtered or unexported fields
}

request response call from Identity actor sent to each member asking what activations they hold that belong to the requester

func (*IdentityHandoverRequest) Descriptor deprecated

func (*IdentityHandoverRequest) Descriptor() ([]byte, []int)

Deprecated: Use IdentityHandoverRequest.ProtoReflect.Descriptor instead.

func (*IdentityHandoverRequest) GetAddress

func (x *IdentityHandoverRequest) GetAddress() string

func (*IdentityHandoverRequest) GetCurrentTopology

func (*IdentityHandoverRequest) GetDeltaTopology

func (*IdentityHandoverRequest) ProtoMessage

func (*IdentityHandoverRequest) ProtoMessage()

func (*IdentityHandoverRequest) ProtoReflect

func (x *IdentityHandoverRequest) ProtoReflect() protoreflect.Message

func (*IdentityHandoverRequest) Reset

func (x *IdentityHandoverRequest) Reset()

func (*IdentityHandoverRequest) String

func (x *IdentityHandoverRequest) String() string

type IdentityHandoverRequest_Topology

type IdentityHandoverRequest_Topology struct {
	TopologyHash uint64    `protobuf:"varint,1,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	Members      []*Member `protobuf:"bytes,3,rep,name=members,proto3" json:"members,omitempty"`
	// contains filtered or unexported fields
}

func (*IdentityHandoverRequest_Topology) Descriptor deprecated

func (*IdentityHandoverRequest_Topology) Descriptor() ([]byte, []int)

Deprecated: Use IdentityHandoverRequest_Topology.ProtoReflect.Descriptor instead.

func (*IdentityHandoverRequest_Topology) GetMembers

func (x *IdentityHandoverRequest_Topology) GetMembers() []*Member

func (*IdentityHandoverRequest_Topology) GetTopologyHash

func (x *IdentityHandoverRequest_Topology) GetTopologyHash() uint64

func (*IdentityHandoverRequest_Topology) ProtoMessage

func (*IdentityHandoverRequest_Topology) ProtoMessage()

func (*IdentityHandoverRequest_Topology) ProtoReflect

func (*IdentityHandoverRequest_Topology) Reset

func (*IdentityHandoverRequest_Topology) String

type IdentityLookup

type IdentityLookup interface {
	Get(clusterIdentity *ClusterIdentity) *actor.PID

	RemovePid(clusterIdentity *ClusterIdentity, pid *actor.PID)

	Setup(cluster *Cluster, kinds []string, isClient bool)

	Shutdown()
}

IdentityLookup contains

type IdentityStorageLookup

type IdentityStorageLookup struct {
	Storage StorageLookup
	// contains filtered or unexported fields
}

IdentityStorageLookup contains

func (*IdentityStorageLookup) Get

func (id *IdentityStorageLookup) Get(clusterIdentity *ClusterIdentity) *actor.PID

Get returns a PID for a given ClusterIdentity

func (*IdentityStorageLookup) RemoveMember

func (i *IdentityStorageLookup) RemoveMember(memberID string)

RemoveMember from identity storage

func (*IdentityStorageLookup) Setup

func (id *IdentityStorageLookup) Setup(cluster *Cluster, kinds []string, isClient bool)

type IdentityStorageWorker

type IdentityStorageWorker struct {
	// contains filtered or unexported fields
}

func (*IdentityStorageWorker) Receive

func (ids *IdentityStorageWorker) Receive(c actor.Context)

Receive func

type Informer

type Informer struct {
	// contains filtered or unexported fields
}

The Informer data structure implements the Gossip interface

func (*Informer) AddConsensusCheck

func (inf *Informer) AddConsensusCheck(id string, check *ConsensusCheck)

adds a new consensus checker to this informer

func (*Informer) CheckConsensus

func (inf *Informer) CheckConsensus(updatedKeys ...string)

check consensus for the given keys

func (*Informer) GetMemberStateDelta

func (inf *Informer) GetMemberStateDelta(targetMemberID string) *MemberStateDelta

func (*Informer) GetState

func (inf *Informer) GetState(key string) map[string]*anypb.Any

retrieves this informer current state for the given key returns map containing each known member id and their value

func (*Informer) ReceiveState

func (inf *Informer) ReceiveState(remoteState *GossipState) []*GossipUpdate

receives a remote informer state

func (*Informer) RemoveConsensusCheck

func (inf *Informer) RemoveConsensusCheck(id string)

removes a consensus checker from this informer

func (*Informer) SendState

func (inf *Informer) SendState(sendStateToMember LocalStateSender)

sends this informer local state to remote informers chosen randomly from the slice of other members known by this informer until gossipFanOut number of sent has been reached

func (*Informer) SetState

func (inf *Informer) SetState(key string, message proto.Message)

sets new update key state using the given proto message

func (*Informer) UpdateClusterTopology

func (inf *Informer) UpdateClusterTopology(topology *ClusterTopology)

called when there is a cluster topology update

type Kind

type Kind struct {
	Kind            string
	Props           *actor.Props
	StrategyBuilder func(*Cluster) MemberStrategy
}

Kind represents the kinds of actors a cluster can manage

func NewKind

func NewKind(kind string, props *actor.Props) *Kind

NewKind creates a new instance of a kind

func (*Kind) Build

func (k *Kind) Build(cluster *Cluster) *ActivatedKind

func (*Kind) WithMemberStrategy

func (k *Kind) WithMemberStrategy(strategyBuilder func(*Cluster) MemberStrategy)

type LocalStateSender

type LocalStateSender func(memberStateDelta *MemberStateDelta, member *Member)

customary type that defines a states sender callback.

type Member

type Member struct {
	Host  string   `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"`
	Port  int32    `protobuf:"varint,2,opt,name=port,proto3" json:"port,omitempty"`
	Id    string   `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"`
	Kinds []string `protobuf:"bytes,4,rep,name=kinds,proto3" json:"kinds,omitempty"`
	// contains filtered or unexported fields
}

func (*Member) Address

func (m *Member) Address() string

Address return a "host:port". Member defined by protos.proto

func (*Member) Descriptor deprecated

func (*Member) Descriptor() ([]byte, []int)

Deprecated: Use Member.ProtoReflect.Descriptor instead.

func (*Member) GetHost

func (x *Member) GetHost() string

func (*Member) GetId

func (x *Member) GetId() string

func (*Member) GetKinds

func (x *Member) GetKinds() []string

func (*Member) GetPort

func (x *Member) GetPort() int32

func (*Member) HasKind

func (m *Member) HasKind(kind string) bool

func (*Member) ProtoMessage

func (*Member) ProtoMessage()

func (*Member) ProtoReflect

func (x *Member) ProtoReflect() protoreflect.Message

func (*Member) Reset

func (x *Member) Reset()

func (*Member) String

func (x *Member) String() string

type MemberAvailableEvent

type MemberAvailableEvent struct {
	MemberMeta
}

func (*MemberAvailableEvent) MemberStatusEvent

func (*MemberAvailableEvent) MemberStatusEvent()

type MemberHeartbeat

type MemberHeartbeat struct {
	ActorStatistics *ActorStatistics `protobuf:"bytes,1,opt,name=actor_statistics,json=actorStatistics,proto3" json:"actor_statistics,omitempty"`
	// contains filtered or unexported fields
}

func (*MemberHeartbeat) Descriptor deprecated

func (*MemberHeartbeat) Descriptor() ([]byte, []int)

Deprecated: Use MemberHeartbeat.ProtoReflect.Descriptor instead.

func (*MemberHeartbeat) GetActorStatistics

func (x *MemberHeartbeat) GetActorStatistics() *ActorStatistics

func (*MemberHeartbeat) ProtoMessage

func (*MemberHeartbeat) ProtoMessage()

func (*MemberHeartbeat) ProtoReflect

func (x *MemberHeartbeat) ProtoReflect() protoreflect.Message

func (*MemberHeartbeat) Reset

func (x *MemberHeartbeat) Reset()

func (*MemberHeartbeat) String

func (x *MemberHeartbeat) String() string

type MemberJoinedEvent

type MemberJoinedEvent struct {
	MemberMeta
}

func (*MemberJoinedEvent) MemberStatusEvent

func (*MemberJoinedEvent) MemberStatusEvent()

type MemberLeftEvent

type MemberLeftEvent struct {
	MemberMeta
}

func (*MemberLeftEvent) MemberStatusEvent

func (*MemberLeftEvent) MemberStatusEvent()

type MemberList

type MemberList struct {
	// contains filtered or unexported fields
}

MemberList is responsible to keep track of the current cluster topology it does so by listening to changes from the ClusterProvider. the default ClusterProvider is consul.ConsulProvider which uses the Consul HTTP API to scan for changes

func NewMemberList

func NewMemberList(cluster *Cluster) *MemberList

func (*MemberList) BroadcastEvent

func (ml *MemberList) BroadcastEvent(message interface{}, includeSelf bool)

func (*MemberList) ContainsMemberID

func (ml *MemberList) ContainsMemberID(memberID string) bool

func (*MemberList) GetActivatorMember

func (ml *MemberList) GetActivatorMember(kind string, requestSourceAddress string) string

func (*MemberList) InitializeTopologyConsensus

func (ml *MemberList) InitializeTopologyConsensus()

func (*MemberList) Length

func (ml *MemberList) Length() int

func (*MemberList) Members

func (ml *MemberList) Members() *MemberSet

func (*MemberList) TerminateMember

func (ml *MemberList) TerminateMember(m *Member)

func (*MemberList) TopologyConsensus

func (ml *MemberList) TopologyConsensus(ctx context.Context) (uint64, bool)

func (*MemberList) UpdateClusterTopology

func (ml *MemberList) UpdateClusterTopology(members Members)

type MemberMeta

type MemberMeta struct {
	Host  string
	Port  int
	Kinds []string
}

func (*MemberMeta) GetKinds

func (e *MemberMeta) GetKinds() []string

func (*MemberMeta) Name

func (e *MemberMeta) Name() string

type MemberRejoinedEvent

type MemberRejoinedEvent struct {
	MemberMeta
}

func (*MemberRejoinedEvent) MemberStatusEvent

func (*MemberRejoinedEvent) MemberStatusEvent()

type MemberSet

type MemberSet struct {
	// contains filtered or unexported fields
}

func NewMemberSet

func NewMemberSet(members Members) *MemberSet

func (*MemberSet) ContainsID

func (ms *MemberSet) ContainsID(id string) bool

func (*MemberSet) Equals

func (ms *MemberSet) Equals(other *MemberSet) bool

func (*MemberSet) Except

func (ms *MemberSet) Except(other *MemberSet) *MemberSet

func (*MemberSet) ExceptIds

func (ms *MemberSet) ExceptIds(ids []string) *MemberSet

func (*MemberSet) GetMemberById

func (ms *MemberSet) GetMemberById(id string) *Member

func (*MemberSet) Len

func (ms *MemberSet) Len() int

func (*MemberSet) Members

func (ms *MemberSet) Members() Members

func (*MemberSet) TopologyHash

func (ms *MemberSet) TopologyHash() uint64

func (*MemberSet) Union

func (ms *MemberSet) Union(other *MemberSet) *MemberSet

type MemberStateDelta

type MemberStateDelta struct {
	TargetMemberID string
	HasState       bool
	State          *GossipState
	CommitOffsets  func()
}

type MemberStatus

type MemberStatus struct {
	Member
	MemberID string // for compatibility
	Alive    bool
}

func (*MemberStatus) Address

func (m *MemberStatus) Address() string

type MemberStatusEvent

type MemberStatusEvent interface {
	MemberStatusEvent()
	GetKinds() []string
}

type MemberStrategy

type MemberStrategy interface {
	GetAllMembers() Members
	AddMember(member *Member)
	RemoveMember(member *Member)
	GetPartition(key string) string
	GetActivator(senderAddress string) string
}

type MemberUnavailableEvent

type MemberUnavailableEvent struct {
	MemberMeta
}

func (*MemberUnavailableEvent) MemberStatusEvent

func (*MemberUnavailableEvent) MemberStatusEvent()

type Members

type Members []*Member

func CopySortMembers

func CopySortMembers(members Members) Members

func (*Members) ToSet

func (m *Members) ToSet() *MemberSet

type Option

type Option func(g *Gossiper)

type PackedActivations

type PackedActivations struct {
	Address string                    `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"`
	Actors  []*PackedActivations_Kind `protobuf:"bytes,2,rep,name=actors,proto3" json:"actors,omitempty"`
	// contains filtered or unexported fields
}

func (*PackedActivations) Descriptor deprecated

func (*PackedActivations) Descriptor() ([]byte, []int)

Deprecated: Use PackedActivations.ProtoReflect.Descriptor instead.

func (*PackedActivations) GetActors

func (x *PackedActivations) GetActors() []*PackedActivations_Kind

func (*PackedActivations) GetAddress

func (x *PackedActivations) GetAddress() string

func (*PackedActivations) ProtoMessage

func (*PackedActivations) ProtoMessage()

func (*PackedActivations) ProtoReflect

func (x *PackedActivations) ProtoReflect() protoreflect.Message

func (*PackedActivations) Reset

func (x *PackedActivations) Reset()

func (*PackedActivations) String

func (x *PackedActivations) String() string

type PackedActivations_Activation

type PackedActivations_Activation struct {
	Identity     string `protobuf:"bytes,1,opt,name=identity,proto3" json:"identity,omitempty"`
	ActivationId string `protobuf:"bytes,2,opt,name=activation_id,json=activationId,proto3" json:"activation_id,omitempty"`
	// contains filtered or unexported fields
}

func (*PackedActivations_Activation) Descriptor deprecated

func (*PackedActivations_Activation) Descriptor() ([]byte, []int)

Deprecated: Use PackedActivations_Activation.ProtoReflect.Descriptor instead.

func (*PackedActivations_Activation) GetActivationId

func (x *PackedActivations_Activation) GetActivationId() string

func (*PackedActivations_Activation) GetIdentity

func (x *PackedActivations_Activation) GetIdentity() string

func (*PackedActivations_Activation) ProtoMessage

func (*PackedActivations_Activation) ProtoMessage()

func (*PackedActivations_Activation) ProtoReflect

func (*PackedActivations_Activation) Reset

func (x *PackedActivations_Activation) Reset()

func (*PackedActivations_Activation) String

type PackedActivations_Kind

type PackedActivations_Kind struct {
	Name        string                          `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	Activations []*PackedActivations_Activation `protobuf:"bytes,2,rep,name=activations,proto3" json:"activations,omitempty"`
	// contains filtered or unexported fields
}

func (*PackedActivations_Kind) Descriptor deprecated

func (*PackedActivations_Kind) Descriptor() ([]byte, []int)

Deprecated: Use PackedActivations_Kind.ProtoReflect.Descriptor instead.

func (*PackedActivations_Kind) GetActivations

func (*PackedActivations_Kind) GetName

func (x *PackedActivations_Kind) GetName() string

func (*PackedActivations_Kind) ProtoMessage

func (*PackedActivations_Kind) ProtoMessage()

func (*PackedActivations_Kind) ProtoReflect

func (x *PackedActivations_Kind) ProtoReflect() protoreflect.Message

func (*PackedActivations_Kind) Reset

func (x *PackedActivations_Kind) Reset()

func (*PackedActivations_Kind) String

func (x *PackedActivations_Kind) String() string

type PidCacheValue

type PidCacheValue struct {
	// contains filtered or unexported fields
}

func NewPidCache

func NewPidCache() *PidCacheValue

func (*PidCacheValue) Get

func (c *PidCacheValue) Get(identity string, kind string) (*actor.PID, bool)

func (*PidCacheValue) Remove

func (c *PidCacheValue) Remove(identity string, kind string)

func (*PidCacheValue) RemoveByMember

func (c *PidCacheValue) RemoveByMember(member *Member)

func (*PidCacheValue) RemoveByValue

func (c *PidCacheValue) RemoveByValue(identity string, kind string, pid *actor.PID)

func (*PidCacheValue) Set

func (c *PidCacheValue) Set(identity string, kind string, pid *actor.PID)

type PidResult

type PidResult struct {
	Pid *actor.PID
}

PidResult contains

type ProxyActivationRequest

type ProxyActivationRequest struct {
	ClusterIdentity    *ClusterIdentity `protobuf:"bytes,1,opt,name=cluster_identity,json=clusterIdentity,proto3" json:"cluster_identity,omitempty"`
	ReplacedActivation *actor.PID       `protobuf:"bytes,2,opt,name=replaced_activation,json=replacedActivation,proto3" json:"replaced_activation,omitempty"`
	// contains filtered or unexported fields
}

func (*ProxyActivationRequest) Descriptor deprecated

func (*ProxyActivationRequest) Descriptor() ([]byte, []int)

Deprecated: Use ProxyActivationRequest.ProtoReflect.Descriptor instead.

func (*ProxyActivationRequest) GetClusterIdentity

func (x *ProxyActivationRequest) GetClusterIdentity() *ClusterIdentity

func (*ProxyActivationRequest) GetReplacedActivation

func (x *ProxyActivationRequest) GetReplacedActivation() *actor.PID

func (*ProxyActivationRequest) ProtoMessage

func (*ProxyActivationRequest) ProtoMessage()

func (*ProxyActivationRequest) ProtoReflect

func (x *ProxyActivationRequest) ProtoReflect() protoreflect.Message

func (*ProxyActivationRequest) Reset

func (x *ProxyActivationRequest) Reset()

func (*ProxyActivationRequest) String

func (x *ProxyActivationRequest) String() string

type ReadyForRebalance

type ReadyForRebalance struct {
	TopologyHash uint64 `protobuf:"varint,1,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	// contains filtered or unexported fields
}

func (*ReadyForRebalance) Descriptor deprecated

func (*ReadyForRebalance) Descriptor() ([]byte, []int)

Deprecated: Use ReadyForRebalance.ProtoReflect.Descriptor instead.

func (*ReadyForRebalance) GetTopologyHash

func (x *ReadyForRebalance) GetTopologyHash() uint64

func (*ReadyForRebalance) ProtoMessage

func (*ReadyForRebalance) ProtoMessage()

func (*ReadyForRebalance) ProtoReflect

func (x *ReadyForRebalance) ProtoReflect() protoreflect.Message

func (*ReadyForRebalance) Reset

func (x *ReadyForRebalance) Reset()

func (*ReadyForRebalance) String

func (x *ReadyForRebalance) String() string

type RebalanceCompleted

type RebalanceCompleted struct {
	TopologyHash uint64 `protobuf:"varint,1,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	// contains filtered or unexported fields
}

func (*RebalanceCompleted) Descriptor deprecated

func (*RebalanceCompleted) Descriptor() ([]byte, []int)

Deprecated: Use RebalanceCompleted.ProtoReflect.Descriptor instead.

func (*RebalanceCompleted) GetTopologyHash

func (x *RebalanceCompleted) GetTopologyHash() uint64

func (*RebalanceCompleted) ProtoMessage

func (*RebalanceCompleted) ProtoMessage()

func (*RebalanceCompleted) ProtoReflect

func (x *RebalanceCompleted) ProtoReflect() protoreflect.Message

func (*RebalanceCompleted) Reset

func (x *RebalanceCompleted) Reset()

func (*RebalanceCompleted) String

func (x *RebalanceCompleted) String() string

type RemoteIdentityHandover

type RemoteIdentityHandover struct {
	Actors       *PackedActivations `protobuf:"bytes,1,opt,name=actors,proto3" json:"actors,omitempty"`
	ChunkId      int32              `protobuf:"varint,2,opt,name=chunk_id,json=chunkId,proto3" json:"chunk_id,omitempty"`
	Final        bool               `protobuf:"varint,3,opt,name=final,proto3" json:"final,omitempty"`
	TopologyHash uint64             `protobuf:"varint,4,opt,name=topology_hash,json=topologyHash,proto3" json:"topology_hash,omitempty"`
	Skipped      int32              `protobuf:"varint,5,opt,name=skipped,proto3" json:"skipped,omitempty"`
	Sent         int32              `protobuf:"varint,6,opt,name=sent,proto3" json:"sent,omitempty"`
	// contains filtered or unexported fields
}

func (*RemoteIdentityHandover) Descriptor deprecated

func (*RemoteIdentityHandover) Descriptor() ([]byte, []int)

Deprecated: Use RemoteIdentityHandover.ProtoReflect.Descriptor instead.

func (*RemoteIdentityHandover) GetActors

func (x *RemoteIdentityHandover) GetActors() *PackedActivations

func (*RemoteIdentityHandover) GetChunkId

func (x *RemoteIdentityHandover) GetChunkId() int32

func (*RemoteIdentityHandover) GetFinal

func (x *RemoteIdentityHandover) GetFinal() bool

func (*RemoteIdentityHandover) GetSent

func (x *RemoteIdentityHandover) GetSent() int32

func (*RemoteIdentityHandover) GetSkipped

func (x *RemoteIdentityHandover) GetSkipped() int32

func (*RemoteIdentityHandover) GetTopologyHash

func (x *RemoteIdentityHandover) GetTopologyHash() uint64

func (*RemoteIdentityHandover) ProtoMessage

func (*RemoteIdentityHandover) ProtoMessage()

func (*RemoteIdentityHandover) ProtoReflect

func (x *RemoteIdentityHandover) ProtoReflect() protoreflect.Message

func (*RemoteIdentityHandover) Reset

func (x *RemoteIdentityHandover) Reset()

func (*RemoteIdentityHandover) String

func (x *RemoteIdentityHandover) String() string

type RemoveConsensusCheck

type RemoveConsensusCheck struct {
	ID string
}

Mimic .NET ReenterAfterCancellation on GossipActor

func NewRemoveConsensusCheck

func NewRemoveConsensusCheck(id string) RemoveConsensusCheck

type Rendezvous

type Rendezvous struct {
	// contains filtered or unexported fields
}

func NewRendezvous

func NewRendezvous() *Rendezvous

func (*Rendezvous) GetByClusterIdentity

func (r *Rendezvous) GetByClusterIdentity(ci *ClusterIdentity) string

func (*Rendezvous) GetByIdentity

func (r *Rendezvous) GetByIdentity(identity string) string

func (*Rendezvous) UpdateMembers

func (r *Rendezvous) UpdateMembers(members Members)

type SendGossipStateRequest

type SendGossipStateRequest struct{}

type SendGossipStateResponse

type SendGossipStateResponse struct{}

type SetGossipStateKey

type SetGossipStateKey struct {
	Key   string
	Value proto.Message
}

Used to setup Gossip Status Keys in the GossipActor

func NewGossipStateKey

func NewGossipStateKey(key string, value proto.Message) SetGossipStateKey

Create a new SetGossipStateKey value with the given data and return it back

type SetGossipStateResponse

type SetGossipStateResponse struct{}

Used by the GossipActor to respond SetGossipStatus requests

type SimpleRoundRobin

type SimpleRoundRobin struct {
	// contains filtered or unexported fields
}

func NewSimpleRoundRobin

func NewSimpleRoundRobin(memberStrategy MemberStrategy) *SimpleRoundRobin

func (*SimpleRoundRobin) GetByRoundRobin

func (r *SimpleRoundRobin) GetByRoundRobin() string

type SpawnLock

type SpawnLock struct {
	LockID          string
	ClusterIdentity *ClusterIdentity
}

SpawnLock contains

type StorageLookup

type StorageLookup interface {
	TryGetExistingActivation(clusterIdentity *ClusterIdentity) *StoredActivation

	TryAcquireLock(clusterIdentity *ClusterIdentity) *SpawnLock

	WaitForActivation(clusterIdentity *ClusterIdentity) *StoredActivation

	RemoveLock(spawnLock SpawnLock)

	StoreActivation(memberID string, spawnLock *SpawnLock, pid *actor.PID)

	RemoveActivation(pid *SpawnLock)

	RemoveMemberId(memberID string)
}

StorageLookup contains

type StoredActivation

type StoredActivation struct {
	Pid      string
	MemberID string
}

StoredActivation contains

Directories

Path Synopsis
clusterproviders
k8s
zk
identitylookup

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL